channel_store.rs

   1mod channel_index;
   2
   3use crate::{ChannelMessage, channel_buffer::ChannelBuffer, channel_chat::ChannelChat};
   4use anyhow::{Context as _, Result, anyhow};
   5use channel_index::ChannelIndex;
   6use client::{ChannelId, Client, ClientSettings, Subscription, User, UserId, UserStore};
   7use collections::{HashMap, HashSet, hash_map};
   8use futures::{Future, FutureExt, StreamExt, channel::mpsc, future::Shared};
   9use gpui::{
  10    App, AppContext as _, AsyncApp, Context, Entity, EventEmitter, Global, SharedString, Task,
  11    WeakEntity,
  12};
  13use language::Capability;
  14use rpc::{
  15    TypedEnvelope,
  16    proto::{self, ChannelRole, ChannelVisibility},
  17};
  18use settings::Settings;
  19use std::{mem, sync::Arc, time::Duration};
  20use util::{ResultExt, maybe};
  21
/// A 30-second timeout. Judging by the name it bounds how long a reconnect is
/// waited for; the code that consumes it is not visible in this part of the file.
pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
  23
  24pub fn init(client: &Arc<Client>, user_store: Entity<UserStore>, cx: &mut App) {
  25    let channel_store = cx.new(|cx| ChannelStore::new(client.clone(), user_store.clone(), cx));
  26    cx.set_global(GlobalChannelStore(channel_store));
  27}
  28
/// A version of a channel's notes, expressed as an epoch plus a `clock::Global`
/// version. `ChannelState` stores one of these for the latest known notes and
/// one for the notes the user has observed.
#[derive(Debug, Clone, Default, PartialEq)]
struct NotesVersion {
    epoch: u64,
    version: clock::Global,
}
  34
/// Client-side store of channel data: the channel index, invitations,
/// participants, per-channel state, and the channel buffers/chats that are
/// currently open or being opened.
pub struct ChannelStore {
    /// Index of known channels (lookup by id and an ordered id list).
    pub channel_index: ChannelIndex,
    /// Channels exposed through `channel_invitations()`.
    channel_invitations: Vec<Arc<Channel>>,
    /// Users per channel, exposed through `channel_participants()`.
    channel_participants: HashMap<ChannelId, Vec<Arc<User>>>,
    /// Per-channel chat/notes/role bookkeeping.
    channel_states: HashMap<ChannelId, ChannelState>,
    /// `(channel, user)` pairs that currently have an invite, removal, or
    /// role-change request in flight.
    outgoing_invites: HashSet<(ChannelId, UserId)>,
    /// Sender half of the queue that the `_update_channels` task drains.
    update_channels_tx: mpsc::UnboundedSender<proto::UpdateChannels>,
    /// Channel buffers that are open (held weakly) or still loading.
    opened_buffers: HashMap<ChannelId, OpenEntityHandle<ChannelBuffer>>,
    /// Channel chats that are open (held weakly) or still loading.
    opened_chats: HashMap<ChannelId, OpenEntityHandle<ChannelChat>>,
    client: Arc<Client>,
    /// Set once `initialize` has successfully sent `SubscribeToChannels`.
    did_subscribe: bool,
    user_store: Entity<UserStore>,
    /// Handler registrations for `UpdateChannels` and `UpdateUserChannels`.
    _rpc_subscriptions: [Subscription; 2],
    /// Task that reacts to changes in the client's connection status.
    _watch_connection_status: Task<Option<()>>,
    /// Taken (and thereby dropped) in `handle_connect`; where it is set is not
    /// visible in this part of the file.
    disconnect_channel_buffers_task: Option<Task<()>>,
    /// Task that applies queued `UpdateChannels` messages one at a time.
    _update_channels: Task<()>,
}
  52
/// A single channel as known to the client.
#[derive(Clone, Debug)]
pub struct Channel {
    pub id: ChannelId,
    pub name: SharedString,
    pub visibility: proto::ChannelVisibility,
    /// Empty for a root channel; otherwise its first element is treated as the
    /// root channel's id (see `is_root_channel` / `root_id`).
    pub parent_path: Vec<ChannelId>,
    /// NOTE(review): presumably the channel's position among its siblings; confirm.
    pub channel_order: i32,
}
  61
/// Per-channel bookkeeping kept by `ChannelStore`: latest vs. observed chat
/// message ids and notes versions, plus an optional role for the channel.
#[derive(Default, Debug)]
pub struct ChannelState {
    latest_chat_message: Option<u64>,
    latest_notes_version: NotesVersion,
    observed_notes_version: NotesVersion,
    observed_chat_message: Option<u64>,
    /// `None` until a role has been recorded for this channel.
    role: Option<ChannelRole>,
}
  70
  71impl Channel {
  72    pub fn link(&self, cx: &App) -> String {
  73        format!(
  74            "{}/channel/{}-{}",
  75            ClientSettings::get_global(cx).server_url,
  76            Self::slug(&self.name),
  77            self.id
  78        )
  79    }
  80
  81    pub fn notes_link(&self, heading: Option<String>, cx: &App) -> String {
  82        self.link(cx)
  83            + "/notes"
  84            + &heading
  85                .map(|h| format!("#{}", Self::slug(&h)))
  86                .unwrap_or_default()
  87    }
  88
  89    pub fn is_root_channel(&self) -> bool {
  90        self.parent_path.is_empty()
  91    }
  92
  93    pub fn root_id(&self) -> ChannelId {
  94        self.parent_path.first().copied().unwrap_or(self.id)
  95    }
  96
  97    pub fn slug(str: &str) -> String {
  98        let slug: String = str
  99            .chars()
 100            .map(|c| if c.is_alphanumeric() { c } else { '-' })
 101            .collect();
 102
 103        slug.trim_matches(|c| c == '-').to_string()
 104    }
 105}
 106
/// A user's relationship to a channel: the user, whether they are a member or
/// an invitee (`kind`), and their role.
#[derive(Debug)]
pub struct ChannelMembership {
    pub user: Arc<User>,
    pub kind: proto::channel_member::Kind,
    pub role: proto::ChannelRole,
}
 113impl ChannelMembership {
 114    pub fn sort_key(&self) -> MembershipSortKey<'_> {
 115        MembershipSortKey {
 116            role_order: match self.role {
 117                proto::ChannelRole::Admin => 0,
 118                proto::ChannelRole::Member => 1,
 119                proto::ChannelRole::Banned => 2,
 120                proto::ChannelRole::Talker => 3,
 121                proto::ChannelRole::Guest => 4,
 122            },
 123            kind_order: match self.kind {
 124                proto::channel_member::Kind::Member => 0,
 125                proto::channel_member::Kind::Invitee => 1,
 126            },
 127            username_order: self.user.github_login.as_str(),
 128        }
 129    }
 130}
 131
/// Sort key produced by `ChannelMembership::sort_key`.
///
/// The ordering is derived, so the fields are compared in declaration order:
/// role first, then kind, then username. Reordering the fields changes the sort.
#[derive(PartialOrd, Ord, PartialEq, Eq)]
pub struct MembershipSortKey<'a> {
    role_order: u8,
    kind_order: u8,
    username_order: &'a str,
}
 138
/// Events emitted by `ChannelStore`.
pub enum ChannelEvent {
    /// Emitted by `create_channel` after the new channel has been applied to the store.
    ChannelCreated(ChannelId),
    /// Emitted by `rename` after the renamed channel has been applied to the store.
    ChannelRenamed(ChannelId),
}

impl EventEmitter<ChannelEvent> for ChannelStore {}
 145
/// Map entry describing a per-channel resource that is open or being opened.
enum OpenEntityHandle<E> {
    /// The resource finished loading; it is held weakly, so the handle may no
    /// longer upgrade once every strong reference has been dropped.
    Open(WeakEntity<E>),
    /// The resource is still loading; the shared task lets several callers
    /// await the same load.
    Loading(Shared<Task<Result<Entity<E>, Arc<anyhow::Error>>>>),
}
 150
/// Newtype that lets the `ChannelStore` entity be stored as an app global
/// (set in `init`, read in `ChannelStore::global`).
struct GlobalChannelStore(Entity<ChannelStore>);

impl Global for GlobalChannelStore {}
 154
 155impl ChannelStore {
 156    pub fn global(cx: &App) -> Entity<Self> {
 157        cx.global::<GlobalChannelStore>().0.clone()
 158    }
 159
    /// Builds a `ChannelStore` with empty state and starts its background tasks.
    ///
    /// Registers the RPC handlers for `UpdateChannels` and `UpdateUserChannels`,
    /// spawns a task that follows the client's connection status, and spawns a
    /// task that applies queued `UpdateChannels` messages one after another.
    pub fn new(client: Arc<Client>, user_store: Entity<UserStore>, cx: &mut Context<Self>) -> Self {
        let rpc_subscriptions = [
            client.add_message_handler(cx.weak_entity(), Self::handle_update_channels),
            client.add_message_handler(cx.weak_entity(), Self::handle_update_user_channels),
        ];

        let mut connection_status = client.status();
        let (update_channels_tx, mut update_channels_rx) = mpsc::unbounded();
        let watch_connection_status = cx.spawn(async move |this, cx| {
            while let Some(status) = connection_status.next().await {
                // Stop watching once the store itself has been dropped.
                let this = this.upgrade()?;
                match status {
                    client::Status::Connected { .. } => {
                        // A failed `handle_connect` is logged and then ends this
                        // task, because `log_err()` yields `None` on error.
                        this.update(cx, |this, cx| this.handle_connect(cx))
                            .ok()?
                            .await
                            .log_err()?;
                    }
                    // NOTE(review): the boolean passed to `handle_disconnect`
                    // is defined outside this excerpt; confirm its meaning there.
                    client::Status::SignedOut | client::Status::UpgradeRequired => {
                        this.update(cx, |this, cx| this.handle_disconnect(false, cx))
                            .ok();
                    }
                    _ => {
                        this.update(cx, |this, cx| this.handle_disconnect(true, cx))
                            .ok();
                    }
                }
            }
            Some(())
        });

        Self {
            channel_invitations: Vec::default(),
            channel_index: ChannelIndex::default(),
            channel_participants: Default::default(),
            outgoing_invites: Default::default(),
            opened_buffers: Default::default(),
            opened_chats: Default::default(),
            update_channels_tx,
            client,
            user_store,
            _rpc_subscriptions: rpc_subscriptions,
            _watch_connection_status: watch_connection_status,
            disconnect_channel_buffers_task: None,
            // Drains the update queue sequentially: each message is applied via
            // `update_channels`, and any task it returns is awaited (errors are
            // logged) before the next message is taken.
            _update_channels: cx.spawn(async move |this, cx| {
                maybe!(async move {
                    while let Some(update_channels) = update_channels_rx.next().await {
                        if let Some(this) = this.upgrade() {
                            let update_task = this
                                .update(cx, |this, cx| this.update_channels(update_channels, cx))?;
                            if let Some(update_task) = update_task {
                                update_task.await.log_err();
                            }
                        }
                    }
                    anyhow::Ok(())
                })
                .await
                .log_err();
            }),
            channel_states: Default::default(),
            did_subscribe: false,
        }
    }
 224
 225    pub fn initialize(&mut self) {
 226        if !self.did_subscribe
 227            && self
 228                .client
 229                .send(proto::SubscribeToChannels {})
 230                .log_err()
 231                .is_some()
 232        {
 233            self.did_subscribe = true;
 234        }
 235    }
 236
 237    pub fn client(&self) -> Arc<Client> {
 238        self.client.clone()
 239    }
 240
 241    /// Returns the number of unique channels in the store
 242    pub fn channel_count(&self) -> usize {
 243        self.channel_index.by_id().len()
 244    }
 245
 246    /// Returns the index of a channel ID in the list of unique channels
 247    pub fn index_of_channel(&self, channel_id: ChannelId) -> Option<usize> {
 248        self.channel_index
 249            .by_id()
 250            .keys()
 251            .position(|id| *id == channel_id)
 252    }
 253
 254    /// Returns an iterator over all unique channels
 255    pub fn channels(&self) -> impl '_ + Iterator<Item = &Arc<Channel>> {
 256        self.channel_index.by_id().values()
 257    }
 258
 259    /// Iterate over all entries in the channel DAG
 260    pub fn ordered_channels(&self) -> impl '_ + Iterator<Item = (usize, &Arc<Channel>)> {
 261        self.channel_index
 262            .ordered_channels()
 263            .iter()
 264            .filter_map(move |id| {
 265                let channel = self.channel_index.by_id().get(id)?;
 266                Some((channel.parent_path.len(), channel))
 267            })
 268    }
 269
 270    pub fn channel_at_index(&self, ix: usize) -> Option<&Arc<Channel>> {
 271        let channel_id = self.channel_index.ordered_channels().get(ix)?;
 272        self.channel_index.by_id().get(channel_id)
 273    }
 274
 275    pub fn channel_at(&self, ix: usize) -> Option<&Arc<Channel>> {
 276        self.channel_index.by_id().values().nth(ix)
 277    }
 278
 279    pub fn has_channel_invitation(&self, channel_id: ChannelId) -> bool {
 280        self.channel_invitations
 281            .iter()
 282            .any(|channel| channel.id == channel_id)
 283    }
 284
 285    pub fn channel_invitations(&self) -> &[Arc<Channel>] {
 286        &self.channel_invitations
 287    }
 288
 289    pub fn channel_for_id(&self, channel_id: ChannelId) -> Option<&Arc<Channel>> {
 290        self.channel_index.by_id().get(&channel_id)
 291    }
 292
 293    pub fn has_open_channel_buffer(&self, channel_id: ChannelId, _cx: &App) -> bool {
 294        if let Some(buffer) = self.opened_buffers.get(&channel_id) {
 295            if let OpenEntityHandle::Open(buffer) = buffer {
 296                return buffer.upgrade().is_some();
 297            }
 298        }
 299        false
 300    }
 301
 302    pub fn open_channel_buffer(
 303        &mut self,
 304        channel_id: ChannelId,
 305        cx: &mut Context<Self>,
 306    ) -> Task<Result<Entity<ChannelBuffer>>> {
 307        let client = self.client.clone();
 308        let user_store = self.user_store.clone();
 309        let channel_store = cx.entity();
 310        self.open_channel_resource(
 311            channel_id,
 312            |this| &mut this.opened_buffers,
 313            async move |channel, cx| {
 314                ChannelBuffer::new(channel, client, user_store, channel_store, cx).await
 315            },
 316            cx,
 317        )
 318    }
 319
 320    pub fn fetch_channel_messages(
 321        &self,
 322        message_ids: Vec<u64>,
 323        cx: &mut Context<Self>,
 324    ) -> Task<Result<Vec<ChannelMessage>>> {
 325        let request = if message_ids.is_empty() {
 326            None
 327        } else {
 328            Some(
 329                self.client
 330                    .request(proto::GetChannelMessagesById { message_ids }),
 331            )
 332        };
 333        cx.spawn(async move |this, cx| {
 334            if let Some(request) = request {
 335                let response = request.await?;
 336                let this = this.upgrade().context("channel store dropped")?;
 337                let user_store = this.read_with(cx, |this, _| this.user_store.clone())?;
 338                ChannelMessage::from_proto_vec(response.messages, &user_store, cx).await
 339            } else {
 340                Ok(Vec::new())
 341            }
 342        })
 343    }
 344
 345    pub fn has_channel_buffer_changed(&self, channel_id: ChannelId) -> bool {
 346        self.channel_states
 347            .get(&channel_id)
 348            .is_some_and(|state| state.has_channel_buffer_changed())
 349    }
 350
 351    pub fn has_new_messages(&self, channel_id: ChannelId) -> bool {
 352        self.channel_states
 353            .get(&channel_id)
 354            .is_some_and(|state| state.has_new_messages())
 355    }
 356
 357    pub fn set_acknowledged_message_id(&mut self, channel_id: ChannelId, message_id: Option<u64>) {
 358        if let Some(state) = self.channel_states.get_mut(&channel_id) {
 359            state.latest_chat_message = message_id;
 360        }
 361    }
 362
 363    pub fn last_acknowledge_message_id(&self, channel_id: ChannelId) -> Option<u64> {
 364        self.channel_states.get(&channel_id).and_then(|state| {
 365            if let Some(last_message_id) = state.latest_chat_message {
 366                if state
 367                    .last_acknowledged_message_id()
 368                    .is_some_and(|id| id < last_message_id)
 369                {
 370                    return state.last_acknowledged_message_id();
 371                }
 372            }
 373
 374            None
 375        })
 376    }
 377
 378    pub fn acknowledge_message_id(
 379        &mut self,
 380        channel_id: ChannelId,
 381        message_id: u64,
 382        cx: &mut Context<Self>,
 383    ) {
 384        self.channel_states
 385            .entry(channel_id)
 386            .or_default()
 387            .acknowledge_message_id(message_id);
 388        cx.notify();
 389    }
 390
 391    pub fn update_latest_message_id(
 392        &mut self,
 393        channel_id: ChannelId,
 394        message_id: u64,
 395        cx: &mut Context<Self>,
 396    ) {
 397        self.channel_states
 398            .entry(channel_id)
 399            .or_default()
 400            .update_latest_message_id(message_id);
 401        cx.notify();
 402    }
 403
 404    pub fn acknowledge_notes_version(
 405        &mut self,
 406        channel_id: ChannelId,
 407        epoch: u64,
 408        version: &clock::Global,
 409        cx: &mut Context<Self>,
 410    ) {
 411        self.channel_states
 412            .entry(channel_id)
 413            .or_default()
 414            .acknowledge_notes_version(epoch, version);
 415        cx.notify()
 416    }
 417
 418    pub fn update_latest_notes_version(
 419        &mut self,
 420        channel_id: ChannelId,
 421        epoch: u64,
 422        version: &clock::Global,
 423        cx: &mut Context<Self>,
 424    ) {
 425        self.channel_states
 426            .entry(channel_id)
 427            .or_default()
 428            .update_latest_notes_version(epoch, version);
 429        cx.notify()
 430    }
 431
 432    pub fn open_channel_chat(
 433        &mut self,
 434        channel_id: ChannelId,
 435        cx: &mut Context<Self>,
 436    ) -> Task<Result<Entity<ChannelChat>>> {
 437        let client = self.client.clone();
 438        let user_store = self.user_store.clone();
 439        let this = cx.entity();
 440        self.open_channel_resource(
 441            channel_id,
 442            |this| &mut this.opened_chats,
 443            async move |channel, cx| ChannelChat::new(channel, this, user_store, client, cx).await,
 444            cx,
 445        )
 446    }
 447
    /// Asynchronously open a given resource associated with a channel.
    ///
    /// Make sure that the resource is only opened once, even if this method
    /// is called multiple times with the same channel id while the first task
    /// is still running.
    ///
    /// `get_map` selects which map of handles on the store tracks this kind of
    /// resource; `load` performs the actual open once the channel has been
    /// looked up. The returned task fails if the channel id is unknown or if
    /// `load` fails.
    fn open_channel_resource<T, F>(
        &mut self,
        channel_id: ChannelId,
        get_map: fn(&mut Self) -> &mut HashMap<ChannelId, OpenEntityHandle<T>>,
        load: F,
        cx: &mut Context<Self>,
    ) -> Task<Result<Entity<T>>>
    where
        F: AsyncFnOnce(Arc<Channel>, &mut AsyncApp) -> Result<Entity<T>> + 'static,
        T: 'static,
    {
        let task = loop {
            match get_map(self).entry(channel_id) {
                hash_map::Entry::Occupied(e) => match e.get() {
                    OpenEntityHandle::Open(entity) => {
                        if let Some(entity) = entity.upgrade() {
                            // Already open and still alive: resolve immediately.
                            break Task::ready(Ok(entity)).shared();
                        } else {
                            // The weak handle is dead; drop the stale entry and
                            // go around again so the vacant branch reloads it.
                            get_map(self).remove(&channel_id);
                            continue;
                        }
                    }
                    OpenEntityHandle::Loading(task) => {
                        // A load is already in flight; share its result.
                        break task.clone();
                    }
                },
                hash_map::Entry::Vacant(e) => {
                    let task = cx
                        .spawn(async move |this, cx| {
                            let channel = this.read_with(cx, |this, _| {
                                this.channel_for_id(channel_id).cloned().ok_or_else(|| {
                                    Arc::new(anyhow!("no channel for id: {channel_id}"))
                                })
                            })??;

                            load(channel, cx).await.map_err(Arc::new)
                        })
                        .shared();

                    e.insert(OpenEntityHandle::Loading(task.clone()));
                    // Detached follow-up: once the load settles, replace the
                    // `Loading` entry with a weak `Open` handle on success, or
                    // remove it on failure so a later call can retry.
                    cx.spawn({
                        let task = task.clone();
                        async move |this, cx| {
                            let result = task.await;
                            this.update(cx, |this, _| match result {
                                Ok(model) => {
                                    get_map(this).insert(
                                        channel_id,
                                        OpenEntityHandle::Open(model.downgrade()),
                                    );
                                }
                                Err(_) => {
                                    get_map(this).remove(&channel_id);
                                }
                            })
                            .ok();
                        }
                    })
                    .detach();
                    break task;
                }
            }
        };
        // The shared task yields `Arc<anyhow::Error>`; re-wrap its message in a
        // fresh `anyhow::Error` for the caller.
        cx.background_spawn(async move { task.await.map_err(|error| anyhow!("{error}")) })
    }
 518
 519    pub fn is_channel_admin(&self, channel_id: ChannelId) -> bool {
 520        self.channel_role(channel_id) == proto::ChannelRole::Admin
 521    }
 522
 523    pub fn is_root_channel(&self, channel_id: ChannelId) -> bool {
 524        self.channel_index
 525            .by_id()
 526            .get(&channel_id)
 527            .map_or(false, |channel| channel.is_root_channel())
 528    }
 529
 530    pub fn is_public_channel(&self, channel_id: ChannelId) -> bool {
 531        self.channel_index
 532            .by_id()
 533            .get(&channel_id)
 534            .map_or(false, |channel| {
 535                channel.visibility == ChannelVisibility::Public
 536            })
 537    }
 538
 539    pub fn channel_capability(&self, channel_id: ChannelId) -> Capability {
 540        match self.channel_role(channel_id) {
 541            ChannelRole::Admin | ChannelRole::Member => Capability::ReadWrite,
 542            _ => Capability::ReadOnly,
 543        }
 544    }
 545
 546    pub fn channel_role(&self, channel_id: ChannelId) -> proto::ChannelRole {
 547        maybe!({
 548            let mut channel = self.channel_for_id(channel_id)?;
 549            if !channel.is_root_channel() {
 550                channel = self.channel_for_id(channel.root_id())?;
 551            }
 552            let root_channel_state = self.channel_states.get(&channel.id);
 553            root_channel_state?.role
 554        })
 555        .unwrap_or(proto::ChannelRole::Guest)
 556    }
 557
 558    pub fn channel_participants(&self, channel_id: ChannelId) -> &[Arc<User>] {
 559        self.channel_participants
 560            .get(&channel_id)
 561            .map_or(&[], |v| v.as_slice())
 562    }
 563
 564    pub fn create_channel(
 565        &self,
 566        name: &str,
 567        parent_id: Option<ChannelId>,
 568        cx: &mut Context<Self>,
 569    ) -> Task<Result<ChannelId>> {
 570        let client = self.client.clone();
 571        let name = name.trim_start_matches('#').to_owned();
 572        cx.spawn(async move |this, cx| {
 573            let response = client
 574                .request(proto::CreateChannel {
 575                    name,
 576                    parent_id: parent_id.map(|cid| cid.0),
 577                })
 578                .await?;
 579
 580            let channel = response.channel.context("missing channel in response")?;
 581            let channel_id = ChannelId(channel.id);
 582
 583            this.update(cx, |this, cx| {
 584                let task = this.update_channels(
 585                    proto::UpdateChannels {
 586                        channels: vec![channel],
 587                        ..Default::default()
 588                    },
 589                    cx,
 590                );
 591                assert!(task.is_none());
 592
 593                // This event is emitted because the collab panel wants to clear the pending edit state
 594                // before this frame is rendered. But we can't guarantee that the collab panel's future
 595                // will resolve before this flush_effects finishes. Synchronously emitting this event
 596                // ensures that the collab panel will observe this creation before the frame completes
 597                cx.emit(ChannelEvent::ChannelCreated(channel_id));
 598            })?;
 599
 600            Ok(channel_id)
 601        })
 602    }
 603
 604    pub fn move_channel(
 605        &mut self,
 606        channel_id: ChannelId,
 607        to: ChannelId,
 608        cx: &mut Context<Self>,
 609    ) -> Task<Result<()>> {
 610        let client = self.client.clone();
 611        cx.spawn(async move |_, _| {
 612            let _ = client
 613                .request(proto::MoveChannel {
 614                    channel_id: channel_id.0,
 615                    to: to.0,
 616                })
 617                .await?;
 618            Ok(())
 619        })
 620    }
 621
 622    pub fn reorder_channel(
 623        &mut self,
 624        channel_id: ChannelId,
 625        direction: proto::reorder_channel::Direction,
 626        cx: &mut Context<Self>,
 627    ) -> Task<Result<()>> {
 628        let client = self.client.clone();
 629        cx.spawn(async move |_, _| {
 630            client
 631                .request(proto::ReorderChannel {
 632                    channel_id: channel_id.0,
 633                    direction: direction.into(),
 634                })
 635                .await?;
 636            Ok(())
 637        })
 638    }
 639
 640    pub fn set_channel_visibility(
 641        &mut self,
 642        channel_id: ChannelId,
 643        visibility: ChannelVisibility,
 644        cx: &mut Context<Self>,
 645    ) -> Task<Result<()>> {
 646        let client = self.client.clone();
 647        cx.spawn(async move |_, _| {
 648            let _ = client
 649                .request(proto::SetChannelVisibility {
 650                    channel_id: channel_id.0,
 651                    visibility: visibility.into(),
 652                })
 653                .await?;
 654
 655            Ok(())
 656        })
 657    }
 658
 659    pub fn invite_member(
 660        &mut self,
 661        channel_id: ChannelId,
 662        user_id: UserId,
 663        role: proto::ChannelRole,
 664        cx: &mut Context<Self>,
 665    ) -> Task<Result<()>> {
 666        if !self.outgoing_invites.insert((channel_id, user_id)) {
 667            return Task::ready(Err(anyhow!("invite request already in progress")));
 668        }
 669
 670        cx.notify();
 671        let client = self.client.clone();
 672        cx.spawn(async move |this, cx| {
 673            let result = client
 674                .request(proto::InviteChannelMember {
 675                    channel_id: channel_id.0,
 676                    user_id,
 677                    role: role.into(),
 678                })
 679                .await;
 680
 681            this.update(cx, |this, cx| {
 682                this.outgoing_invites.remove(&(channel_id, user_id));
 683                cx.notify();
 684            })?;
 685
 686            result?;
 687
 688            Ok(())
 689        })
 690    }
 691
 692    pub fn remove_member(
 693        &mut self,
 694        channel_id: ChannelId,
 695        user_id: u64,
 696        cx: &mut Context<Self>,
 697    ) -> Task<Result<()>> {
 698        if !self.outgoing_invites.insert((channel_id, user_id)) {
 699            return Task::ready(Err(anyhow!("invite request already in progress")));
 700        }
 701
 702        cx.notify();
 703        let client = self.client.clone();
 704        cx.spawn(async move |this, cx| {
 705            let result = client
 706                .request(proto::RemoveChannelMember {
 707                    channel_id: channel_id.0,
 708                    user_id,
 709                })
 710                .await;
 711
 712            this.update(cx, |this, cx| {
 713                this.outgoing_invites.remove(&(channel_id, user_id));
 714                cx.notify();
 715            })?;
 716            result?;
 717            Ok(())
 718        })
 719    }
 720
 721    pub fn set_member_role(
 722        &mut self,
 723        channel_id: ChannelId,
 724        user_id: UserId,
 725        role: proto::ChannelRole,
 726        cx: &mut Context<Self>,
 727    ) -> Task<Result<()>> {
 728        if !self.outgoing_invites.insert((channel_id, user_id)) {
 729            return Task::ready(Err(anyhow!("member request already in progress")));
 730        }
 731
 732        cx.notify();
 733        let client = self.client.clone();
 734        cx.spawn(async move |this, cx| {
 735            let result = client
 736                .request(proto::SetChannelMemberRole {
 737                    channel_id: channel_id.0,
 738                    user_id,
 739                    role: role.into(),
 740                })
 741                .await;
 742
 743            this.update(cx, |this, cx| {
 744                this.outgoing_invites.remove(&(channel_id, user_id));
 745                cx.notify();
 746            })?;
 747
 748            result?;
 749            Ok(())
 750        })
 751    }
 752
 753    pub fn rename(
 754        &mut self,
 755        channel_id: ChannelId,
 756        new_name: &str,
 757        cx: &mut Context<Self>,
 758    ) -> Task<Result<()>> {
 759        let client = self.client.clone();
 760        let name = new_name.to_string();
 761        cx.spawn(async move |this, cx| {
 762            let channel = client
 763                .request(proto::RenameChannel {
 764                    channel_id: channel_id.0,
 765                    name,
 766                })
 767                .await?
 768                .channel
 769                .context("missing channel in response")?;
 770            this.update(cx, |this, cx| {
 771                let task = this.update_channels(
 772                    proto::UpdateChannels {
 773                        channels: vec![channel],
 774                        ..Default::default()
 775                    },
 776                    cx,
 777                );
 778                assert!(task.is_none());
 779
 780                // This event is emitted because the collab panel wants to clear the pending edit state
 781                // before this frame is rendered. But we can't guarantee that the collab panel's future
 782                // will resolve before this flush_effects finishes. Synchronously emitting this event
 783                // ensures that the collab panel will observe this creation before the frame complete
 784                cx.emit(ChannelEvent::ChannelRenamed(channel_id))
 785            })?;
 786            Ok(())
 787        })
 788    }
 789
 790    pub fn respond_to_channel_invite(
 791        &mut self,
 792        channel_id: ChannelId,
 793        accept: bool,
 794        cx: &mut Context<Self>,
 795    ) -> Task<Result<()>> {
 796        let client = self.client.clone();
 797        cx.background_spawn(async move {
 798            client
 799                .request(proto::RespondToChannelInvite {
 800                    channel_id: channel_id.0,
 801                    accept,
 802                })
 803                .await?;
 804            Ok(())
 805        })
 806    }
 807    pub fn fuzzy_search_members(
 808        &self,
 809        channel_id: ChannelId,
 810        query: String,
 811        limit: u16,
 812        cx: &mut Context<Self>,
 813    ) -> Task<Result<Vec<ChannelMembership>>> {
 814        let client = self.client.clone();
 815        let user_store = self.user_store.downgrade();
 816        cx.spawn(async move |_, cx| {
 817            let response = client
 818                .request(proto::GetChannelMembers {
 819                    channel_id: channel_id.0,
 820                    query,
 821                    limit: limit as u64,
 822                })
 823                .await?;
 824            user_store.update(cx, |user_store, _| {
 825                user_store.insert(response.users);
 826                response
 827                    .members
 828                    .into_iter()
 829                    .filter_map(|member| {
 830                        Some(ChannelMembership {
 831                            user: user_store.get_cached_user(member.user_id)?,
 832                            role: member.role(),
 833                            kind: member.kind(),
 834                        })
 835                    })
 836                    .collect()
 837            })
 838        })
 839    }
 840
 841    pub fn remove_channel(
 842        &self,
 843        channel_id: ChannelId,
 844    ) -> impl Future<Output = Result<()>> + use<> {
 845        let client = self.client.clone();
 846        async move {
 847            client
 848                .request(proto::DeleteChannel {
 849                    channel_id: channel_id.0,
 850                })
 851                .await?;
 852            Ok(())
 853        }
 854    }
 855
    /// Always returns `false`: the channel argument is ignored and no pending
    /// invite responses are tracked here.
    pub fn has_pending_channel_invite_response(&self, _: &Arc<Channel>) -> bool {
        false
    }
 859
 860    pub fn has_pending_channel_invite(&self, channel_id: ChannelId, user_id: UserId) -> bool {
 861        self.outgoing_invites.contains(&(channel_id, user_id))
 862    }
 863
 864    async fn handle_update_channels(
 865        this: Entity<Self>,
 866        message: TypedEnvelope<proto::UpdateChannels>,
 867        mut cx: AsyncApp,
 868    ) -> Result<()> {
 869        this.read_with(&mut cx, |this, _| {
 870            this.update_channels_tx
 871                .unbounded_send(message.payload)
 872                .unwrap();
 873        })?;
 874        Ok(())
 875    }
 876
 877    async fn handle_update_user_channels(
 878        this: Entity<Self>,
 879        message: TypedEnvelope<proto::UpdateUserChannels>,
 880        mut cx: AsyncApp,
 881    ) -> Result<()> {
 882        this.update(&mut cx, |this, cx| {
 883            for buffer_version in message.payload.observed_channel_buffer_version {
 884                let version = language::proto::deserialize_version(&buffer_version.version);
 885                this.acknowledge_notes_version(
 886                    ChannelId(buffer_version.channel_id),
 887                    buffer_version.epoch,
 888                    &version,
 889                    cx,
 890                );
 891            }
 892            for message_id in message.payload.observed_channel_message_id {
 893                this.acknowledge_message_id(
 894                    ChannelId(message_id.channel_id),
 895                    message_id.message_id,
 896                    cx,
 897                );
 898            }
 899            for membership in message.payload.channel_memberships {
 900                if let Some(role) = ChannelRole::from_i32(membership.role) {
 901                    this.channel_states
 902                        .entry(ChannelId(membership.channel_id))
 903                        .or_default()
 904                        .set_role(role)
 905                }
 906            }
 907        })
 908    }
 909
 910    fn handle_connect(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
 911        self.channel_index.clear();
 912        self.channel_invitations.clear();
 913        self.channel_participants.clear();
 914        self.channel_index.clear();
 915        self.outgoing_invites.clear();
 916        self.disconnect_channel_buffers_task.take();
 917
 918        for chat in self.opened_chats.values() {
 919            if let OpenEntityHandle::Open(chat) = chat {
 920                if let Some(chat) = chat.upgrade() {
 921                    chat.update(cx, |chat, cx| {
 922                        chat.rejoin(cx);
 923                    });
 924                }
 925            }
 926        }
 927
 928        let mut buffer_versions = Vec::new();
 929        for buffer in self.opened_buffers.values() {
 930            if let OpenEntityHandle::Open(buffer) = buffer {
 931                if let Some(buffer) = buffer.upgrade() {
 932                    let channel_buffer = buffer.read(cx);
 933                    let buffer = channel_buffer.buffer().read(cx);
 934                    buffer_versions.push(proto::ChannelBufferVersion {
 935                        channel_id: channel_buffer.channel_id.0,
 936                        epoch: channel_buffer.epoch(),
 937                        version: language::proto::serialize_version(&buffer.version()),
 938                    });
 939                }
 940            }
 941        }
 942
 943        if buffer_versions.is_empty() {
 944            return Task::ready(Ok(()));
 945        }
 946
 947        let response = self.client.request(proto::RejoinChannelBuffers {
 948            buffers: buffer_versions,
 949        });
 950
 951        cx.spawn(async move |this, cx| {
 952            let mut response = response.await?;
 953
 954            this.update(cx, |this, cx| {
 955                this.opened_buffers.retain(|_, buffer| match buffer {
 956                    OpenEntityHandle::Open(channel_buffer) => {
 957                        let Some(channel_buffer) = channel_buffer.upgrade() else {
 958                            return false;
 959                        };
 960
 961                        channel_buffer.update(cx, |channel_buffer, cx| {
 962                            let channel_id = channel_buffer.channel_id;
 963                            if let Some(remote_buffer) = response
 964                                .buffers
 965                                .iter_mut()
 966                                .find(|buffer| buffer.channel_id == channel_id.0)
 967                            {
 968                                let channel_id = channel_buffer.channel_id;
 969                                let remote_version =
 970                                    language::proto::deserialize_version(&remote_buffer.version);
 971
 972                                channel_buffer.replace_collaborators(
 973                                    mem::take(&mut remote_buffer.collaborators),
 974                                    cx,
 975                                );
 976
 977                                let operations = channel_buffer
 978                                    .buffer()
 979                                    .update(cx, |buffer, cx| {
 980                                        let outgoing_operations =
 981                                            buffer.serialize_ops(Some(remote_version), cx);
 982                                        let incoming_operations =
 983                                            mem::take(&mut remote_buffer.operations)
 984                                                .into_iter()
 985                                                .map(language::proto::deserialize_operation)
 986                                                .collect::<Result<Vec<_>>>()?;
 987                                        buffer.apply_ops(incoming_operations, cx);
 988                                        anyhow::Ok(outgoing_operations)
 989                                    })
 990                                    .log_err();
 991
 992                                if let Some(operations) = operations {
 993                                    channel_buffer.connected(cx);
 994                                    let client = this.client.clone();
 995                                    cx.background_spawn(async move {
 996                                        let operations = operations.await;
 997                                        for chunk in language::proto::split_operations(operations) {
 998                                            client
 999                                                .send(proto::UpdateChannelBuffer {
1000                                                    channel_id: channel_id.0,
1001                                                    operations: chunk,
1002                                                })
1003                                                .ok();
1004                                        }
1005                                    })
1006                                    .detach();
1007                                    return true;
1008                                }
1009                            }
1010
1011                            channel_buffer.disconnect(cx);
1012                            false
1013                        })
1014                    }
1015                    OpenEntityHandle::Loading(_) => true,
1016                });
1017            })
1018            .ok();
1019            anyhow::Ok(())
1020        })
1021    }
1022
1023    fn handle_disconnect(&mut self, wait_for_reconnect: bool, cx: &mut Context<Self>) {
1024        cx.notify();
1025        self.did_subscribe = false;
1026        self.disconnect_channel_buffers_task.get_or_insert_with(|| {
1027            cx.spawn(async move |this, cx| {
1028                if wait_for_reconnect {
1029                    cx.background_executor().timer(RECONNECT_TIMEOUT).await;
1030                }
1031
1032                if let Some(this) = this.upgrade() {
1033                    this.update(cx, |this, cx| {
1034                        for (_, buffer) in &this.opened_buffers {
1035                            if let OpenEntityHandle::Open(buffer) = &buffer {
1036                                if let Some(buffer) = buffer.upgrade() {
1037                                    buffer.update(cx, |buffer, cx| buffer.disconnect(cx));
1038                                }
1039                            }
1040                        }
1041                    })
1042                    .ok();
1043                }
1044            })
1045        });
1046    }
1047
1048    #[cfg(any(test, feature = "test-support"))]
1049    pub fn reset(&mut self) {
1050        self.channel_invitations.clear();
1051        self.channel_index.clear();
1052        self.channel_participants.clear();
1053        self.outgoing_invites.clear();
1054        self.opened_buffers.clear();
1055        self.opened_chats.clear();
1056        self.disconnect_channel_buffers_task = None;
1057        self.channel_states.clear();
1058    }
1059
1060    pub(crate) fn update_channels(
1061        &mut self,
1062        payload: proto::UpdateChannels,
1063        cx: &mut Context<ChannelStore>,
1064    ) -> Option<Task<Result<()>>> {
1065        if !payload.remove_channel_invitations.is_empty() {
1066            self.channel_invitations
1067                .retain(|channel| !payload.remove_channel_invitations.contains(&channel.id.0));
1068        }
1069        for channel in payload.channel_invitations {
1070            match self
1071                .channel_invitations
1072                .binary_search_by_key(&channel.id, |c| c.id.0)
1073            {
1074                Ok(ix) => {
1075                    Arc::make_mut(&mut self.channel_invitations[ix]).name = channel.name.into()
1076                }
1077                Err(ix) => self.channel_invitations.insert(
1078                    ix,
1079                    Arc::new(Channel {
1080                        id: ChannelId(channel.id),
1081                        visibility: channel.visibility(),
1082                        name: channel.name.into(),
1083                        parent_path: channel.parent_path.into_iter().map(ChannelId).collect(),
1084                        channel_order: channel.channel_order,
1085                    }),
1086                ),
1087            }
1088        }
1089
1090        let channels_changed = !payload.channels.is_empty()
1091            || !payload.delete_channels.is_empty()
1092            || !payload.latest_channel_message_ids.is_empty()
1093            || !payload.latest_channel_buffer_versions.is_empty();
1094
1095        if channels_changed {
1096            if !payload.delete_channels.is_empty() {
1097                let delete_channels: Vec<ChannelId> =
1098                    payload.delete_channels.into_iter().map(ChannelId).collect();
1099                self.channel_index.delete_channels(&delete_channels);
1100                self.channel_participants
1101                    .retain(|channel_id, _| !delete_channels.contains(channel_id));
1102
1103                for channel_id in &delete_channels {
1104                    let channel_id = *channel_id;
1105                    if payload
1106                        .channels
1107                        .iter()
1108                        .any(|channel| channel.id == channel_id.0)
1109                    {
1110                        continue;
1111                    }
1112                    if let Some(OpenEntityHandle::Open(buffer)) =
1113                        self.opened_buffers.remove(&channel_id)
1114                    {
1115                        if let Some(buffer) = buffer.upgrade() {
1116                            buffer.update(cx, ChannelBuffer::disconnect);
1117                        }
1118                    }
1119                }
1120            }
1121
1122            let mut index = self.channel_index.bulk_insert();
1123            for channel in payload.channels {
1124                let id = ChannelId(channel.id);
1125                let channel_changed = index.insert(channel);
1126
1127                if channel_changed {
1128                    if let Some(OpenEntityHandle::Open(buffer)) = self.opened_buffers.get(&id) {
1129                        if let Some(buffer) = buffer.upgrade() {
1130                            buffer.update(cx, ChannelBuffer::channel_changed);
1131                        }
1132                    }
1133                }
1134            }
1135
1136            for latest_buffer_version in payload.latest_channel_buffer_versions {
1137                let version = language::proto::deserialize_version(&latest_buffer_version.version);
1138                self.channel_states
1139                    .entry(ChannelId(latest_buffer_version.channel_id))
1140                    .or_default()
1141                    .update_latest_notes_version(latest_buffer_version.epoch, &version)
1142            }
1143
1144            for latest_channel_message in payload.latest_channel_message_ids {
1145                self.channel_states
1146                    .entry(ChannelId(latest_channel_message.channel_id))
1147                    .or_default()
1148                    .update_latest_message_id(latest_channel_message.message_id);
1149            }
1150        }
1151
1152        cx.notify();
1153        if payload.channel_participants.is_empty() {
1154            return None;
1155        }
1156
1157        let mut all_user_ids = Vec::new();
1158        let channel_participants = payload.channel_participants;
1159        for entry in &channel_participants {
1160            for user_id in entry.participant_user_ids.iter() {
1161                if let Err(ix) = all_user_ids.binary_search(user_id) {
1162                    all_user_ids.insert(ix, *user_id);
1163                }
1164            }
1165        }
1166
1167        let users = self
1168            .user_store
1169            .update(cx, |user_store, cx| user_store.get_users(all_user_ids, cx));
1170        Some(cx.spawn(async move |this, cx| {
1171            let users = users.await?;
1172
1173            this.update(cx, |this, cx| {
1174                for entry in &channel_participants {
1175                    let mut participants: Vec<_> = entry
1176                        .participant_user_ids
1177                        .iter()
1178                        .filter_map(|user_id| {
1179                            users
1180                                .binary_search_by_key(&user_id, |user| &user.id)
1181                                .ok()
1182                                .map(|ix| users[ix].clone())
1183                        })
1184                        .collect();
1185
1186                    participants.sort_by_key(|u| u.id);
1187
1188                    this.channel_participants
1189                        .insert(ChannelId(entry.channel_id), participants);
1190                }
1191
1192                cx.notify();
1193            })
1194        }))
1195    }
1196}
1197
1198impl ChannelState {
1199    fn set_role(&mut self, role: ChannelRole) {
1200        self.role = Some(role);
1201    }
1202
1203    fn has_channel_buffer_changed(&self) -> bool {
1204        self.latest_notes_version.epoch > self.observed_notes_version.epoch
1205            || (self.latest_notes_version.epoch == self.observed_notes_version.epoch
1206                && self
1207                    .latest_notes_version
1208                    .version
1209                    .changed_since(&self.observed_notes_version.version))
1210    }
1211
1212    fn has_new_messages(&self) -> bool {
1213        let latest_message_id = self.latest_chat_message;
1214        let observed_message_id = self.observed_chat_message;
1215
1216        latest_message_id.is_some_and(|latest_message_id| {
1217            latest_message_id > observed_message_id.unwrap_or_default()
1218        })
1219    }
1220
1221    fn last_acknowledged_message_id(&self) -> Option<u64> {
1222        self.observed_chat_message
1223    }
1224
1225    fn acknowledge_message_id(&mut self, message_id: u64) {
1226        let observed = self.observed_chat_message.get_or_insert(message_id);
1227        *observed = (*observed).max(message_id);
1228    }
1229
1230    fn update_latest_message_id(&mut self, message_id: u64) {
1231        self.latest_chat_message =
1232            Some(message_id.max(self.latest_chat_message.unwrap_or_default()));
1233    }
1234
1235    fn acknowledge_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1236        if self.observed_notes_version.epoch == epoch {
1237            self.observed_notes_version.version.join(version);
1238        } else {
1239            self.observed_notes_version = NotesVersion {
1240                epoch,
1241                version: version.clone(),
1242            };
1243        }
1244    }
1245
1246    fn update_latest_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1247        if self.latest_notes_version.epoch == epoch {
1248            self.latest_notes_version.version.join(version);
1249        } else {
1250            self.latest_notes_version = NotesVersion {
1251                epoch,
1252                version: version.clone(),
1253            };
1254        }
1255    }
1256}