channel_store.rs

   1mod channel_index;
   2
   3use crate::channel_buffer::ChannelBuffer;
   4use anyhow::{Context as _, Result, anyhow};
   5use channel_index::ChannelIndex;
   6use client::{ChannelId, Client, ClientSettings, Subscription, User, UserId, UserStore};
   7use collections::{HashMap, HashSet};
   8use futures::{Future, FutureExt, StreamExt, channel::mpsc, future::Shared};
   9use gpui::{
  10    App, AppContext as _, AsyncApp, Context, Entity, EventEmitter, Global, SharedString, Task,
  11    WeakEntity,
  12};
  13use language::Capability;
  14use postage::{sink::Sink, watch};
  15use rpc::{
  16    TypedEnvelope,
  17    proto::{self, ChannelRole, ChannelVisibility},
  18};
  19use settings::Settings;
  20use std::{mem, sync::Arc, time::Duration};
  21use util::{ResultExt, maybe};
  22
/// A 30-second timeout related to reconnecting.
// NOTE(review): the code that consumes this constant is outside the visible
// part of the file — confirm exactly which operation it bounds.
pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
  24
  25pub fn init(client: &Arc<Client>, user_store: Entity<UserStore>, cx: &mut App) {
  26    let channel_store = cx.new(|cx| ChannelStore::new(client.clone(), user_store.clone(), cx));
  27    cx.set_global(GlobalChannelStore(channel_store));
  28}
  29
/// Identifies one version of a channel's notes as an `(epoch, version)` pair.
///
/// [`ChannelState`] stores two of these per channel: the latest known version
/// and the version that has been observed.
#[derive(Debug, Clone, Default, PartialEq)]
struct NotesVersion {
    // NOTE(review): the comparison rules between epochs are implemented outside
    // the visible part of the file — confirm there before relying on ordering.
    epoch: u64,
    /// Version clock of the notes buffer within `epoch`.
    version: clock::Global,
}
  35
/// Client-side state for channels: the channel index, invitations,
/// participants, per-channel state and any open channel buffers.
pub struct ChannelStore {
    /// Index of known channels; queried through `by_id()` and `ordered_channels()`.
    pub channel_index: ChannelIndex,
    /// Channels exposed through [`Self::channel_invitations`].
    channel_invitations: Vec<Arc<Channel>>,
    /// Users exposed per channel through [`Self::channel_participants`].
    channel_participants: HashMap<ChannelId, Vec<Arc<User>>>,
    /// Per-channel notes versions and role.
    channel_states: HashMap<ChannelId, ChannelState>,
    /// `(channel, user)` pairs that currently have a membership request in flight
    /// (invite, remove, or role change).
    outgoing_invites: HashSet<(ChannelId, UserId)>,
    /// Queue into which incoming `UpdateChannels` messages are pushed; drained by
    /// the `_update_channels` task.
    update_channels_tx: mpsc::UnboundedSender<proto::UpdateChannels>,
    /// Channel buffers that are either open or still being loaded.
    opened_buffers: HashMap<ChannelId, OpenEntityHandle<ChannelBuffer>>,
    client: Arc<Client>,
    /// Set once `SubscribeToChannels` has been sent successfully.
    did_subscribe: bool,
    /// Watch channel whose value [`Self::wait_for_channels`] waits to become `true`.
    channels_loaded: (watch::Sender<bool>, watch::Receiver<bool>),
    user_store: Entity<UserStore>,
    /// Keeps the two RPC message-handler registrations made in [`Self::new`].
    _rpc_subscriptions: [Subscription; 2],
    /// Task that reacts to client connection-status changes.
    _watch_connection_status: Task<Option<()>>,
    // NOTE(review): only the `take()` in `handle_connect` is visible here; the
    // place where this task gets assigned is outside this view.
    disconnect_channel_buffers_task: Option<Task<()>>,
    /// Task that applies queued `UpdateChannels` messages one at a time.
    _update_channels: Task<()>,
}
  53
/// A single channel as known to the client.
#[derive(Clone, Debug)]
pub struct Channel {
    pub id: ChannelId,
    pub name: SharedString,
    pub visibility: proto::ChannelVisibility,
    /// Ancestor channel ids. Empty for a root channel; otherwise the first
    /// element is treated as the root channel's id (see `root_id`).
    pub parent_path: Vec<ChannelId>,
    // NOTE(review): presumably the position among sibling channels — the code
    // that interprets this value is not visible here; confirm.
    pub channel_order: i32,
}
  62
/// Per-channel state tracked by [`ChannelStore`]: notes versions and role.
#[derive(Default, Debug)]
pub struct ChannelState {
    /// Most recent notes version known for the channel.
    latest_notes_version: NotesVersion,
    /// Notes version that has been acknowledged as observed.
    observed_notes_version: NotesVersion,
    /// Role for this channel, if one has been received.
    /// `ChannelStore::channel_role` falls back to `Guest` when this is `None`.
    role: Option<ChannelRole>,
}
  69
  70impl Channel {
  71    pub fn link(&self, cx: &App) -> String {
  72        format!(
  73            "{}/channel/{}-{}",
  74            ClientSettings::get_global(cx).server_url,
  75            Self::slug(&self.name),
  76            self.id
  77        )
  78    }
  79
  80    pub fn notes_link(&self, heading: Option<String>, cx: &App) -> String {
  81        self.link(cx)
  82            + "/notes"
  83            + &heading
  84                .map(|h| format!("#{}", Self::slug(&h)))
  85                .unwrap_or_default()
  86    }
  87
  88    pub fn is_root_channel(&self) -> bool {
  89        self.parent_path.is_empty()
  90    }
  91
  92    pub fn root_id(&self) -> ChannelId {
  93        self.parent_path.first().copied().unwrap_or(self.id)
  94    }
  95
  96    pub fn slug(str: &str) -> String {
  97        let slug: String = str
  98            .chars()
  99            .map(|c| if c.is_alphanumeric() { c } else { '-' })
 100            .collect();
 101
 102        slug.trim_matches(|c| c == '-').to_string()
 103    }
 104}
 105
/// A user's membership in a channel, as produced by
/// `ChannelStore::fuzzy_search_members`.
#[derive(Debug)]
pub struct ChannelMembership {
    pub user: Arc<User>,
    /// Whether the user is a member or an invitee.
    pub kind: proto::channel_member::Kind,
    pub role: proto::ChannelRole,
}
 112impl ChannelMembership {
 113    pub fn sort_key(&self) -> MembershipSortKey<'_> {
 114        MembershipSortKey {
 115            role_order: match self.role {
 116                proto::ChannelRole::Admin => 0,
 117                proto::ChannelRole::Member => 1,
 118                proto::ChannelRole::Banned => 2,
 119                proto::ChannelRole::Talker => 3,
 120                proto::ChannelRole::Guest => 4,
 121            },
 122            kind_order: match self.kind {
 123                proto::channel_member::Kind::Member => 0,
 124                proto::channel_member::Kind::Invitee => 1,
 125            },
 126            username_order: &self.user.github_login,
 127        }
 128    }
 129}
 130
 131#[derive(PartialOrd, Ord, PartialEq, Eq)]
 132pub struct MembershipSortKey<'a> {
 133    role_order: u8,
 134    kind_order: u8,
 135    username_order: &'a str,
 136}
 137
/// Events emitted by [`ChannelStore`].
pub enum ChannelEvent {
    /// Emitted by `ChannelStore::create_channel` once the new channel has been
    /// applied to the store.
    ChannelCreated(ChannelId),
    /// Emitted by `ChannelStore::rename` once the renamed channel has been
    /// applied to the store.
    ChannelRenamed(ChannelId),
}

impl EventEmitter<ChannelEvent> for ChannelStore {}
 144
/// State of a per-channel resource tracked by [`ChannelStore`].
enum OpenEntityHandle<E> {
    /// The resource finished loading; only a weak handle is kept, so the entry
    /// may refer to an entity that has since been dropped.
    Open(WeakEntity<E>),
    /// The resource is still loading; the shared task lets several callers
    /// await the same load.
    Loading(Shared<Task<Result<Entity<E>, Arc<anyhow::Error>>>>),
}
 149
/// Wrapper that lets the [`ChannelStore`] entity be stored as a gpui global
/// (set in `init`, read by `ChannelStore::global`).
struct GlobalChannelStore(Entity<ChannelStore>);

impl Global for GlobalChannelStore {}
 153
 154impl ChannelStore {
 155    pub fn global(cx: &App) -> Entity<Self> {
 156        cx.global::<GlobalChannelStore>().0.clone()
 157    }
 158
    /// Creates a channel store bound to `client` and `user_store`.
    ///
    /// Registers the two RPC message handlers and spawns two background tasks
    /// owned by the store: one that follows the client's connection status and
    /// one that applies queued `UpdateChannels` messages.
    pub fn new(client: Arc<Client>, user_store: Entity<UserStore>, cx: &mut Context<Self>) -> Self {
        // The returned subscriptions are stored on the store (in
        // `_rpc_subscriptions`).
        let rpc_subscriptions = [
            client.add_message_handler(cx.weak_entity(), Self::handle_update_channels),
            client.add_message_handler(cx.weak_entity(), Self::handle_update_user_channels),
        ];

        let mut connection_status = client.status();
        let (update_channels_tx, mut update_channels_rx) = mpsc::unbounded();
        let watch_connection_status = cx.spawn(async move |this, cx| {
            while let Some(status) = connection_status.next().await {
                // Stop watching once the store itself has been dropped.
                let this = this.upgrade()?;
                match status {
                    client::Status::Connected { .. } => {
                        // A failed `handle_connect` is logged and then ends this
                        // task (`?` on the `None` produced by `log_err`).
                        this.update(cx, |this, cx| this.handle_connect(cx))
                            .await
                            .log_err()?;
                    }
                    // NOTE(review): `handle_disconnect` is not visible here; the
                    // boolean differs between these two arms — confirm its meaning.
                    client::Status::SignedOut | client::Status::UpgradeRequired => {
                        this.update(cx, |this, cx| this.handle_disconnect(false, cx));
                    }
                    _ => {
                        this.update(cx, |this, cx| this.handle_disconnect(true, cx));
                    }
                }
            }
            Some(())
        });

        Self {
            channel_invitations: Vec::default(),
            channel_index: ChannelIndex::default(),
            channel_participants: Default::default(),
            outgoing_invites: Default::default(),
            opened_buffers: Default::default(),
            update_channels_tx,
            client,
            user_store,
            _rpc_subscriptions: rpc_subscriptions,
            _watch_connection_status: watch_connection_status,
            disconnect_channel_buffers_task: None,
            _update_channels: cx.spawn(async move |this, cx| {
                maybe!(async move {
                    // Apply queued updates strictly in order: the task returned
                    // by one update (if any) is awaited before the next message
                    // is taken from the queue.
                    while let Some(update_channels) = update_channels_rx.next().await {
                        if let Some(this) = this.upgrade() {
                            let update_task = this
                                .update(cx, |this, cx| this.update_channels(update_channels, cx));
                            if let Some(update_task) = update_task {
                                update_task.await.log_err();
                            }
                        }
                    }
                    anyhow::Ok(())
                })
                .await
                .log_err();
            }),
            channel_states: Default::default(),
            did_subscribe: false,
            channels_loaded: watch::channel_with(false),
        }
    }
 220
 221    pub fn initialize(&mut self) {
 222        if !self.did_subscribe
 223            && self
 224                .client
 225                .send(proto::SubscribeToChannels {})
 226                .log_err()
 227                .is_some()
 228        {
 229            self.did_subscribe = true;
 230        }
 231    }
 232
 233    pub fn wait_for_channels(
 234        &mut self,
 235        timeout: Duration,
 236        cx: &mut Context<Self>,
 237    ) -> Task<Result<()>> {
 238        let mut channels_loaded_rx = self.channels_loaded.1.clone();
 239        if *channels_loaded_rx.borrow() {
 240            return Task::ready(Ok(()));
 241        }
 242
 243        let mut status_receiver = self.client.status();
 244        if status_receiver.borrow().is_connected() {
 245            self.initialize();
 246        }
 247
 248        let mut timer = cx.background_executor().timer(timeout).fuse();
 249        cx.spawn(async move |this, cx| {
 250            loop {
 251                futures::select_biased! {
 252                    channels_loaded = channels_loaded_rx.next().fuse() => {
 253                        if let Some(true) = channels_loaded {
 254                            return Ok(());
 255                        }
 256                    }
 257                    status = status_receiver.next().fuse() => {
 258                        if let Some(status) = status
 259                            && status.is_connected() {
 260                                this.update(cx, |this, _cx| {
 261                                    this.initialize();
 262                                }).ok();
 263                            }
 264                        continue;
 265                    }
 266                    _ = timer => {
 267                        return Err(anyhow!("{:?} elapsed without receiving channels", timeout));
 268                    }
 269                }
 270            }
 271        })
 272    }
 273
 274    pub fn client(&self) -> Arc<Client> {
 275        self.client.clone()
 276    }
 277
 278    /// Returns the number of unique channels in the store
 279    pub fn channel_count(&self) -> usize {
 280        self.channel_index.by_id().len()
 281    }
 282
 283    /// Returns the index of a channel ID in the list of unique channels
 284    pub fn index_of_channel(&self, channel_id: ChannelId) -> Option<usize> {
 285        self.channel_index
 286            .by_id()
 287            .keys()
 288            .position(|id| *id == channel_id)
 289    }
 290
 291    /// Returns an iterator over all unique channels
 292    pub fn channels(&self) -> impl '_ + Iterator<Item = &Arc<Channel>> {
 293        self.channel_index.by_id().values()
 294    }
 295
 296    /// Iterate over all entries in the channel DAG
 297    pub fn ordered_channels(&self) -> impl '_ + Iterator<Item = (usize, &Arc<Channel>)> {
 298        self.channel_index
 299            .ordered_channels()
 300            .iter()
 301            .filter_map(move |id| {
 302                let channel = self.channel_index.by_id().get(id)?;
 303                Some((channel.parent_path.len(), channel))
 304            })
 305    }
 306
 307    pub fn channel_at_index(&self, ix: usize) -> Option<&Arc<Channel>> {
 308        let channel_id = self.channel_index.ordered_channels().get(ix)?;
 309        self.channel_index.by_id().get(channel_id)
 310    }
 311
 312    pub fn channel_at(&self, ix: usize) -> Option<&Arc<Channel>> {
 313        self.channel_index.by_id().values().nth(ix)
 314    }
 315
 316    pub fn has_channel_invitation(&self, channel_id: ChannelId) -> bool {
 317        self.channel_invitations
 318            .iter()
 319            .any(|channel| channel.id == channel_id)
 320    }
 321
 322    pub fn channel_invitations(&self) -> &[Arc<Channel>] {
 323        &self.channel_invitations
 324    }
 325
 326    pub fn channel_for_id(&self, channel_id: ChannelId) -> Option<&Arc<Channel>> {
 327        self.channel_index.by_id().get(&channel_id)
 328    }
 329
 330    pub fn has_open_channel_buffer(&self, channel_id: ChannelId, _cx: &App) -> bool {
 331        if let Some(buffer) = self.opened_buffers.get(&channel_id)
 332            && let OpenEntityHandle::Open(buffer) = buffer
 333        {
 334            return buffer.upgrade().is_some();
 335        }
 336        false
 337    }
 338
    /// Opens the notes buffer for `channel_id`.
    ///
    /// Delegates to `open_channel_resource` with `opened_buffers` as the
    /// backing map, so an already-open or currently-loading buffer for the
    /// same channel is reused rather than opened again.
    pub fn open_channel_buffer(
        &mut self,
        channel_id: ChannelId,
        cx: &mut Context<Self>,
    ) -> Task<Result<Entity<ChannelBuffer>>> {
        let client = self.client.clone();
        let user_store = self.user_store.clone();
        let channel_store = cx.entity();
        self.open_channel_resource(
            channel_id,
            // Resource name; only used in the error context message.
            "notes",
            |this| &mut this.opened_buffers,
            async move |channel, cx| {
                ChannelBuffer::new(channel, client, user_store, channel_store, cx).await
            },
            cx,
        )
    }
 357
 358    pub fn has_channel_buffer_changed(&self, channel_id: ChannelId) -> bool {
 359        self.channel_states
 360            .get(&channel_id)
 361            .is_some_and(|state| state.has_channel_buffer_changed())
 362    }
 363
 364    pub fn acknowledge_notes_version(
 365        &mut self,
 366        channel_id: ChannelId,
 367        epoch: u64,
 368        version: &clock::Global,
 369        cx: &mut Context<Self>,
 370    ) {
 371        self.channel_states
 372            .entry(channel_id)
 373            .or_default()
 374            .acknowledge_notes_version(epoch, version);
 375        cx.notify()
 376    }
 377
 378    pub fn update_latest_notes_version(
 379        &mut self,
 380        channel_id: ChannelId,
 381        epoch: u64,
 382        version: &clock::Global,
 383        cx: &mut Context<Self>,
 384    ) {
 385        self.channel_states
 386            .entry(channel_id)
 387            .or_default()
 388            .update_latest_notes_version(epoch, version);
 389        cx.notify()
 390    }
 391
    /// Asynchronously open a given resource associated with a channel.
    ///
    /// Make sure that the resource is only opened once, even if this method
    /// is called multiple times with the same channel id while the first task
    /// is still running.
    ///
    /// * `resource_name` — only used in the error context message
    ///   (`"failed to open channel {resource_name}"`).
    /// * `get_map` — selects the map on `self` that tracks this resource kind.
    /// * `load` — performs the actual load once the channel is known.
    ///
    /// The returned task fails if channels are not received within 10 seconds,
    /// if no channel with `channel_id` exists, if the store is dropped, or if
    /// `load` fails.
    fn open_channel_resource<T, F>(
        &mut self,
        channel_id: ChannelId,
        resource_name: &'static str,
        get_map: fn(&mut Self) -> &mut HashMap<ChannelId, OpenEntityHandle<T>>,
        load: F,
        cx: &mut Context<Self>,
    ) -> Task<Result<Entity<T>>>
    where
        F: AsyncFnOnce(Arc<Channel>, &mut AsyncApp) -> Result<Entity<T>> + 'static,
        T: 'static,
    {
        let task = loop {
            match get_map(self).get(&channel_id) {
                Some(OpenEntityHandle::Open(entity)) => {
                    if let Some(entity) = entity.upgrade() {
                        // Already open and still alive: hand it back directly.
                        break Task::ready(Ok(entity)).shared();
                    } else {
                        // Stale entry for a dropped entity: forget it and go
                        // around again, which now takes the `None` path.
                        get_map(self).remove(&channel_id);
                        continue;
                    }
                }
                // A load is already in flight: share it.
                Some(OpenEntityHandle::Loading(task)) => break task.clone(),
                None => {}
            }

            let channels_ready = self.wait_for_channels(Duration::from_secs(10), cx);
            let task = cx
                .spawn(async move |this, cx| {
                    channels_ready.await?;
                    let channel = this.read_with(cx, |this, _| {
                        this.channel_for_id(channel_id)
                            .cloned()
                            .ok_or_else(|| Arc::new(anyhow!("no channel for id: {channel_id}")))
                    })??;

                    load(channel, cx).await.map_err(Arc::new)
                })
                .shared();

            // Publish the in-flight load so concurrent callers can join it.
            get_map(self).insert(channel_id, OpenEntityHandle::Loading(task.clone()));
            let task = cx.spawn({
                async move |this, cx| {
                    let result = task.await;
                    // Replace the `Loading` entry with the outcome: a weak
                    // handle on success, nothing on failure so a later call
                    // can retry.
                    this.update(cx, |this, _| match &result {
                        Ok(model) => {
                            get_map(this)
                                .insert(channel_id, OpenEntityHandle::Open(model.downgrade()));
                        }
                        Err(_) => {
                            get_map(this).remove(&channel_id);
                        }
                    })?;
                    result
                }
            });
            break task.shared();
        };
        // The shared task carries `Arc<anyhow::Error>`; convert it back into a
        // plain `anyhow::Error` with context for the caller.
        cx.background_spawn(async move {
            task.await.map_err(|error| {
                anyhow!("{error}").context(format!("failed to open channel {resource_name}"))
            })
        })
    }
 461
 462    pub fn is_channel_admin(&self, channel_id: ChannelId) -> bool {
 463        self.channel_role(channel_id) == proto::ChannelRole::Admin
 464    }
 465
 466    pub fn is_root_channel(&self, channel_id: ChannelId) -> bool {
 467        self.channel_index
 468            .by_id()
 469            .get(&channel_id)
 470            .is_some_and(|channel| channel.is_root_channel())
 471    }
 472
 473    pub fn is_public_channel(&self, channel_id: ChannelId) -> bool {
 474        self.channel_index
 475            .by_id()
 476            .get(&channel_id)
 477            .is_some_and(|channel| channel.visibility == ChannelVisibility::Public)
 478    }
 479
 480    pub fn channel_capability(&self, channel_id: ChannelId) -> Capability {
 481        match self.channel_role(channel_id) {
 482            ChannelRole::Admin | ChannelRole::Member => Capability::ReadWrite,
 483            _ => Capability::ReadOnly,
 484        }
 485    }
 486
 487    pub fn channel_role(&self, channel_id: ChannelId) -> proto::ChannelRole {
 488        maybe!({
 489            let mut channel = self.channel_for_id(channel_id)?;
 490            if !channel.is_root_channel() {
 491                channel = self.channel_for_id(channel.root_id())?;
 492            }
 493            let root_channel_state = self.channel_states.get(&channel.id);
 494            root_channel_state?.role
 495        })
 496        .unwrap_or(proto::ChannelRole::Guest)
 497    }
 498
 499    pub fn channel_participants(&self, channel_id: ChannelId) -> &[Arc<User>] {
 500        self.channel_participants
 501            .get(&channel_id)
 502            .map_or(&[], |v| v.as_slice())
 503    }
 504
 505    pub fn create_channel(
 506        &self,
 507        name: &str,
 508        parent_id: Option<ChannelId>,
 509        cx: &mut Context<Self>,
 510    ) -> Task<Result<ChannelId>> {
 511        let client = self.client.clone();
 512        let name = name.trim_start_matches('#').to_owned();
 513        cx.spawn(async move |this, cx| {
 514            let response = client
 515                .request(proto::CreateChannel {
 516                    name,
 517                    parent_id: parent_id.map(|cid| cid.0),
 518                })
 519                .await?;
 520
 521            let channel = response.channel.context("missing channel in response")?;
 522            let channel_id = ChannelId(channel.id);
 523
 524            this.update(cx, |this, cx| {
 525                let task = this.update_channels(
 526                    proto::UpdateChannels {
 527                        channels: vec![channel],
 528                        ..Default::default()
 529                    },
 530                    cx,
 531                );
 532                assert!(task.is_none());
 533
 534                // This event is emitted because the collab panel wants to clear the pending edit state
 535                // before this frame is rendered. But we can't guarantee that the collab panel's future
 536                // will resolve before this flush_effects finishes. Synchronously emitting this event
 537                // ensures that the collab panel will observe this creation before the frame completes
 538                cx.emit(ChannelEvent::ChannelCreated(channel_id));
 539            })?;
 540
 541            Ok(channel_id)
 542        })
 543    }
 544
 545    pub fn move_channel(
 546        &mut self,
 547        channel_id: ChannelId,
 548        to: ChannelId,
 549        cx: &mut Context<Self>,
 550    ) -> Task<Result<()>> {
 551        let client = self.client.clone();
 552        cx.spawn(async move |_, _| {
 553            let _ = client
 554                .request(proto::MoveChannel {
 555                    channel_id: channel_id.0,
 556                    to: to.0,
 557                })
 558                .await?;
 559            Ok(())
 560        })
 561    }
 562
 563    pub fn reorder_channel(
 564        &mut self,
 565        channel_id: ChannelId,
 566        direction: proto::reorder_channel::Direction,
 567        cx: &mut Context<Self>,
 568    ) -> Task<Result<()>> {
 569        let client = self.client.clone();
 570        cx.spawn(async move |_, _| {
 571            client
 572                .request(proto::ReorderChannel {
 573                    channel_id: channel_id.0,
 574                    direction: direction.into(),
 575                })
 576                .await?;
 577            Ok(())
 578        })
 579    }
 580
 581    pub fn set_channel_visibility(
 582        &mut self,
 583        channel_id: ChannelId,
 584        visibility: ChannelVisibility,
 585        cx: &mut Context<Self>,
 586    ) -> Task<Result<()>> {
 587        let client = self.client.clone();
 588        cx.spawn(async move |_, _| {
 589            let _ = client
 590                .request(proto::SetChannelVisibility {
 591                    channel_id: channel_id.0,
 592                    visibility: visibility.into(),
 593                })
 594                .await?;
 595
 596            Ok(())
 597        })
 598    }
 599
 600    pub fn invite_member(
 601        &mut self,
 602        channel_id: ChannelId,
 603        user_id: UserId,
 604        role: proto::ChannelRole,
 605        cx: &mut Context<Self>,
 606    ) -> Task<Result<()>> {
 607        if !self.outgoing_invites.insert((channel_id, user_id)) {
 608            return Task::ready(Err(anyhow!("invite request already in progress")));
 609        }
 610
 611        cx.notify();
 612        let client = self.client.clone();
 613        cx.spawn(async move |this, cx| {
 614            let result = client
 615                .request(proto::InviteChannelMember {
 616                    channel_id: channel_id.0,
 617                    user_id,
 618                    role: role.into(),
 619                })
 620                .await;
 621
 622            this.update(cx, |this, cx| {
 623                this.outgoing_invites.remove(&(channel_id, user_id));
 624                cx.notify();
 625            })?;
 626
 627            result?;
 628
 629            Ok(())
 630        })
 631    }
 632
 633    pub fn remove_member(
 634        &mut self,
 635        channel_id: ChannelId,
 636        user_id: u64,
 637        cx: &mut Context<Self>,
 638    ) -> Task<Result<()>> {
 639        if !self.outgoing_invites.insert((channel_id, user_id)) {
 640            return Task::ready(Err(anyhow!("invite request already in progress")));
 641        }
 642
 643        cx.notify();
 644        let client = self.client.clone();
 645        cx.spawn(async move |this, cx| {
 646            let result = client
 647                .request(proto::RemoveChannelMember {
 648                    channel_id: channel_id.0,
 649                    user_id,
 650                })
 651                .await;
 652
 653            this.update(cx, |this, cx| {
 654                this.outgoing_invites.remove(&(channel_id, user_id));
 655                cx.notify();
 656            })?;
 657            result?;
 658            Ok(())
 659        })
 660    }
 661
 662    pub fn set_member_role(
 663        &mut self,
 664        channel_id: ChannelId,
 665        user_id: UserId,
 666        role: proto::ChannelRole,
 667        cx: &mut Context<Self>,
 668    ) -> Task<Result<()>> {
 669        if !self.outgoing_invites.insert((channel_id, user_id)) {
 670            return Task::ready(Err(anyhow!("member request already in progress")));
 671        }
 672
 673        cx.notify();
 674        let client = self.client.clone();
 675        cx.spawn(async move |this, cx| {
 676            let result = client
 677                .request(proto::SetChannelMemberRole {
 678                    channel_id: channel_id.0,
 679                    user_id,
 680                    role: role.into(),
 681                })
 682                .await;
 683
 684            this.update(cx, |this, cx| {
 685                this.outgoing_invites.remove(&(channel_id, user_id));
 686                cx.notify();
 687            })?;
 688
 689            result?;
 690            Ok(())
 691        })
 692    }
 693
 694    pub fn rename(
 695        &mut self,
 696        channel_id: ChannelId,
 697        new_name: &str,
 698        cx: &mut Context<Self>,
 699    ) -> Task<Result<()>> {
 700        let client = self.client.clone();
 701        let name = new_name.to_string();
 702        cx.spawn(async move |this, cx| {
 703            let channel = client
 704                .request(proto::RenameChannel {
 705                    channel_id: channel_id.0,
 706                    name,
 707                })
 708                .await?
 709                .channel
 710                .context("missing channel in response")?;
 711            this.update(cx, |this, cx| {
 712                let task = this.update_channels(
 713                    proto::UpdateChannels {
 714                        channels: vec![channel],
 715                        ..Default::default()
 716                    },
 717                    cx,
 718                );
 719                assert!(task.is_none());
 720
 721                // This event is emitted because the collab panel wants to clear the pending edit state
 722                // before this frame is rendered. But we can't guarantee that the collab panel's future
 723                // will resolve before this flush_effects finishes. Synchronously emitting this event
 724                // ensures that the collab panel will observe this creation before the frame complete
 725                cx.emit(ChannelEvent::ChannelRenamed(channel_id))
 726            })?;
 727            Ok(())
 728        })
 729    }
 730
 731    pub fn respond_to_channel_invite(
 732        &mut self,
 733        channel_id: ChannelId,
 734        accept: bool,
 735        cx: &mut Context<Self>,
 736    ) -> Task<Result<()>> {
 737        let client = self.client.clone();
 738        cx.background_spawn(async move {
 739            client
 740                .request(proto::RespondToChannelInvite {
 741                    channel_id: channel_id.0,
 742                    accept,
 743                })
 744                .await?;
 745            Ok(())
 746        })
 747    }
 748    pub fn fuzzy_search_members(
 749        &self,
 750        channel_id: ChannelId,
 751        query: String,
 752        limit: u16,
 753        cx: &mut Context<Self>,
 754    ) -> Task<Result<Vec<ChannelMembership>>> {
 755        let client = self.client.clone();
 756        let user_store = self.user_store.downgrade();
 757        cx.spawn(async move |_, cx| {
 758            let response = client
 759                .request(proto::GetChannelMembers {
 760                    channel_id: channel_id.0,
 761                    query,
 762                    limit: limit as u64,
 763                })
 764                .await?;
 765            user_store.update(cx, |user_store, _| {
 766                user_store.insert(response.users);
 767                response
 768                    .members
 769                    .into_iter()
 770                    .filter_map(|member| {
 771                        Some(ChannelMembership {
 772                            user: user_store.get_cached_user(member.user_id)?,
 773                            role: member.role(),
 774                            kind: member.kind(),
 775                        })
 776                    })
 777                    .collect()
 778            })
 779        })
 780    }
 781
 782    pub fn remove_channel(
 783        &self,
 784        channel_id: ChannelId,
 785    ) -> impl Future<Output = Result<()>> + use<> {
 786        let client = self.client.clone();
 787        async move {
 788            client
 789                .request(proto::DeleteChannel {
 790                    channel_id: channel_id.0,
 791                })
 792                .await?;
 793            Ok(())
 794        }
 795    }
 796
    /// Always returns `false`: responses to channel invitations are not
    /// tracked as pending by this store, so the argument is ignored.
    pub fn has_pending_channel_invite_response(&self, _: &Arc<Channel>) -> bool {
        false
    }
 800
 801    pub fn has_pending_channel_invite(&self, channel_id: ChannelId, user_id: UserId) -> bool {
 802        self.outgoing_invites.contains(&(channel_id, user_id))
 803    }
 804
 805    async fn handle_update_channels(
 806        this: Entity<Self>,
 807        message: TypedEnvelope<proto::UpdateChannels>,
 808        cx: AsyncApp,
 809    ) -> Result<()> {
 810        this.read_with(&cx, |this, _| {
 811            this.update_channels_tx
 812                .unbounded_send(message.payload)
 813                .unwrap();
 814        });
 815        Ok(())
 816    }
 817
 818    async fn handle_update_user_channels(
 819        this: Entity<Self>,
 820        message: TypedEnvelope<proto::UpdateUserChannels>,
 821        mut cx: AsyncApp,
 822    ) -> Result<()> {
 823        this.update(&mut cx, |this, cx| {
 824            for buffer_version in message.payload.observed_channel_buffer_version {
 825                let version = language::proto::deserialize_version(&buffer_version.version);
 826                this.acknowledge_notes_version(
 827                    ChannelId(buffer_version.channel_id),
 828                    buffer_version.epoch,
 829                    &version,
 830                    cx,
 831                );
 832            }
 833            for membership in message.payload.channel_memberships {
 834                if let Some(role) = ChannelRole::from_i32(membership.role) {
 835                    this.channel_states
 836                        .entry(ChannelId(membership.channel_id))
 837                        .or_default()
 838                        .set_role(role)
 839                }
 840            }
 841        });
 842        Ok(())
 843    }
 844
 845    fn handle_connect(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
 846        self.channel_index.clear();
 847        self.channel_invitations.clear();
 848        self.channel_participants.clear();
 849        self.channel_index.clear();
 850        self.outgoing_invites.clear();
 851        self.disconnect_channel_buffers_task.take();
 852
 853        let mut buffer_versions = Vec::new();
 854        for buffer in self.opened_buffers.values() {
 855            if let OpenEntityHandle::Open(buffer) = buffer
 856                && let Some(buffer) = buffer.upgrade()
 857            {
 858                let channel_buffer = buffer.read(cx);
 859                let buffer = channel_buffer.buffer().read(cx);
 860                buffer_versions.push(proto::ChannelBufferVersion {
 861                    channel_id: channel_buffer.channel_id.0,
 862                    epoch: channel_buffer.epoch(),
 863                    version: language::proto::serialize_version(&buffer.version()),
 864                });
 865            }
 866        }
 867
 868        if buffer_versions.is_empty() {
 869            return Task::ready(Ok(()));
 870        }
 871
 872        let response = self.client.request(proto::RejoinChannelBuffers {
 873            buffers: buffer_versions,
 874        });
 875
 876        cx.spawn(async move |this, cx| {
 877            let mut response = response.await?;
 878
 879            this.update(cx, |this, cx| {
 880                this.opened_buffers.retain(|_, buffer| match buffer {
 881                    OpenEntityHandle::Open(channel_buffer) => {
 882                        let Some(channel_buffer) = channel_buffer.upgrade() else {
 883                            return false;
 884                        };
 885
 886                        channel_buffer.update(cx, |channel_buffer, cx| {
 887                            let channel_id = channel_buffer.channel_id;
 888                            if let Some(remote_buffer) = response
 889                                .buffers
 890                                .iter_mut()
 891                                .find(|buffer| buffer.channel_id == channel_id.0)
 892                            {
 893                                let channel_id = channel_buffer.channel_id;
 894                                let remote_version =
 895                                    language::proto::deserialize_version(&remote_buffer.version);
 896
 897                                channel_buffer.replace_collaborators(
 898                                    mem::take(&mut remote_buffer.collaborators),
 899                                    cx,
 900                                );
 901
 902                                let operations = channel_buffer
 903                                    .buffer()
 904                                    .update(cx, |buffer, cx| {
 905                                        let outgoing_operations =
 906                                            buffer.serialize_ops(Some(remote_version), cx);
 907                                        let incoming_operations =
 908                                            mem::take(&mut remote_buffer.operations)
 909                                                .into_iter()
 910                                                .map(language::proto::deserialize_operation)
 911                                                .collect::<Result<Vec<_>>>()?;
 912                                        buffer.apply_ops(incoming_operations, cx);
 913                                        anyhow::Ok(outgoing_operations)
 914                                    })
 915                                    .log_err();
 916
 917                                if let Some(operations) = operations {
 918                                    channel_buffer.connected(cx);
 919                                    let client = this.client.clone();
 920                                    cx.background_spawn(async move {
 921                                        let operations = operations.await;
 922                                        for chunk in language::proto::split_operations(operations) {
 923                                            client
 924                                                .send(proto::UpdateChannelBuffer {
 925                                                    channel_id: channel_id.0,
 926                                                    operations: chunk,
 927                                                })
 928                                                .ok();
 929                                        }
 930                                    })
 931                                    .detach();
 932                                    return true;
 933                                }
 934                            }
 935
 936                            channel_buffer.disconnect(cx);
 937                            false
 938                        })
 939                    }
 940                    OpenEntityHandle::Loading(_) => true,
 941                });
 942            })
 943            .ok();
 944            anyhow::Ok(())
 945        })
 946    }
 947
 948    fn handle_disconnect(&mut self, wait_for_reconnect: bool, cx: &mut Context<Self>) {
 949        cx.notify();
 950        self.did_subscribe = false;
 951        self.disconnect_channel_buffers_task.get_or_insert_with(|| {
 952            cx.spawn(async move |this, cx| {
 953                if wait_for_reconnect {
 954                    cx.background_executor().timer(RECONNECT_TIMEOUT).await;
 955                }
 956
 957                if let Some(this) = this.upgrade() {
 958                    this.update(cx, |this, cx| {
 959                        for buffer in this.opened_buffers.values() {
 960                            if let OpenEntityHandle::Open(buffer) = &buffer
 961                                && let Some(buffer) = buffer.upgrade()
 962                            {
 963                                buffer.update(cx, |buffer, cx| buffer.disconnect(cx));
 964                            }
 965                        }
 966                    });
 967                }
 968            })
 969        });
 970    }
 971
 972    #[cfg(any(test, feature = "test-support"))]
 973    pub fn reset(&mut self) {
 974        self.channel_invitations.clear();
 975        self.channel_index.clear();
 976        self.channel_participants.clear();
 977        self.outgoing_invites.clear();
 978        self.opened_buffers.clear();
 979        self.disconnect_channel_buffers_task = None;
 980        self.channel_states.clear();
 981    }
 982
 983    pub(crate) fn update_channels(
 984        &mut self,
 985        payload: proto::UpdateChannels,
 986        cx: &mut Context<ChannelStore>,
 987    ) -> Option<Task<Result<()>>> {
 988        if !payload.remove_channel_invitations.is_empty() {
 989            self.channel_invitations
 990                .retain(|channel| !payload.remove_channel_invitations.contains(&channel.id.0));
 991        }
 992        for channel in payload.channel_invitations {
 993            match self
 994                .channel_invitations
 995                .binary_search_by_key(&channel.id, |c| c.id.0)
 996            {
 997                Ok(ix) => {
 998                    Arc::make_mut(&mut self.channel_invitations[ix]).name = channel.name.into()
 999                }
1000                Err(ix) => self.channel_invitations.insert(
1001                    ix,
1002                    Arc::new(Channel {
1003                        id: ChannelId(channel.id),
1004                        visibility: channel.visibility(),
1005                        name: channel.name.into(),
1006                        parent_path: channel.parent_path.into_iter().map(ChannelId).collect(),
1007                        channel_order: channel.channel_order,
1008                    }),
1009                ),
1010            }
1011        }
1012
1013        let channels_changed = !payload.channels.is_empty()
1014            || !payload.delete_channels.is_empty()
1015            || !payload.latest_channel_buffer_versions.is_empty();
1016
1017        if channels_changed {
1018            if !payload.delete_channels.is_empty() {
1019                let delete_channels: Vec<ChannelId> =
1020                    payload.delete_channels.into_iter().map(ChannelId).collect();
1021                self.channel_index.delete_channels(&delete_channels);
1022                self.channel_participants
1023                    .retain(|channel_id, _| !delete_channels.contains(channel_id));
1024
1025                for channel_id in &delete_channels {
1026                    let channel_id = *channel_id;
1027                    if payload
1028                        .channels
1029                        .iter()
1030                        .any(|channel| channel.id == channel_id.0)
1031                    {
1032                        continue;
1033                    }
1034                    if let Some(OpenEntityHandle::Open(buffer)) =
1035                        self.opened_buffers.remove(&channel_id)
1036                        && let Some(buffer) = buffer.upgrade()
1037                    {
1038                        buffer.update(cx, ChannelBuffer::disconnect);
1039                    }
1040                }
1041            }
1042
1043            let mut index = self.channel_index.bulk_insert();
1044            for channel in payload.channels {
1045                let id = ChannelId(channel.id);
1046                let channel_changed = index.insert(channel);
1047
1048                if channel_changed
1049                    && let Some(OpenEntityHandle::Open(buffer)) = self.opened_buffers.get(&id)
1050                    && let Some(buffer) = buffer.upgrade()
1051                {
1052                    buffer.update(cx, ChannelBuffer::channel_changed);
1053                }
1054            }
1055
1056            for latest_buffer_version in payload.latest_channel_buffer_versions {
1057                let version = language::proto::deserialize_version(&latest_buffer_version.version);
1058                self.channel_states
1059                    .entry(ChannelId(latest_buffer_version.channel_id))
1060                    .or_default()
1061                    .update_latest_notes_version(latest_buffer_version.epoch, &version)
1062            }
1063
1064            self.channels_loaded.0.try_send(true).log_err();
1065        }
1066
1067        cx.notify();
1068        if payload.channel_participants.is_empty() {
1069            return None;
1070        }
1071
1072        let mut all_user_ids = Vec::new();
1073        let channel_participants = payload.channel_participants;
1074        for entry in &channel_participants {
1075            for user_id in entry.participant_user_ids.iter() {
1076                if let Err(ix) = all_user_ids.binary_search(user_id) {
1077                    all_user_ids.insert(ix, *user_id);
1078                }
1079            }
1080        }
1081
1082        let users = self
1083            .user_store
1084            .update(cx, |user_store, cx| user_store.get_users(all_user_ids, cx));
1085        Some(cx.spawn(async move |this, cx| {
1086            let users = users.await?;
1087
1088            this.update(cx, |this, cx| {
1089                for entry in &channel_participants {
1090                    let mut participants: Vec<_> = entry
1091                        .participant_user_ids
1092                        .iter()
1093                        .filter_map(|user_id| {
1094                            users
1095                                .binary_search_by_key(&user_id, |user| &user.id)
1096                                .ok()
1097                                .map(|ix| users[ix].clone())
1098                        })
1099                        .collect();
1100
1101                    participants.sort_by_key(|u| u.id);
1102
1103                    this.channel_participants
1104                        .insert(ChannelId(entry.channel_id), participants);
1105                }
1106
1107                cx.notify();
1108            })
1109        }))
1110    }
1111}
1112
1113impl ChannelState {
1114    fn set_role(&mut self, role: ChannelRole) {
1115        self.role = Some(role);
1116    }
1117
1118    fn has_channel_buffer_changed(&self) -> bool {
1119        self.latest_notes_version.epoch > self.observed_notes_version.epoch
1120            || (self.latest_notes_version.epoch == self.observed_notes_version.epoch
1121                && self
1122                    .latest_notes_version
1123                    .version
1124                    .changed_since(&self.observed_notes_version.version))
1125    }
1126
1127    fn acknowledge_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1128        if self.observed_notes_version.epoch == epoch {
1129            self.observed_notes_version.version.join(version);
1130        } else {
1131            self.observed_notes_version = NotesVersion {
1132                epoch,
1133                version: version.clone(),
1134            };
1135        }
1136    }
1137
1138    fn update_latest_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1139        if self.latest_notes_version.epoch == epoch {
1140            self.latest_notes_version.version.join(version);
1141        } else {
1142            self.latest_notes_version = NotesVersion {
1143                epoch,
1144                version: version.clone(),
1145            };
1146        }
1147    }
1148}