channel_store.rs

   1mod channel_index;
   2
   3use crate::{ChannelMessage, channel_buffer::ChannelBuffer, channel_chat::ChannelChat};
   4use anyhow::{Result, anyhow};
   5use channel_index::ChannelIndex;
   6use client::{ChannelId, Client, ClientSettings, Subscription, User, UserId, UserStore};
   7use collections::{HashMap, HashSet, hash_map};
   8use futures::{Future, FutureExt, StreamExt, channel::mpsc, future::Shared};
   9use gpui::{
  10    App, AppContext as _, AsyncApp, Context, Entity, EventEmitter, Global, SharedString, Task,
  11    WeakEntity,
  12};
  13use language::Capability;
  14use rpc::{
  15    TypedEnvelope,
  16    proto::{self, ChannelRole, ChannelVisibility},
  17};
  18use settings::Settings;
  19use std::{mem, sync::Arc, time::Duration};
  20use util::{ResultExt, maybe};
  21
  22pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
  23
  24pub fn init(client: &Arc<Client>, user_store: Entity<UserStore>, cx: &mut App) {
  25    let channel_store = cx.new(|cx| ChannelStore::new(client.clone(), user_store.clone(), cx));
  26    cx.set_global(GlobalChannelStore(channel_store));
  27}
  28
  29#[derive(Debug, Clone, Default, PartialEq)]
  30struct NotesVersion {
  31    epoch: u64,
  32    version: clock::Global,
  33}
  34
  35pub struct ChannelStore {
  36    pub channel_index: ChannelIndex,
  37    channel_invitations: Vec<Arc<Channel>>,
  38    channel_participants: HashMap<ChannelId, Vec<Arc<User>>>,
  39    channel_states: HashMap<ChannelId, ChannelState>,
  40    outgoing_invites: HashSet<(ChannelId, UserId)>,
  41    update_channels_tx: mpsc::UnboundedSender<proto::UpdateChannels>,
  42    opened_buffers: HashMap<ChannelId, OpenEntityHandle<ChannelBuffer>>,
  43    opened_chats: HashMap<ChannelId, OpenEntityHandle<ChannelChat>>,
  44    client: Arc<Client>,
  45    did_subscribe: bool,
  46    user_store: Entity<UserStore>,
  47    _rpc_subscriptions: [Subscription; 2],
  48    _watch_connection_status: Task<Option<()>>,
  49    disconnect_channel_buffers_task: Option<Task<()>>,
  50    _update_channels: Task<()>,
  51}
  52
  53#[derive(Clone, Debug)]
  54pub struct Channel {
  55    pub id: ChannelId,
  56    pub name: SharedString,
  57    pub visibility: proto::ChannelVisibility,
  58    pub parent_path: Vec<ChannelId>,
  59}
  60
  61#[derive(Default, Debug)]
  62pub struct ChannelState {
  63    latest_chat_message: Option<u64>,
  64    latest_notes_version: NotesVersion,
  65    observed_notes_version: NotesVersion,
  66    observed_chat_message: Option<u64>,
  67    role: Option<ChannelRole>,
  68}
  69
  70impl Channel {
  71    pub fn link(&self, cx: &App) -> String {
  72        format!(
  73            "{}/channel/{}-{}",
  74            ClientSettings::get_global(cx).server_url,
  75            Self::slug(&self.name),
  76            self.id
  77        )
  78    }
  79
  80    pub fn notes_link(&self, heading: Option<String>, cx: &App) -> String {
  81        self.link(cx)
  82            + "/notes"
  83            + &heading
  84                .map(|h| format!("#{}", Self::slug(&h)))
  85                .unwrap_or_default()
  86    }
  87
  88    pub fn is_root_channel(&self) -> bool {
  89        self.parent_path.is_empty()
  90    }
  91
  92    pub fn root_id(&self) -> ChannelId {
  93        self.parent_path.first().copied().unwrap_or(self.id)
  94    }
  95
  96    pub fn slug(str: &str) -> String {
  97        let slug: String = str
  98            .chars()
  99            .map(|c| if c.is_alphanumeric() { c } else { '-' })
 100            .collect();
 101
 102        slug.trim_matches(|c| c == '-').to_string()
 103    }
 104}
 105
 106#[derive(Debug)]
 107pub struct ChannelMembership {
 108    pub user: Arc<User>,
 109    pub kind: proto::channel_member::Kind,
 110    pub role: proto::ChannelRole,
 111}
 112impl ChannelMembership {
 113    pub fn sort_key(&self) -> MembershipSortKey {
 114        MembershipSortKey {
 115            role_order: match self.role {
 116                proto::ChannelRole::Admin => 0,
 117                proto::ChannelRole::Member => 1,
 118                proto::ChannelRole::Banned => 2,
 119                proto::ChannelRole::Talker => 3,
 120                proto::ChannelRole::Guest => 4,
 121            },
 122            kind_order: match self.kind {
 123                proto::channel_member::Kind::Member => 0,
 124                proto::channel_member::Kind::Invitee => 1,
 125            },
 126            username_order: self.user.github_login.as_str(),
 127        }
 128    }
 129}
 130
 131#[derive(PartialOrd, Ord, PartialEq, Eq)]
 132pub struct MembershipSortKey<'a> {
 133    role_order: u8,
 134    kind_order: u8,
 135    username_order: &'a str,
 136}
 137
 138pub enum ChannelEvent {
 139    ChannelCreated(ChannelId),
 140    ChannelRenamed(ChannelId),
 141}
 142
 143impl EventEmitter<ChannelEvent> for ChannelStore {}
 144
 145enum OpenEntityHandle<E> {
 146    Open(WeakEntity<E>),
 147    Loading(Shared<Task<Result<Entity<E>, Arc<anyhow::Error>>>>),
 148}
 149
 150struct GlobalChannelStore(Entity<ChannelStore>);
 151
 152impl Global for GlobalChannelStore {}
 153
 154impl ChannelStore {
 155    pub fn global(cx: &App) -> Entity<Self> {
 156        cx.global::<GlobalChannelStore>().0.clone()
 157    }
 158
 159    pub fn new(client: Arc<Client>, user_store: Entity<UserStore>, cx: &mut Context<Self>) -> Self {
 160        let rpc_subscriptions = [
 161            client.add_message_handler(cx.weak_entity(), Self::handle_update_channels),
 162            client.add_message_handler(cx.weak_entity(), Self::handle_update_user_channels),
 163        ];
 164
 165        let mut connection_status = client.status();
 166        let (update_channels_tx, mut update_channels_rx) = mpsc::unbounded();
 167        let watch_connection_status = cx.spawn(async move |this, cx| {
 168            while let Some(status) = connection_status.next().await {
 169                let this = this.upgrade()?;
 170                match status {
 171                    client::Status::Connected { .. } => {
 172                        this.update(cx, |this, cx| this.handle_connect(cx))
 173                            .ok()?
 174                            .await
 175                            .log_err()?;
 176                    }
 177                    client::Status::SignedOut | client::Status::UpgradeRequired => {
 178                        this.update(cx, |this, cx| this.handle_disconnect(false, cx))
 179                            .ok();
 180                    }
 181                    _ => {
 182                        this.update(cx, |this, cx| this.handle_disconnect(true, cx))
 183                            .ok();
 184                    }
 185                }
 186            }
 187            Some(())
 188        });
 189
 190        Self {
 191            channel_invitations: Vec::default(),
 192            channel_index: ChannelIndex::default(),
 193            channel_participants: Default::default(),
 194            outgoing_invites: Default::default(),
 195            opened_buffers: Default::default(),
 196            opened_chats: Default::default(),
 197            update_channels_tx,
 198            client,
 199            user_store,
 200            _rpc_subscriptions: rpc_subscriptions,
 201            _watch_connection_status: watch_connection_status,
 202            disconnect_channel_buffers_task: None,
 203            _update_channels: cx.spawn(async move |this, cx| {
 204                maybe!(async move {
 205                    while let Some(update_channels) = update_channels_rx.next().await {
 206                        if let Some(this) = this.upgrade() {
 207                            let update_task = this
 208                                .update(cx, |this, cx| this.update_channels(update_channels, cx))?;
 209                            if let Some(update_task) = update_task {
 210                                update_task.await.log_err();
 211                            }
 212                        }
 213                    }
 214                    anyhow::Ok(())
 215                })
 216                .await
 217                .log_err();
 218            }),
 219            channel_states: Default::default(),
 220            did_subscribe: false,
 221        }
 222    }
 223
 224    pub fn initialize(&mut self) {
 225        if !self.did_subscribe
 226            && self
 227                .client
 228                .send(proto::SubscribeToChannels {})
 229                .log_err()
 230                .is_some()
 231        {
 232            self.did_subscribe = true;
 233        }
 234    }
 235
 236    pub fn client(&self) -> Arc<Client> {
 237        self.client.clone()
 238    }
 239
 240    /// Returns the number of unique channels in the store
 241    pub fn channel_count(&self) -> usize {
 242        self.channel_index.by_id().len()
 243    }
 244
 245    /// Returns the index of a channel ID in the list of unique channels
 246    pub fn index_of_channel(&self, channel_id: ChannelId) -> Option<usize> {
 247        self.channel_index
 248            .by_id()
 249            .keys()
 250            .position(|id| *id == channel_id)
 251    }
 252
 253    /// Returns an iterator over all unique channels
 254    pub fn channels(&self) -> impl '_ + Iterator<Item = &Arc<Channel>> {
 255        self.channel_index.by_id().values()
 256    }
 257
 258    /// Iterate over all entries in the channel DAG
 259    pub fn ordered_channels(&self) -> impl '_ + Iterator<Item = (usize, &Arc<Channel>)> {
 260        self.channel_index
 261            .ordered_channels()
 262            .iter()
 263            .filter_map(move |id| {
 264                let channel = self.channel_index.by_id().get(id)?;
 265                Some((channel.parent_path.len(), channel))
 266            })
 267    }
 268
 269    pub fn channel_at_index(&self, ix: usize) -> Option<&Arc<Channel>> {
 270        let channel_id = self.channel_index.ordered_channels().get(ix)?;
 271        self.channel_index.by_id().get(channel_id)
 272    }
 273
 274    pub fn channel_at(&self, ix: usize) -> Option<&Arc<Channel>> {
 275        self.channel_index.by_id().values().nth(ix)
 276    }
 277
 278    pub fn has_channel_invitation(&self, channel_id: ChannelId) -> bool {
 279        self.channel_invitations
 280            .iter()
 281            .any(|channel| channel.id == channel_id)
 282    }
 283
 284    pub fn channel_invitations(&self) -> &[Arc<Channel>] {
 285        &self.channel_invitations
 286    }
 287
 288    pub fn channel_for_id(&self, channel_id: ChannelId) -> Option<&Arc<Channel>> {
 289        self.channel_index.by_id().get(&channel_id)
 290    }
 291
 292    pub fn has_open_channel_buffer(&self, channel_id: ChannelId, _cx: &App) -> bool {
 293        if let Some(buffer) = self.opened_buffers.get(&channel_id) {
 294            if let OpenEntityHandle::Open(buffer) = buffer {
 295                return buffer.upgrade().is_some();
 296            }
 297        }
 298        false
 299    }
 300
 301    pub fn open_channel_buffer(
 302        &mut self,
 303        channel_id: ChannelId,
 304        cx: &mut Context<Self>,
 305    ) -> Task<Result<Entity<ChannelBuffer>>> {
 306        let client = self.client.clone();
 307        let user_store = self.user_store.clone();
 308        let channel_store = cx.entity();
 309        self.open_channel_resource(
 310            channel_id,
 311            |this| &mut this.opened_buffers,
 312            async move |channel, cx| {
 313                ChannelBuffer::new(channel, client, user_store, channel_store, cx).await
 314            },
 315            cx,
 316        )
 317    }
 318
 319    pub fn fetch_channel_messages(
 320        &self,
 321        message_ids: Vec<u64>,
 322        cx: &mut Context<Self>,
 323    ) -> Task<Result<Vec<ChannelMessage>>> {
 324        let request = if message_ids.is_empty() {
 325            None
 326        } else {
 327            Some(
 328                self.client
 329                    .request(proto::GetChannelMessagesById { message_ids }),
 330            )
 331        };
 332        cx.spawn(async move |this, cx| {
 333            if let Some(request) = request {
 334                let response = request.await?;
 335                let this = this
 336                    .upgrade()
 337                    .ok_or_else(|| anyhow!("channel store dropped"))?;
 338                let user_store = this.update(cx, |this, _| this.user_store.clone())?;
 339                ChannelMessage::from_proto_vec(response.messages, &user_store, cx).await
 340            } else {
 341                Ok(Vec::new())
 342            }
 343        })
 344    }
 345
 346    pub fn has_channel_buffer_changed(&self, channel_id: ChannelId) -> bool {
 347        self.channel_states
 348            .get(&channel_id)
 349            .is_some_and(|state| state.has_channel_buffer_changed())
 350    }
 351
 352    pub fn has_new_messages(&self, channel_id: ChannelId) -> bool {
 353        self.channel_states
 354            .get(&channel_id)
 355            .is_some_and(|state| state.has_new_messages())
 356    }
 357
 358    pub fn set_acknowledged_message_id(&mut self, channel_id: ChannelId, message_id: Option<u64>) {
 359        if let Some(state) = self.channel_states.get_mut(&channel_id) {
 360            state.latest_chat_message = message_id;
 361        }
 362    }
 363
 364    pub fn last_acknowledge_message_id(&self, channel_id: ChannelId) -> Option<u64> {
 365        self.channel_states.get(&channel_id).and_then(|state| {
 366            if let Some(last_message_id) = state.latest_chat_message {
 367                if state
 368                    .last_acknowledged_message_id()
 369                    .is_some_and(|id| id < last_message_id)
 370                {
 371                    return state.last_acknowledged_message_id();
 372                }
 373            }
 374
 375            None
 376        })
 377    }
 378
 379    pub fn acknowledge_message_id(
 380        &mut self,
 381        channel_id: ChannelId,
 382        message_id: u64,
 383        cx: &mut Context<Self>,
 384    ) {
 385        self.channel_states
 386            .entry(channel_id)
 387            .or_default()
 388            .acknowledge_message_id(message_id);
 389        cx.notify();
 390    }
 391
 392    pub fn update_latest_message_id(
 393        &mut self,
 394        channel_id: ChannelId,
 395        message_id: u64,
 396        cx: &mut Context<Self>,
 397    ) {
 398        self.channel_states
 399            .entry(channel_id)
 400            .or_default()
 401            .update_latest_message_id(message_id);
 402        cx.notify();
 403    }
 404
 405    pub fn acknowledge_notes_version(
 406        &mut self,
 407        channel_id: ChannelId,
 408        epoch: u64,
 409        version: &clock::Global,
 410        cx: &mut Context<Self>,
 411    ) {
 412        self.channel_states
 413            .entry(channel_id)
 414            .or_default()
 415            .acknowledge_notes_version(epoch, version);
 416        cx.notify()
 417    }
 418
 419    pub fn update_latest_notes_version(
 420        &mut self,
 421        channel_id: ChannelId,
 422        epoch: u64,
 423        version: &clock::Global,
 424        cx: &mut Context<Self>,
 425    ) {
 426        self.channel_states
 427            .entry(channel_id)
 428            .or_default()
 429            .update_latest_notes_version(epoch, version);
 430        cx.notify()
 431    }
 432
 433    pub fn open_channel_chat(
 434        &mut self,
 435        channel_id: ChannelId,
 436        cx: &mut Context<Self>,
 437    ) -> Task<Result<Entity<ChannelChat>>> {
 438        let client = self.client.clone();
 439        let user_store = self.user_store.clone();
 440        let this = cx.entity();
 441        self.open_channel_resource(
 442            channel_id,
 443            |this| &mut this.opened_chats,
 444            async move |channel, cx| ChannelChat::new(channel, this, user_store, client, cx).await,
 445            cx,
 446        )
 447    }
 448
 449    /// Asynchronously open a given resource associated with a channel.
 450    ///
 451    /// Make sure that the resource is only opened once, even if this method
 452    /// is called multiple times with the same channel id while the first task
 453    /// is still running.
 454    fn open_channel_resource<T, F>(
 455        &mut self,
 456        channel_id: ChannelId,
 457        get_map: fn(&mut Self) -> &mut HashMap<ChannelId, OpenEntityHandle<T>>,
 458        load: F,
 459        cx: &mut Context<Self>,
 460    ) -> Task<Result<Entity<T>>>
 461    where
 462        F: AsyncFnOnce(Arc<Channel>, &mut AsyncApp) -> Result<Entity<T>> + 'static,
 463        T: 'static,
 464    {
 465        let task = loop {
 466            match get_map(self).entry(channel_id) {
 467                hash_map::Entry::Occupied(e) => match e.get() {
 468                    OpenEntityHandle::Open(entity) => {
 469                        if let Some(entity) = entity.upgrade() {
 470                            break Task::ready(Ok(entity)).shared();
 471                        } else {
 472                            get_map(self).remove(&channel_id);
 473                            continue;
 474                        }
 475                    }
 476                    OpenEntityHandle::Loading(task) => {
 477                        break task.clone();
 478                    }
 479                },
 480                hash_map::Entry::Vacant(e) => {
 481                    let task = cx
 482                        .spawn(async move |this, cx| {
 483                            let channel = this.update(cx, |this, _| {
 484                                this.channel_for_id(channel_id).cloned().ok_or_else(|| {
 485                                    Arc::new(anyhow!("no channel for id: {}", channel_id))
 486                                })
 487                            })??;
 488
 489                            load(channel, cx).await.map_err(Arc::new)
 490                        })
 491                        .shared();
 492
 493                    e.insert(OpenEntityHandle::Loading(task.clone()));
 494                    cx.spawn({
 495                        let task = task.clone();
 496                        async move |this, cx| {
 497                            let result = task.await;
 498                            this.update(cx, |this, _| match result {
 499                                Ok(model) => {
 500                                    get_map(this).insert(
 501                                        channel_id,
 502                                        OpenEntityHandle::Open(model.downgrade()),
 503                                    );
 504                                }
 505                                Err(_) => {
 506                                    get_map(this).remove(&channel_id);
 507                                }
 508                            })
 509                            .ok();
 510                        }
 511                    })
 512                    .detach();
 513                    break task;
 514                }
 515            }
 516        };
 517        cx.background_spawn(async move { task.await.map_err(|error| anyhow!("{}", error)) })
 518    }
 519
 520    pub fn is_channel_admin(&self, channel_id: ChannelId) -> bool {
 521        self.channel_role(channel_id) == proto::ChannelRole::Admin
 522    }
 523
 524    pub fn is_root_channel(&self, channel_id: ChannelId) -> bool {
 525        self.channel_index
 526            .by_id()
 527            .get(&channel_id)
 528            .map_or(false, |channel| channel.is_root_channel())
 529    }
 530
 531    pub fn is_public_channel(&self, channel_id: ChannelId) -> bool {
 532        self.channel_index
 533            .by_id()
 534            .get(&channel_id)
 535            .map_or(false, |channel| {
 536                channel.visibility == ChannelVisibility::Public
 537            })
 538    }
 539
 540    pub fn channel_capability(&self, channel_id: ChannelId) -> Capability {
 541        match self.channel_role(channel_id) {
 542            ChannelRole::Admin | ChannelRole::Member => Capability::ReadWrite,
 543            _ => Capability::ReadOnly,
 544        }
 545    }
 546
 547    pub fn channel_role(&self, channel_id: ChannelId) -> proto::ChannelRole {
 548        maybe!({
 549            let mut channel = self.channel_for_id(channel_id)?;
 550            if !channel.is_root_channel() {
 551                channel = self.channel_for_id(channel.root_id())?;
 552            }
 553            let root_channel_state = self.channel_states.get(&channel.id);
 554            root_channel_state?.role
 555        })
 556        .unwrap_or(proto::ChannelRole::Guest)
 557    }
 558
 559    pub fn channel_participants(&self, channel_id: ChannelId) -> &[Arc<User>] {
 560        self.channel_participants
 561            .get(&channel_id)
 562            .map_or(&[], |v| v.as_slice())
 563    }
 564
 565    pub fn create_channel(
 566        &self,
 567        name: &str,
 568        parent_id: Option<ChannelId>,
 569        cx: &mut Context<Self>,
 570    ) -> Task<Result<ChannelId>> {
 571        let client = self.client.clone();
 572        let name = name.trim_start_matches('#').to_owned();
 573        cx.spawn(async move |this, cx| {
 574            let response = client
 575                .request(proto::CreateChannel {
 576                    name,
 577                    parent_id: parent_id.map(|cid| cid.0),
 578                })
 579                .await?;
 580
 581            let channel = response
 582                .channel
 583                .ok_or_else(|| anyhow!("missing channel in response"))?;
 584            let channel_id = ChannelId(channel.id);
 585
 586            this.update(cx, |this, cx| {
 587                let task = this.update_channels(
 588                    proto::UpdateChannels {
 589                        channels: vec![channel],
 590                        ..Default::default()
 591                    },
 592                    cx,
 593                );
 594                assert!(task.is_none());
 595
 596                // This event is emitted because the collab panel wants to clear the pending edit state
 597                // before this frame is rendered. But we can't guarantee that the collab panel's future
 598                // will resolve before this flush_effects finishes. Synchronously emitting this event
 599                // ensures that the collab panel will observe this creation before the frame completes
 600                cx.emit(ChannelEvent::ChannelCreated(channel_id));
 601            })?;
 602
 603            Ok(channel_id)
 604        })
 605    }
 606
 607    pub fn move_channel(
 608        &mut self,
 609        channel_id: ChannelId,
 610        to: ChannelId,
 611        cx: &mut Context<Self>,
 612    ) -> Task<Result<()>> {
 613        let client = self.client.clone();
 614        cx.spawn(async move |_, _| {
 615            let _ = client
 616                .request(proto::MoveChannel {
 617                    channel_id: channel_id.0,
 618                    to: to.0,
 619                })
 620                .await?;
 621
 622            Ok(())
 623        })
 624    }
 625
 626    pub fn set_channel_visibility(
 627        &mut self,
 628        channel_id: ChannelId,
 629        visibility: ChannelVisibility,
 630        cx: &mut Context<Self>,
 631    ) -> Task<Result<()>> {
 632        let client = self.client.clone();
 633        cx.spawn(async move |_, _| {
 634            let _ = client
 635                .request(proto::SetChannelVisibility {
 636                    channel_id: channel_id.0,
 637                    visibility: visibility.into(),
 638                })
 639                .await?;
 640
 641            Ok(())
 642        })
 643    }
 644
 645    pub fn invite_member(
 646        &mut self,
 647        channel_id: ChannelId,
 648        user_id: UserId,
 649        role: proto::ChannelRole,
 650        cx: &mut Context<Self>,
 651    ) -> Task<Result<()>> {
 652        if !self.outgoing_invites.insert((channel_id, user_id)) {
 653            return Task::ready(Err(anyhow!("invite request already in progress")));
 654        }
 655
 656        cx.notify();
 657        let client = self.client.clone();
 658        cx.spawn(async move |this, cx| {
 659            let result = client
 660                .request(proto::InviteChannelMember {
 661                    channel_id: channel_id.0,
 662                    user_id,
 663                    role: role.into(),
 664                })
 665                .await;
 666
 667            this.update(cx, |this, cx| {
 668                this.outgoing_invites.remove(&(channel_id, user_id));
 669                cx.notify();
 670            })?;
 671
 672            result?;
 673
 674            Ok(())
 675        })
 676    }
 677
 678    pub fn remove_member(
 679        &mut self,
 680        channel_id: ChannelId,
 681        user_id: u64,
 682        cx: &mut Context<Self>,
 683    ) -> Task<Result<()>> {
 684        if !self.outgoing_invites.insert((channel_id, user_id)) {
 685            return Task::ready(Err(anyhow!("invite request already in progress")));
 686        }
 687
 688        cx.notify();
 689        let client = self.client.clone();
 690        cx.spawn(async move |this, cx| {
 691            let result = client
 692                .request(proto::RemoveChannelMember {
 693                    channel_id: channel_id.0,
 694                    user_id,
 695                })
 696                .await;
 697
 698            this.update(cx, |this, cx| {
 699                this.outgoing_invites.remove(&(channel_id, user_id));
 700                cx.notify();
 701            })?;
 702            result?;
 703            Ok(())
 704        })
 705    }
 706
 707    pub fn set_member_role(
 708        &mut self,
 709        channel_id: ChannelId,
 710        user_id: UserId,
 711        role: proto::ChannelRole,
 712        cx: &mut Context<Self>,
 713    ) -> Task<Result<()>> {
 714        if !self.outgoing_invites.insert((channel_id, user_id)) {
 715            return Task::ready(Err(anyhow!("member request already in progress")));
 716        }
 717
 718        cx.notify();
 719        let client = self.client.clone();
 720        cx.spawn(async move |this, cx| {
 721            let result = client
 722                .request(proto::SetChannelMemberRole {
 723                    channel_id: channel_id.0,
 724                    user_id,
 725                    role: role.into(),
 726                })
 727                .await;
 728
 729            this.update(cx, |this, cx| {
 730                this.outgoing_invites.remove(&(channel_id, user_id));
 731                cx.notify();
 732            })?;
 733
 734            result?;
 735            Ok(())
 736        })
 737    }
 738
 739    pub fn rename(
 740        &mut self,
 741        channel_id: ChannelId,
 742        new_name: &str,
 743        cx: &mut Context<Self>,
 744    ) -> Task<Result<()>> {
 745        let client = self.client.clone();
 746        let name = new_name.to_string();
 747        cx.spawn(async move |this, cx| {
 748            let channel = client
 749                .request(proto::RenameChannel {
 750                    channel_id: channel_id.0,
 751                    name,
 752                })
 753                .await?
 754                .channel
 755                .ok_or_else(|| anyhow!("missing channel in response"))?;
 756            this.update(cx, |this, cx| {
 757                let task = this.update_channels(
 758                    proto::UpdateChannels {
 759                        channels: vec![channel],
 760                        ..Default::default()
 761                    },
 762                    cx,
 763                );
 764                assert!(task.is_none());
 765
 766                // This event is emitted because the collab panel wants to clear the pending edit state
 767                // before this frame is rendered. But we can't guarantee that the collab panel's future
 768                // will resolve before this flush_effects finishes. Synchronously emitting this event
 769                // ensures that the collab panel will observe this creation before the frame complete
 770                cx.emit(ChannelEvent::ChannelRenamed(channel_id))
 771            })?;
 772            Ok(())
 773        })
 774    }
 775
 776    pub fn respond_to_channel_invite(
 777        &mut self,
 778        channel_id: ChannelId,
 779        accept: bool,
 780        cx: &mut Context<Self>,
 781    ) -> Task<Result<()>> {
 782        let client = self.client.clone();
 783        cx.background_spawn(async move {
 784            client
 785                .request(proto::RespondToChannelInvite {
 786                    channel_id: channel_id.0,
 787                    accept,
 788                })
 789                .await?;
 790            Ok(())
 791        })
 792    }
 793    pub fn fuzzy_search_members(
 794        &self,
 795        channel_id: ChannelId,
 796        query: String,
 797        limit: u16,
 798        cx: &mut Context<Self>,
 799    ) -> Task<Result<Vec<ChannelMembership>>> {
 800        let client = self.client.clone();
 801        let user_store = self.user_store.downgrade();
 802        cx.spawn(async move |_, cx| {
 803            let response = client
 804                .request(proto::GetChannelMembers {
 805                    channel_id: channel_id.0,
 806                    query,
 807                    limit: limit as u64,
 808                })
 809                .await?;
 810            user_store.update(cx, |user_store, _| {
 811                user_store.insert(response.users);
 812                response
 813                    .members
 814                    .into_iter()
 815                    .filter_map(|member| {
 816                        Some(ChannelMembership {
 817                            user: user_store.get_cached_user(member.user_id)?,
 818                            role: member.role(),
 819                            kind: member.kind(),
 820                        })
 821                    })
 822                    .collect()
 823            })
 824        })
 825    }
 826
 827    pub fn remove_channel(
 828        &self,
 829        channel_id: ChannelId,
 830    ) -> impl Future<Output = Result<()>> + use<> {
 831        let client = self.client.clone();
 832        async move {
 833            client
 834                .request(proto::DeleteChannel {
 835                    channel_id: channel_id.0,
 836                })
 837                .await?;
 838            Ok(())
 839        }
 840    }
 841
 842    pub fn has_pending_channel_invite_response(&self, _: &Arc<Channel>) -> bool {
 843        false
 844    }
 845
 846    pub fn has_pending_channel_invite(&self, channel_id: ChannelId, user_id: UserId) -> bool {
 847        self.outgoing_invites.contains(&(channel_id, user_id))
 848    }
 849
 850    async fn handle_update_channels(
 851        this: Entity<Self>,
 852        message: TypedEnvelope<proto::UpdateChannels>,
 853        mut cx: AsyncApp,
 854    ) -> Result<()> {
 855        this.update(&mut cx, |this, _| {
 856            this.update_channels_tx
 857                .unbounded_send(message.payload)
 858                .unwrap();
 859        })?;
 860        Ok(())
 861    }
 862
 863    async fn handle_update_user_channels(
 864        this: Entity<Self>,
 865        message: TypedEnvelope<proto::UpdateUserChannels>,
 866        mut cx: AsyncApp,
 867    ) -> Result<()> {
 868        this.update(&mut cx, |this, cx| {
 869            for buffer_version in message.payload.observed_channel_buffer_version {
 870                let version = language::proto::deserialize_version(&buffer_version.version);
 871                this.acknowledge_notes_version(
 872                    ChannelId(buffer_version.channel_id),
 873                    buffer_version.epoch,
 874                    &version,
 875                    cx,
 876                );
 877            }
 878            for message_id in message.payload.observed_channel_message_id {
 879                this.acknowledge_message_id(
 880                    ChannelId(message_id.channel_id),
 881                    message_id.message_id,
 882                    cx,
 883                );
 884            }
 885            for membership in message.payload.channel_memberships {
 886                if let Some(role) = ChannelRole::from_i32(membership.role) {
 887                    this.channel_states
 888                        .entry(ChannelId(membership.channel_id))
 889                        .or_default()
 890                        .set_role(role)
 891                }
 892            }
 893        })
 894    }
 895
 896    fn handle_connect(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
 897        self.channel_index.clear();
 898        self.channel_invitations.clear();
 899        self.channel_participants.clear();
 900        self.channel_index.clear();
 901        self.outgoing_invites.clear();
 902        self.disconnect_channel_buffers_task.take();
 903
 904        for chat in self.opened_chats.values() {
 905            if let OpenEntityHandle::Open(chat) = chat {
 906                if let Some(chat) = chat.upgrade() {
 907                    chat.update(cx, |chat, cx| {
 908                        chat.rejoin(cx);
 909                    });
 910                }
 911            }
 912        }
 913
 914        let mut buffer_versions = Vec::new();
 915        for buffer in self.opened_buffers.values() {
 916            if let OpenEntityHandle::Open(buffer) = buffer {
 917                if let Some(buffer) = buffer.upgrade() {
 918                    let channel_buffer = buffer.read(cx);
 919                    let buffer = channel_buffer.buffer().read(cx);
 920                    buffer_versions.push(proto::ChannelBufferVersion {
 921                        channel_id: channel_buffer.channel_id.0,
 922                        epoch: channel_buffer.epoch(),
 923                        version: language::proto::serialize_version(&buffer.version()),
 924                    });
 925                }
 926            }
 927        }
 928
 929        if buffer_versions.is_empty() {
 930            return Task::ready(Ok(()));
 931        }
 932
 933        let response = self.client.request(proto::RejoinChannelBuffers {
 934            buffers: buffer_versions,
 935        });
 936
 937        cx.spawn(async move |this, cx| {
 938            let mut response = response.await?;
 939
 940            this.update(cx, |this, cx| {
 941                this.opened_buffers.retain(|_, buffer| match buffer {
 942                    OpenEntityHandle::Open(channel_buffer) => {
 943                        let Some(channel_buffer) = channel_buffer.upgrade() else {
 944                            return false;
 945                        };
 946
 947                        channel_buffer.update(cx, |channel_buffer, cx| {
 948                            let channel_id = channel_buffer.channel_id;
 949                            if let Some(remote_buffer) = response
 950                                .buffers
 951                                .iter_mut()
 952                                .find(|buffer| buffer.channel_id == channel_id.0)
 953                            {
 954                                let channel_id = channel_buffer.channel_id;
 955                                let remote_version =
 956                                    language::proto::deserialize_version(&remote_buffer.version);
 957
 958                                channel_buffer.replace_collaborators(
 959                                    mem::take(&mut remote_buffer.collaborators),
 960                                    cx,
 961                                );
 962
 963                                let operations = channel_buffer
 964                                    .buffer()
 965                                    .update(cx, |buffer, cx| {
 966                                        let outgoing_operations =
 967                                            buffer.serialize_ops(Some(remote_version), cx);
 968                                        let incoming_operations =
 969                                            mem::take(&mut remote_buffer.operations)
 970                                                .into_iter()
 971                                                .map(language::proto::deserialize_operation)
 972                                                .collect::<Result<Vec<_>>>()?;
 973                                        buffer.apply_ops(incoming_operations, cx);
 974                                        anyhow::Ok(outgoing_operations)
 975                                    })
 976                                    .log_err();
 977
 978                                if let Some(operations) = operations {
 979                                    let client = this.client.clone();
 980                                    cx.background_spawn(async move {
 981                                        let operations = operations.await;
 982                                        for chunk in language::proto::split_operations(operations) {
 983                                            client
 984                                                .send(proto::UpdateChannelBuffer {
 985                                                    channel_id: channel_id.0,
 986                                                    operations: chunk,
 987                                                })
 988                                                .ok();
 989                                        }
 990                                    })
 991                                    .detach();
 992                                    return true;
 993                                }
 994                            }
 995
 996                            channel_buffer.disconnect(cx);
 997                            false
 998                        })
 999                    }
1000                    OpenEntityHandle::Loading(_) => true,
1001                });
1002            })
1003            .ok();
1004            anyhow::Ok(())
1005        })
1006    }
1007
1008    fn handle_disconnect(&mut self, wait_for_reconnect: bool, cx: &mut Context<Self>) {
1009        cx.notify();
1010        self.did_subscribe = false;
1011        self.disconnect_channel_buffers_task.get_or_insert_with(|| {
1012            cx.spawn(async move |this, cx| {
1013                if wait_for_reconnect {
1014                    cx.background_executor().timer(RECONNECT_TIMEOUT).await;
1015                }
1016
1017                if let Some(this) = this.upgrade() {
1018                    this.update(cx, |this, cx| {
1019                        for (_, buffer) in this.opened_buffers.drain() {
1020                            if let OpenEntityHandle::Open(buffer) = buffer {
1021                                if let Some(buffer) = buffer.upgrade() {
1022                                    buffer.update(cx, |buffer, cx| buffer.disconnect(cx));
1023                                }
1024                            }
1025                        }
1026                    })
1027                    .ok();
1028                }
1029            })
1030        });
1031    }
1032
1033    pub(crate) fn update_channels(
1034        &mut self,
1035        payload: proto::UpdateChannels,
1036        cx: &mut Context<ChannelStore>,
1037    ) -> Option<Task<Result<()>>> {
1038        if !payload.remove_channel_invitations.is_empty() {
1039            self.channel_invitations
1040                .retain(|channel| !payload.remove_channel_invitations.contains(&channel.id.0));
1041        }
1042        for channel in payload.channel_invitations {
1043            match self
1044                .channel_invitations
1045                .binary_search_by_key(&channel.id, |c| c.id.0)
1046            {
1047                Ok(ix) => {
1048                    Arc::make_mut(&mut self.channel_invitations[ix]).name = channel.name.into()
1049                }
1050                Err(ix) => self.channel_invitations.insert(
1051                    ix,
1052                    Arc::new(Channel {
1053                        id: ChannelId(channel.id),
1054                        visibility: channel.visibility(),
1055                        name: channel.name.into(),
1056                        parent_path: channel.parent_path.into_iter().map(ChannelId).collect(),
1057                    }),
1058                ),
1059            }
1060        }
1061
1062        let channels_changed = !payload.channels.is_empty()
1063            || !payload.delete_channels.is_empty()
1064            || !payload.latest_channel_message_ids.is_empty()
1065            || !payload.latest_channel_buffer_versions.is_empty();
1066
1067        if channels_changed {
1068            if !payload.delete_channels.is_empty() {
1069                let delete_channels: Vec<ChannelId> =
1070                    payload.delete_channels.into_iter().map(ChannelId).collect();
1071                self.channel_index.delete_channels(&delete_channels);
1072                self.channel_participants
1073                    .retain(|channel_id, _| !delete_channels.contains(channel_id));
1074
1075                for channel_id in &delete_channels {
1076                    let channel_id = *channel_id;
1077                    if payload
1078                        .channels
1079                        .iter()
1080                        .any(|channel| channel.id == channel_id.0)
1081                    {
1082                        continue;
1083                    }
1084                    if let Some(OpenEntityHandle::Open(buffer)) =
1085                        self.opened_buffers.remove(&channel_id)
1086                    {
1087                        if let Some(buffer) = buffer.upgrade() {
1088                            buffer.update(cx, ChannelBuffer::disconnect);
1089                        }
1090                    }
1091                }
1092            }
1093
1094            let mut index = self.channel_index.bulk_insert();
1095            for channel in payload.channels {
1096                let id = ChannelId(channel.id);
1097                let channel_changed = index.insert(channel);
1098
1099                if channel_changed {
1100                    if let Some(OpenEntityHandle::Open(buffer)) = self.opened_buffers.get(&id) {
1101                        if let Some(buffer) = buffer.upgrade() {
1102                            buffer.update(cx, ChannelBuffer::channel_changed);
1103                        }
1104                    }
1105                }
1106            }
1107
1108            for latest_buffer_version in payload.latest_channel_buffer_versions {
1109                let version = language::proto::deserialize_version(&latest_buffer_version.version);
1110                self.channel_states
1111                    .entry(ChannelId(latest_buffer_version.channel_id))
1112                    .or_default()
1113                    .update_latest_notes_version(latest_buffer_version.epoch, &version)
1114            }
1115
1116            for latest_channel_message in payload.latest_channel_message_ids {
1117                self.channel_states
1118                    .entry(ChannelId(latest_channel_message.channel_id))
1119                    .or_default()
1120                    .update_latest_message_id(latest_channel_message.message_id);
1121            }
1122        }
1123
1124        cx.notify();
1125        if payload.channel_participants.is_empty() {
1126            return None;
1127        }
1128
1129        let mut all_user_ids = Vec::new();
1130        let channel_participants = payload.channel_participants;
1131        for entry in &channel_participants {
1132            for user_id in entry.participant_user_ids.iter() {
1133                if let Err(ix) = all_user_ids.binary_search(user_id) {
1134                    all_user_ids.insert(ix, *user_id);
1135                }
1136            }
1137        }
1138
1139        let users = self
1140            .user_store
1141            .update(cx, |user_store, cx| user_store.get_users(all_user_ids, cx));
1142        Some(cx.spawn(async move |this, cx| {
1143            let users = users.await?;
1144
1145            this.update(cx, |this, cx| {
1146                for entry in &channel_participants {
1147                    let mut participants: Vec<_> = entry
1148                        .participant_user_ids
1149                        .iter()
1150                        .filter_map(|user_id| {
1151                            users
1152                                .binary_search_by_key(&user_id, |user| &user.id)
1153                                .ok()
1154                                .map(|ix| users[ix].clone())
1155                        })
1156                        .collect();
1157
1158                    participants.sort_by_key(|u| u.id);
1159
1160                    this.channel_participants
1161                        .insert(ChannelId(entry.channel_id), participants);
1162                }
1163
1164                cx.notify();
1165            })
1166        }))
1167    }
1168}
1169
1170impl ChannelState {
1171    fn set_role(&mut self, role: ChannelRole) {
1172        self.role = Some(role);
1173    }
1174
1175    fn has_channel_buffer_changed(&self) -> bool {
1176        self.latest_notes_version.epoch > self.observed_notes_version.epoch
1177            || (self.latest_notes_version.epoch == self.observed_notes_version.epoch
1178                && self
1179                    .latest_notes_version
1180                    .version
1181                    .changed_since(&self.observed_notes_version.version))
1182    }
1183
1184    fn has_new_messages(&self) -> bool {
1185        let latest_message_id = self.latest_chat_message;
1186        let observed_message_id = self.observed_chat_message;
1187
1188        latest_message_id.is_some_and(|latest_message_id| {
1189            latest_message_id > observed_message_id.unwrap_or_default()
1190        })
1191    }
1192
1193    fn last_acknowledged_message_id(&self) -> Option<u64> {
1194        self.observed_chat_message
1195    }
1196
1197    fn acknowledge_message_id(&mut self, message_id: u64) {
1198        let observed = self.observed_chat_message.get_or_insert(message_id);
1199        *observed = (*observed).max(message_id);
1200    }
1201
1202    fn update_latest_message_id(&mut self, message_id: u64) {
1203        self.latest_chat_message =
1204            Some(message_id.max(self.latest_chat_message.unwrap_or_default()));
1205    }
1206
1207    fn acknowledge_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1208        if self.observed_notes_version.epoch == epoch {
1209            self.observed_notes_version.version.join(version);
1210        } else {
1211            self.observed_notes_version = NotesVersion {
1212                epoch,
1213                version: version.clone(),
1214            };
1215        }
1216    }
1217
1218    fn update_latest_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1219        if self.latest_notes_version.epoch == epoch {
1220            self.latest_notes_version.version.join(version);
1221        } else {
1222            self.latest_notes_version = NotesVersion {
1223                epoch,
1224                version: version.clone(),
1225            };
1226        }
1227    }
1228}