channel_store.rs

   1mod channel_index;
   2
   3use crate::channel_buffer::ChannelBuffer;
   4use anyhow::{Context as _, Result, anyhow};
   5use channel_index::ChannelIndex;
   6use client::{ChannelId, Client, ClientSettings, Subscription, User, UserId, UserStore};
   7use collections::{HashMap, HashSet};
   8use futures::{Future, FutureExt, StreamExt, channel::mpsc, future::Shared};
   9use gpui::{
  10    App, AppContext as _, AsyncApp, Context, Entity, EventEmitter, Global, SharedString, Task,
  11    WeakEntity,
  12};
  13use language::Capability;
  14use postage::{sink::Sink, watch};
  15use rpc::{
  16    TypedEnvelope,
  17    proto::{self, ChannelRole, ChannelVisibility},
  18};
  19use settings::Settings;
  20use std::{mem, sync::Arc, time::Duration};
  21use util::{ResultExt, maybe};
  22
  23pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
  24
  25pub fn init(client: &Arc<Client>, user_store: Entity<UserStore>, cx: &mut App) {
  26    let channel_store = cx.new(|cx| ChannelStore::new(client.clone(), user_store.clone(), cx));
  27    cx.set_global(GlobalChannelStore(channel_store));
  28}
  29
  30#[derive(Debug, Clone, Default, PartialEq)]
  31struct NotesVersion {
  32    epoch: u64,
  33    version: clock::Global,
  34}
  35
  36pub struct ChannelStore {
  37    pub channel_index: ChannelIndex,
  38    channel_invitations: Vec<Arc<Channel>>,
  39    channel_participants: HashMap<ChannelId, Vec<Arc<User>>>,
  40    channel_states: HashMap<ChannelId, ChannelState>,
  41    outgoing_invites: HashSet<(ChannelId, UserId)>,
  42    update_channels_tx: mpsc::UnboundedSender<proto::UpdateChannels>,
  43    opened_buffers: HashMap<ChannelId, OpenEntityHandle<ChannelBuffer>>,
  44    client: Arc<Client>,
  45    did_subscribe: bool,
  46    channels_loaded: (watch::Sender<bool>, watch::Receiver<bool>),
  47    user_store: Entity<UserStore>,
  48    _rpc_subscriptions: [Subscription; 2],
  49    _watch_connection_status: Task<Option<()>>,
  50    disconnect_channel_buffers_task: Option<Task<()>>,
  51    _update_channels: Task<()>,
  52}
  53
  54#[derive(Clone, Debug)]
  55pub struct Channel {
  56    pub id: ChannelId,
  57    pub name: SharedString,
  58    pub visibility: proto::ChannelVisibility,
  59    pub parent_path: Vec<ChannelId>,
  60    pub channel_order: i32,
  61}
  62
  63#[derive(Default, Debug)]
  64pub struct ChannelState {
  65    latest_notes_version: NotesVersion,
  66    observed_notes_version: NotesVersion,
  67    role: Option<ChannelRole>,
  68}
  69
  70impl Channel {
  71    pub fn link(&self, cx: &App) -> String {
  72        format!(
  73            "{}/channel/{}-{}",
  74            ClientSettings::get_global(cx).server_url,
  75            Self::slug(&self.name),
  76            self.id
  77        )
  78    }
  79
  80    pub fn notes_link(&self, heading: Option<String>, cx: &App) -> String {
  81        self.link(cx)
  82            + "/notes"
  83            + &heading
  84                .map(|h| format!("#{}", Self::slug(&h)))
  85                .unwrap_or_default()
  86    }
  87
  88    pub fn is_root_channel(&self) -> bool {
  89        self.parent_path.is_empty()
  90    }
  91
  92    pub fn root_id(&self) -> ChannelId {
  93        self.parent_path.first().copied().unwrap_or(self.id)
  94    }
  95
  96    pub fn slug(str: &str) -> String {
  97        let slug: String = str
  98            .chars()
  99            .map(|c| if c.is_alphanumeric() { c } else { '-' })
 100            .collect();
 101
 102        slug.trim_matches(|c| c == '-').to_string()
 103    }
 104}
 105
 106#[derive(Debug)]
 107pub struct ChannelMembership {
 108    pub user: Arc<User>,
 109    pub kind: proto::channel_member::Kind,
 110    pub role: proto::ChannelRole,
 111}
 112impl ChannelMembership {
 113    pub fn sort_key(&self) -> MembershipSortKey<'_> {
 114        MembershipSortKey {
 115            role_order: match self.role {
 116                proto::ChannelRole::Admin => 0,
 117                proto::ChannelRole::Member => 1,
 118                proto::ChannelRole::Banned => 2,
 119                proto::ChannelRole::Talker => 3,
 120                proto::ChannelRole::Guest => 4,
 121            },
 122            kind_order: match self.kind {
 123                proto::channel_member::Kind::Member => 0,
 124                proto::channel_member::Kind::Invitee => 1,
 125            },
 126            username_order: &self.user.github_login,
 127        }
 128    }
 129}
 130
 131#[derive(PartialOrd, Ord, PartialEq, Eq)]
 132pub struct MembershipSortKey<'a> {
 133    role_order: u8,
 134    kind_order: u8,
 135    username_order: &'a str,
 136}
 137
 138pub enum ChannelEvent {
 139    ChannelCreated(ChannelId),
 140    ChannelRenamed(ChannelId),
 141}
 142
 143impl EventEmitter<ChannelEvent> for ChannelStore {}
 144
 145enum OpenEntityHandle<E> {
 146    Open(WeakEntity<E>),
 147    Loading(Shared<Task<Result<Entity<E>, Arc<anyhow::Error>>>>),
 148}
 149
 150struct GlobalChannelStore(Entity<ChannelStore>);
 151
 152impl Global for GlobalChannelStore {}
 153
 154impl ChannelStore {
 155    pub fn global(cx: &App) -> Entity<Self> {
 156        cx.global::<GlobalChannelStore>().0.clone()
 157    }
 158
 159    pub fn try_global(cx: &App) -> Option<Entity<Self>> {
 160        cx.try_global::<GlobalChannelStore>().map(|g| g.0.clone())
 161    }
 162
 163    pub fn new(client: Arc<Client>, user_store: Entity<UserStore>, cx: &mut Context<Self>) -> Self {
 164        let rpc_subscriptions = [
 165            client.add_message_handler(cx.weak_entity(), Self::handle_update_channels),
 166            client.add_message_handler(cx.weak_entity(), Self::handle_update_user_channels),
 167        ];
 168
 169        let mut connection_status = client.status();
 170        let (update_channels_tx, mut update_channels_rx) = mpsc::unbounded();
 171        let watch_connection_status = cx.spawn(async move |this, cx| {
 172            while let Some(status) = connection_status.next().await {
 173                let this = this.upgrade()?;
 174                match status {
 175                    client::Status::Connected { .. } => {
 176                        this.update(cx, |this, cx| this.handle_connect(cx))
 177                            .await
 178                            .log_err()?;
 179                    }
 180                    client::Status::SignedOut | client::Status::UpgradeRequired => {
 181                        this.update(cx, |this, cx| this.handle_disconnect(false, cx));
 182                    }
 183                    _ => {
 184                        this.update(cx, |this, cx| this.handle_disconnect(true, cx));
 185                    }
 186                }
 187            }
 188            Some(())
 189        });
 190
 191        Self {
 192            channel_invitations: Vec::default(),
 193            channel_index: ChannelIndex::default(),
 194            channel_participants: Default::default(),
 195            outgoing_invites: Default::default(),
 196            opened_buffers: Default::default(),
 197            update_channels_tx,
 198            client,
 199            user_store,
 200            _rpc_subscriptions: rpc_subscriptions,
 201            _watch_connection_status: watch_connection_status,
 202            disconnect_channel_buffers_task: None,
 203            _update_channels: cx.spawn(async move |this, cx| {
 204                maybe!(async move {
 205                    while let Some(update_channels) = update_channels_rx.next().await {
 206                        if let Some(this) = this.upgrade() {
 207                            let update_task = this
 208                                .update(cx, |this, cx| this.update_channels(update_channels, cx));
 209                            if let Some(update_task) = update_task {
 210                                update_task.await.log_err();
 211                            }
 212                        }
 213                    }
 214                    anyhow::Ok(())
 215                })
 216                .await
 217                .log_err();
 218            }),
 219            channel_states: Default::default(),
 220            did_subscribe: false,
 221            channels_loaded: watch::channel_with(false),
 222        }
 223    }
 224
 225    pub fn initialize(&mut self) {
 226        if !self.did_subscribe
 227            && self
 228                .client
 229                .send(proto::SubscribeToChannels {})
 230                .log_err()
 231                .is_some()
 232        {
 233            self.did_subscribe = true;
 234        }
 235    }
 236
 237    pub fn wait_for_channels(
 238        &mut self,
 239        timeout: Duration,
 240        cx: &mut Context<Self>,
 241    ) -> Task<Result<()>> {
 242        let mut channels_loaded_rx = self.channels_loaded.1.clone();
 243        if *channels_loaded_rx.borrow() {
 244            return Task::ready(Ok(()));
 245        }
 246
 247        let mut status_receiver = self.client.status();
 248        if status_receiver.borrow().is_connected() {
 249            self.initialize();
 250        }
 251
 252        let mut timer = cx.background_executor().timer(timeout).fuse();
 253        cx.spawn(async move |this, cx| {
 254            loop {
 255                futures::select_biased! {
 256                    channels_loaded = channels_loaded_rx.next().fuse() => {
 257                        if let Some(true) = channels_loaded {
 258                            return Ok(());
 259                        }
 260                    }
 261                    status = status_receiver.next().fuse() => {
 262                        if let Some(status) = status
 263                            && status.is_connected() {
 264                                this.update(cx, |this, _cx| {
 265                                    this.initialize();
 266                                }).ok();
 267                            }
 268                        continue;
 269                    }
 270                    _ = timer => {
 271                        return Err(anyhow!("{:?} elapsed without receiving channels", timeout));
 272                    }
 273                }
 274            }
 275        })
 276    }
 277
 278    pub fn client(&self) -> Arc<Client> {
 279        self.client.clone()
 280    }
 281
 282    /// Returns the number of unique channels in the store
 283    pub fn channel_count(&self) -> usize {
 284        self.channel_index.by_id().len()
 285    }
 286
 287    /// Returns the index of a channel ID in the list of unique channels
 288    pub fn index_of_channel(&self, channel_id: ChannelId) -> Option<usize> {
 289        self.channel_index
 290            .by_id()
 291            .keys()
 292            .position(|id| *id == channel_id)
 293    }
 294
 295    /// Returns an iterator over all unique channels
 296    pub fn channels(&self) -> impl '_ + Iterator<Item = &Arc<Channel>> {
 297        self.channel_index.by_id().values()
 298    }
 299
 300    /// Iterate over all entries in the channel DAG
 301    pub fn ordered_channels(&self) -> impl '_ + Iterator<Item = (usize, &Arc<Channel>)> {
 302        self.channel_index
 303            .ordered_channels()
 304            .iter()
 305            .filter_map(move |id| {
 306                let channel = self.channel_index.by_id().get(id)?;
 307                Some((channel.parent_path.len(), channel))
 308            })
 309    }
 310
 311    pub fn channel_at_index(&self, ix: usize) -> Option<&Arc<Channel>> {
 312        let channel_id = self.channel_index.ordered_channels().get(ix)?;
 313        self.channel_index.by_id().get(channel_id)
 314    }
 315
 316    pub fn channel_at(&self, ix: usize) -> Option<&Arc<Channel>> {
 317        self.channel_index.by_id().values().nth(ix)
 318    }
 319
 320    pub fn has_channel_invitation(&self, channel_id: ChannelId) -> bool {
 321        self.channel_invitations
 322            .iter()
 323            .any(|channel| channel.id == channel_id)
 324    }
 325
 326    pub fn channel_invitations(&self) -> &[Arc<Channel>] {
 327        &self.channel_invitations
 328    }
 329
 330    pub fn channel_for_id(&self, channel_id: ChannelId) -> Option<&Arc<Channel>> {
 331        self.channel_index.by_id().get(&channel_id)
 332    }
 333
 334    pub fn has_open_channel_buffer(&self, channel_id: ChannelId, _cx: &App) -> bool {
 335        if let Some(buffer) = self.opened_buffers.get(&channel_id)
 336            && let OpenEntityHandle::Open(buffer) = buffer
 337        {
 338            return buffer.upgrade().is_some();
 339        }
 340        false
 341    }
 342
 343    pub fn open_channel_buffer(
 344        &mut self,
 345        channel_id: ChannelId,
 346        cx: &mut Context<Self>,
 347    ) -> Task<Result<Entity<ChannelBuffer>>> {
 348        let client = self.client.clone();
 349        let user_store = self.user_store.clone();
 350        let channel_store = cx.entity();
 351        self.open_channel_resource(
 352            channel_id,
 353            "notes",
 354            |this| &mut this.opened_buffers,
 355            async move |channel, cx| {
 356                ChannelBuffer::new(channel, client, user_store, channel_store, cx).await
 357            },
 358            cx,
 359        )
 360    }
 361
 362    pub fn has_channel_buffer_changed(&self, channel_id: ChannelId) -> bool {
 363        self.channel_states
 364            .get(&channel_id)
 365            .is_some_and(|state| state.has_channel_buffer_changed())
 366    }
 367
 368    pub fn acknowledge_notes_version(
 369        &mut self,
 370        channel_id: ChannelId,
 371        epoch: u64,
 372        version: &clock::Global,
 373        cx: &mut Context<Self>,
 374    ) {
 375        self.channel_states
 376            .entry(channel_id)
 377            .or_default()
 378            .acknowledge_notes_version(epoch, version);
 379        cx.notify()
 380    }
 381
 382    pub fn update_latest_notes_version(
 383        &mut self,
 384        channel_id: ChannelId,
 385        epoch: u64,
 386        version: &clock::Global,
 387        cx: &mut Context<Self>,
 388    ) {
 389        self.channel_states
 390            .entry(channel_id)
 391            .or_default()
 392            .update_latest_notes_version(epoch, version);
 393        cx.notify()
 394    }
 395
 396    /// Asynchronously open a given resource associated with a channel.
 397    ///
 398    /// Make sure that the resource is only opened once, even if this method
 399    /// is called multiple times with the same channel id while the first task
 400    /// is still running.
 401    fn open_channel_resource<T, F>(
 402        &mut self,
 403        channel_id: ChannelId,
 404        resource_name: &'static str,
 405        get_map: fn(&mut Self) -> &mut HashMap<ChannelId, OpenEntityHandle<T>>,
 406        load: F,
 407        cx: &mut Context<Self>,
 408    ) -> Task<Result<Entity<T>>>
 409    where
 410        F: AsyncFnOnce(Arc<Channel>, &mut AsyncApp) -> Result<Entity<T>> + 'static,
 411        T: 'static,
 412    {
 413        let task = loop {
 414            match get_map(self).get(&channel_id) {
 415                Some(OpenEntityHandle::Open(entity)) => {
 416                    if let Some(entity) = entity.upgrade() {
 417                        break Task::ready(Ok(entity)).shared();
 418                    } else {
 419                        get_map(self).remove(&channel_id);
 420                        continue;
 421                    }
 422                }
 423                Some(OpenEntityHandle::Loading(task)) => break task.clone(),
 424                None => {}
 425            }
 426
 427            let channels_ready = self.wait_for_channels(Duration::from_secs(10), cx);
 428            let task = cx
 429                .spawn(async move |this, cx| {
 430                    channels_ready.await?;
 431                    let channel = this.read_with(cx, |this, _| {
 432                        this.channel_for_id(channel_id)
 433                            .cloned()
 434                            .ok_or_else(|| Arc::new(anyhow!("no channel for id: {channel_id}")))
 435                    })??;
 436
 437                    load(channel, cx).await.map_err(Arc::new)
 438                })
 439                .shared();
 440
 441            get_map(self).insert(channel_id, OpenEntityHandle::Loading(task.clone()));
 442            let task = cx.spawn({
 443                async move |this, cx| {
 444                    let result = task.await;
 445                    this.update(cx, |this, _| match &result {
 446                        Ok(model) => {
 447                            get_map(this)
 448                                .insert(channel_id, OpenEntityHandle::Open(model.downgrade()));
 449                        }
 450                        Err(_) => {
 451                            get_map(this).remove(&channel_id);
 452                        }
 453                    })?;
 454                    result
 455                }
 456            });
 457            break task.shared();
 458        };
 459        cx.background_spawn(async move {
 460            task.await.map_err(|error| {
 461                anyhow!("{error}").context(format!("failed to open channel {resource_name}"))
 462            })
 463        })
 464    }
 465
 466    pub fn is_channel_admin(&self, channel_id: ChannelId) -> bool {
 467        self.channel_role(channel_id) == proto::ChannelRole::Admin
 468    }
 469
 470    pub fn is_root_channel(&self, channel_id: ChannelId) -> bool {
 471        self.channel_index
 472            .by_id()
 473            .get(&channel_id)
 474            .is_some_and(|channel| channel.is_root_channel())
 475    }
 476
 477    pub fn is_public_channel(&self, channel_id: ChannelId) -> bool {
 478        self.channel_index
 479            .by_id()
 480            .get(&channel_id)
 481            .is_some_and(|channel| channel.visibility == ChannelVisibility::Public)
 482    }
 483
 484    pub fn channel_capability(&self, channel_id: ChannelId) -> Capability {
 485        match self.channel_role(channel_id) {
 486            ChannelRole::Admin | ChannelRole::Member => Capability::ReadWrite,
 487            _ => Capability::ReadOnly,
 488        }
 489    }
 490
 491    pub fn channel_role(&self, channel_id: ChannelId) -> proto::ChannelRole {
 492        maybe!({
 493            let mut channel = self.channel_for_id(channel_id)?;
 494            if !channel.is_root_channel() {
 495                channel = self.channel_for_id(channel.root_id())?;
 496            }
 497            let root_channel_state = self.channel_states.get(&channel.id);
 498            root_channel_state?.role
 499        })
 500        .unwrap_or(proto::ChannelRole::Guest)
 501    }
 502
 503    pub fn channel_participants(&self, channel_id: ChannelId) -> &[Arc<User>] {
 504        self.channel_participants
 505            .get(&channel_id)
 506            .map_or(&[], |v| v.as_slice())
 507    }
 508
 509    pub fn create_channel(
 510        &self,
 511        name: &str,
 512        parent_id: Option<ChannelId>,
 513        cx: &mut Context<Self>,
 514    ) -> Task<Result<ChannelId>> {
 515        let client = self.client.clone();
 516        let name = name.trim_start_matches('#').to_owned();
 517        cx.spawn(async move |this, cx| {
 518            let response = client
 519                .request(proto::CreateChannel {
 520                    name,
 521                    parent_id: parent_id.map(|cid| cid.0),
 522                })
 523                .await?;
 524
 525            let channel = response.channel.context("missing channel in response")?;
 526            let channel_id = ChannelId(channel.id);
 527
 528            this.update(cx, |this, cx| {
 529                let task = this.update_channels(
 530                    proto::UpdateChannels {
 531                        channels: vec![channel],
 532                        ..Default::default()
 533                    },
 534                    cx,
 535                );
 536                assert!(task.is_none());
 537
 538                // This event is emitted because the collab panel wants to clear the pending edit state
 539                // before this frame is rendered. But we can't guarantee that the collab panel's future
 540                // will resolve before this flush_effects finishes. Synchronously emitting this event
 541                // ensures that the collab panel will observe this creation before the frame completes
 542                cx.emit(ChannelEvent::ChannelCreated(channel_id));
 543            })?;
 544
 545            Ok(channel_id)
 546        })
 547    }
 548
 549    pub fn move_channel(
 550        &mut self,
 551        channel_id: ChannelId,
 552        to: ChannelId,
 553        cx: &mut Context<Self>,
 554    ) -> Task<Result<()>> {
 555        let client = self.client.clone();
 556        cx.spawn(async move |_, _| {
 557            let _ = client
 558                .request(proto::MoveChannel {
 559                    channel_id: channel_id.0,
 560                    to: to.0,
 561                })
 562                .await?;
 563            Ok(())
 564        })
 565    }
 566
 567    pub fn reorder_channel(
 568        &mut self,
 569        channel_id: ChannelId,
 570        direction: proto::reorder_channel::Direction,
 571        cx: &mut Context<Self>,
 572    ) -> Task<Result<()>> {
 573        let client = self.client.clone();
 574        cx.spawn(async move |_, _| {
 575            client
 576                .request(proto::ReorderChannel {
 577                    channel_id: channel_id.0,
 578                    direction: direction.into(),
 579                })
 580                .await?;
 581            Ok(())
 582        })
 583    }
 584
 585    pub fn set_channel_visibility(
 586        &mut self,
 587        channel_id: ChannelId,
 588        visibility: ChannelVisibility,
 589        cx: &mut Context<Self>,
 590    ) -> Task<Result<()>> {
 591        let client = self.client.clone();
 592        cx.spawn(async move |_, _| {
 593            let _ = client
 594                .request(proto::SetChannelVisibility {
 595                    channel_id: channel_id.0,
 596                    visibility: visibility.into(),
 597                })
 598                .await?;
 599
 600            Ok(())
 601        })
 602    }
 603
 604    pub fn invite_member(
 605        &mut self,
 606        channel_id: ChannelId,
 607        user_id: UserId,
 608        role: proto::ChannelRole,
 609        cx: &mut Context<Self>,
 610    ) -> Task<Result<()>> {
 611        if !self.outgoing_invites.insert((channel_id, user_id)) {
 612            return Task::ready(Err(anyhow!("invite request already in progress")));
 613        }
 614
 615        cx.notify();
 616        let client = self.client.clone();
 617        cx.spawn(async move |this, cx| {
 618            let result = client
 619                .request(proto::InviteChannelMember {
 620                    channel_id: channel_id.0,
 621                    user_id,
 622                    role: role.into(),
 623                })
 624                .await;
 625
 626            this.update(cx, |this, cx| {
 627                this.outgoing_invites.remove(&(channel_id, user_id));
 628                cx.notify();
 629            })?;
 630
 631            result?;
 632
 633            Ok(())
 634        })
 635    }
 636
 637    pub fn remove_member(
 638        &mut self,
 639        channel_id: ChannelId,
 640        user_id: u64,
 641        cx: &mut Context<Self>,
 642    ) -> Task<Result<()>> {
 643        if !self.outgoing_invites.insert((channel_id, user_id)) {
 644            return Task::ready(Err(anyhow!("invite request already in progress")));
 645        }
 646
 647        cx.notify();
 648        let client = self.client.clone();
 649        cx.spawn(async move |this, cx| {
 650            let result = client
 651                .request(proto::RemoveChannelMember {
 652                    channel_id: channel_id.0,
 653                    user_id,
 654                })
 655                .await;
 656
 657            this.update(cx, |this, cx| {
 658                this.outgoing_invites.remove(&(channel_id, user_id));
 659                cx.notify();
 660            })?;
 661            result?;
 662            Ok(())
 663        })
 664    }
 665
 666    pub fn set_member_role(
 667        &mut self,
 668        channel_id: ChannelId,
 669        user_id: UserId,
 670        role: proto::ChannelRole,
 671        cx: &mut Context<Self>,
 672    ) -> Task<Result<()>> {
 673        if !self.outgoing_invites.insert((channel_id, user_id)) {
 674            return Task::ready(Err(anyhow!("member request already in progress")));
 675        }
 676
 677        cx.notify();
 678        let client = self.client.clone();
 679        cx.spawn(async move |this, cx| {
 680            let result = client
 681                .request(proto::SetChannelMemberRole {
 682                    channel_id: channel_id.0,
 683                    user_id,
 684                    role: role.into(),
 685                })
 686                .await;
 687
 688            this.update(cx, |this, cx| {
 689                this.outgoing_invites.remove(&(channel_id, user_id));
 690                cx.notify();
 691            })?;
 692
 693            result?;
 694            Ok(())
 695        })
 696    }
 697
 698    pub fn rename(
 699        &mut self,
 700        channel_id: ChannelId,
 701        new_name: &str,
 702        cx: &mut Context<Self>,
 703    ) -> Task<Result<()>> {
 704        let client = self.client.clone();
 705        let name = new_name.to_string();
 706        cx.spawn(async move |this, cx| {
 707            let channel = client
 708                .request(proto::RenameChannel {
 709                    channel_id: channel_id.0,
 710                    name,
 711                })
 712                .await?
 713                .channel
 714                .context("missing channel in response")?;
 715            this.update(cx, |this, cx| {
 716                let task = this.update_channels(
 717                    proto::UpdateChannels {
 718                        channels: vec![channel],
 719                        ..Default::default()
 720                    },
 721                    cx,
 722                );
 723                assert!(task.is_none());
 724
 725                // This event is emitted because the collab panel wants to clear the pending edit state
 726                // before this frame is rendered. But we can't guarantee that the collab panel's future
 727                // will resolve before this flush_effects finishes. Synchronously emitting this event
 728                // ensures that the collab panel will observe this creation before the frame complete
 729                cx.emit(ChannelEvent::ChannelRenamed(channel_id))
 730            })?;
 731            Ok(())
 732        })
 733    }
 734
 735    pub fn respond_to_channel_invite(
 736        &mut self,
 737        channel_id: ChannelId,
 738        accept: bool,
 739        cx: &mut Context<Self>,
 740    ) -> Task<Result<()>> {
 741        let client = self.client.clone();
 742        cx.background_spawn(async move {
 743            client
 744                .request(proto::RespondToChannelInvite {
 745                    channel_id: channel_id.0,
 746                    accept,
 747                })
 748                .await?;
 749            Ok(())
 750        })
 751    }
 752    pub fn fuzzy_search_members(
 753        &self,
 754        channel_id: ChannelId,
 755        query: String,
 756        limit: u16,
 757        cx: &mut Context<Self>,
 758    ) -> Task<Result<Vec<ChannelMembership>>> {
 759        let client = self.client.clone();
 760        let user_store = self.user_store.downgrade();
 761        cx.spawn(async move |_, cx| {
 762            let response = client
 763                .request(proto::GetChannelMembers {
 764                    channel_id: channel_id.0,
 765                    query,
 766                    limit: limit as u64,
 767                })
 768                .await?;
 769            user_store.update(cx, |user_store, _| {
 770                user_store.insert(response.users);
 771                response
 772                    .members
 773                    .into_iter()
 774                    .filter_map(|member| {
 775                        Some(ChannelMembership {
 776                            user: user_store.get_cached_user(member.user_id)?,
 777                            role: member.role(),
 778                            kind: member.kind(),
 779                        })
 780                    })
 781                    .collect()
 782            })
 783        })
 784    }
 785
 786    pub fn remove_channel(
 787        &self,
 788        channel_id: ChannelId,
 789    ) -> impl Future<Output = Result<()>> + use<> {
 790        let client = self.client.clone();
 791        async move {
 792            client
 793                .request(proto::DeleteChannel {
 794                    channel_id: channel_id.0,
 795                })
 796                .await?;
 797            Ok(())
 798        }
 799    }
 800
 801    pub fn has_pending_channel_invite_response(&self, _: &Arc<Channel>) -> bool {
 802        false
 803    }
 804
 805    pub fn has_pending_channel_invite(&self, channel_id: ChannelId, user_id: UserId) -> bool {
 806        self.outgoing_invites.contains(&(channel_id, user_id))
 807    }
 808
 809    async fn handle_update_channels(
 810        this: Entity<Self>,
 811        message: TypedEnvelope<proto::UpdateChannels>,
 812        cx: AsyncApp,
 813    ) -> Result<()> {
 814        this.read_with(&cx, |this, _| {
 815            this.update_channels_tx
 816                .unbounded_send(message.payload)
 817                .unwrap();
 818        });
 819        Ok(())
 820    }
 821
 822    async fn handle_update_user_channels(
 823        this: Entity<Self>,
 824        message: TypedEnvelope<proto::UpdateUserChannels>,
 825        mut cx: AsyncApp,
 826    ) -> Result<()> {
 827        this.update(&mut cx, |this, cx| {
 828            for buffer_version in message.payload.observed_channel_buffer_version {
 829                let version = language::proto::deserialize_version(&buffer_version.version);
 830                this.acknowledge_notes_version(
 831                    ChannelId(buffer_version.channel_id),
 832                    buffer_version.epoch,
 833                    &version,
 834                    cx,
 835                );
 836            }
 837            for membership in message.payload.channel_memberships {
 838                if let Some(role) = ChannelRole::from_i32(membership.role) {
 839                    this.channel_states
 840                        .entry(ChannelId(membership.channel_id))
 841                        .or_default()
 842                        .set_role(role)
 843                }
 844            }
 845        });
 846        Ok(())
 847    }
 848
 849    fn handle_connect(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
 850        self.channel_index.clear();
 851        self.channel_invitations.clear();
 852        self.channel_participants.clear();
 853        self.channel_index.clear();
 854        self.outgoing_invites.clear();
 855        self.disconnect_channel_buffers_task.take();
 856
 857        let mut buffer_versions = Vec::new();
 858        for buffer in self.opened_buffers.values() {
 859            if let OpenEntityHandle::Open(buffer) = buffer
 860                && let Some(buffer) = buffer.upgrade()
 861            {
 862                buffer.update(cx, |channel_buffer, cx| {
 863                    // Block on_buffer_update from sending UpdateChannelBuffer messages
 864                    // until the rejoin completes. This prevents a race condition where
 865                    // edits made during the rejoin async gap could inflate the server
 866                    // version, causing offline edits to be filtered out by serialize_ops.
 867                    channel_buffer.set_rejoining(true);
 868                    let inner_buffer = channel_buffer.buffer().read(cx);
 869                    buffer_versions.push(proto::ChannelBufferVersion {
 870                        channel_id: channel_buffer.channel_id.0,
 871                        epoch: channel_buffer.epoch(),
 872                        version: language::proto::serialize_version(&inner_buffer.version()),
 873                    });
 874                });
 875            }
 876        }
 877
 878        if buffer_versions.is_empty() {
 879            return Task::ready(Ok(()));
 880        }
 881
 882        let response = self.client.request(proto::RejoinChannelBuffers {
 883            buffers: buffer_versions,
 884        });
 885
 886        cx.spawn(async move |this, cx| {
 887            let response = match response.await {
 888                Ok(response) => response,
 889                Err(err) => {
 890                    // Clear rejoining flag on all buffers since the rejoin failed
 891                    this.update(cx, |this, cx| {
 892                        for buffer in this.opened_buffers.values() {
 893                            if let OpenEntityHandle::Open(buffer) = buffer {
 894                                if let Some(buffer) = buffer.upgrade() {
 895                                    buffer.update(cx, |channel_buffer, _| {
 896                                        channel_buffer.set_rejoining(false);
 897                                    });
 898                                }
 899                            }
 900                        }
 901                    })
 902                    .ok();
 903                    return Err(err);
 904                }
 905            };
 906            let mut response = response;
 907
 908            this.update(cx, |this, cx| {
 909                this.opened_buffers.retain(|_, buffer| match buffer {
 910                    OpenEntityHandle::Open(channel_buffer) => {
 911                        let Some(channel_buffer) = channel_buffer.upgrade() else {
 912                            return false;
 913                        };
 914
 915                        channel_buffer.update(cx, |channel_buffer, cx| {
 916                            let channel_id = channel_buffer.channel_id;
 917                            if let Some(remote_buffer) = response
 918                                .buffers
 919                                .iter_mut()
 920                                .find(|buffer| buffer.channel_id == channel_id.0)
 921                            {
 922                                let channel_id = channel_buffer.channel_id;
 923                                let remote_version =
 924                                    language::proto::deserialize_version(&remote_buffer.version);
 925
 926                                channel_buffer.replace_collaborators(
 927                                    mem::take(&mut remote_buffer.collaborators),
 928                                    cx,
 929                                );
 930
 931                                let operations = channel_buffer
 932                                    .buffer()
 933                                    .update(cx, |buffer, cx| {
 934                                        let outgoing_operations =
 935                                            buffer.serialize_ops(Some(remote_version), cx);
 936                                        let incoming_operations =
 937                                            mem::take(&mut remote_buffer.operations)
 938                                                .into_iter()
 939                                                .map(language::proto::deserialize_operation)
 940                                                .collect::<Result<Vec<_>>>()?;
 941                                        buffer.apply_ops(incoming_operations, cx);
 942                                        anyhow::Ok(outgoing_operations)
 943                                    })
 944                                    .log_err();
 945
 946                                if let Some(operations) = operations {
 947                                    channel_buffer.connected(cx);
 948                                    let client = this.client.clone();
 949                                    cx.background_spawn(async move {
 950                                        let operations = operations.await;
 951                                        for chunk in language::proto::split_operations(operations) {
 952                                            client
 953                                                .send(proto::UpdateChannelBuffer {
 954                                                    channel_id: channel_id.0,
 955                                                    operations: chunk,
 956                                                })
 957                                                .ok();
 958                                        }
 959                                    })
 960                                    .detach();
 961                                    return true;
 962                                }
 963                            }
 964
 965                            channel_buffer.disconnect(cx);
 966                            false
 967                        })
 968                    }
 969                    OpenEntityHandle::Loading(_) => true,
 970                });
 971            })
 972            .ok();
 973            anyhow::Ok(())
 974        })
 975    }
 976
 977    fn handle_disconnect(&mut self, wait_for_reconnect: bool, cx: &mut Context<Self>) {
 978        cx.notify();
 979        self.did_subscribe = false;
 980
 981        // If we're waiting for reconnect, set rejoining=true on all buffers immediately.
 982        // This prevents operations from being sent during the reconnection window,
 983        // before handle_connect has a chance to run and capture the version.
 984        if wait_for_reconnect {
 985            for buffer in self.opened_buffers.values() {
 986                if let OpenEntityHandle::Open(buffer) = buffer {
 987                    if let Some(buffer) = buffer.upgrade() {
 988                        buffer.update(cx, |channel_buffer, _| {
 989                            channel_buffer.set_rejoining(true);
 990                        });
 991                    }
 992                }
 993            }
 994        }
 995
 996        self.disconnect_channel_buffers_task.get_or_insert_with(|| {
 997            cx.spawn(async move |this, cx| {
 998                if wait_for_reconnect {
 999                    cx.background_executor().timer(RECONNECT_TIMEOUT).await;
1000                }
1001
1002                if let Some(this) = this.upgrade() {
1003                    this.update(cx, |this, cx| {
1004                        for buffer in this.opened_buffers.values() {
1005                            if let OpenEntityHandle::Open(buffer) = &buffer
1006                                && let Some(buffer) = buffer.upgrade()
1007                            {
1008                                buffer.update(cx, |buffer, cx| buffer.disconnect(cx));
1009                            }
1010                        }
1011                    });
1012                }
1013            })
1014        });
1015    }
1016
1017    #[cfg(any(test, feature = "test-support"))]
1018    pub fn reset(&mut self) {
1019        self.channel_invitations.clear();
1020        self.channel_index.clear();
1021        self.channel_participants.clear();
1022        self.outgoing_invites.clear();
1023        self.opened_buffers.clear();
1024        self.disconnect_channel_buffers_task = None;
1025        self.channel_states.clear();
1026    }
1027
1028    pub(crate) fn update_channels(
1029        &mut self,
1030        payload: proto::UpdateChannels,
1031        cx: &mut Context<ChannelStore>,
1032    ) -> Option<Task<Result<()>>> {
1033        if !payload.remove_channel_invitations.is_empty() {
1034            self.channel_invitations
1035                .retain(|channel| !payload.remove_channel_invitations.contains(&channel.id.0));
1036        }
1037        for channel in payload.channel_invitations {
1038            match self
1039                .channel_invitations
1040                .binary_search_by_key(&channel.id, |c| c.id.0)
1041            {
1042                Ok(ix) => {
1043                    Arc::make_mut(&mut self.channel_invitations[ix]).name = channel.name.into()
1044                }
1045                Err(ix) => self.channel_invitations.insert(
1046                    ix,
1047                    Arc::new(Channel {
1048                        id: ChannelId(channel.id),
1049                        visibility: channel.visibility(),
1050                        name: channel.name.into(),
1051                        parent_path: channel.parent_path.into_iter().map(ChannelId).collect(),
1052                        channel_order: channel.channel_order,
1053                    }),
1054                ),
1055            }
1056        }
1057
1058        let channels_changed = !payload.channels.is_empty()
1059            || !payload.delete_channels.is_empty()
1060            || !payload.latest_channel_buffer_versions.is_empty();
1061
1062        if channels_changed {
1063            if !payload.delete_channels.is_empty() {
1064                let delete_channels: Vec<ChannelId> =
1065                    payload.delete_channels.into_iter().map(ChannelId).collect();
1066                self.channel_index.delete_channels(&delete_channels);
1067                self.channel_participants
1068                    .retain(|channel_id, _| !delete_channels.contains(channel_id));
1069
1070                for channel_id in &delete_channels {
1071                    let channel_id = *channel_id;
1072                    if payload
1073                        .channels
1074                        .iter()
1075                        .any(|channel| channel.id == channel_id.0)
1076                    {
1077                        continue;
1078                    }
1079                    if let Some(OpenEntityHandle::Open(buffer)) =
1080                        self.opened_buffers.remove(&channel_id)
1081                        && let Some(buffer) = buffer.upgrade()
1082                    {
1083                        buffer.update(cx, ChannelBuffer::disconnect);
1084                    }
1085                }
1086            }
1087
1088            let mut index = self.channel_index.bulk_insert();
1089            for channel in payload.channels {
1090                let id = ChannelId(channel.id);
1091                let channel_changed = index.insert(channel);
1092
1093                if channel_changed
1094                    && let Some(OpenEntityHandle::Open(buffer)) = self.opened_buffers.get(&id)
1095                    && let Some(buffer) = buffer.upgrade()
1096                {
1097                    buffer.update(cx, ChannelBuffer::channel_changed);
1098                }
1099            }
1100
1101            for latest_buffer_version in payload.latest_channel_buffer_versions {
1102                let version = language::proto::deserialize_version(&latest_buffer_version.version);
1103                self.channel_states
1104                    .entry(ChannelId(latest_buffer_version.channel_id))
1105                    .or_default()
1106                    .update_latest_notes_version(latest_buffer_version.epoch, &version)
1107            }
1108
1109            self.channels_loaded.0.try_send(true).log_err();
1110        }
1111
1112        cx.notify();
1113        if payload.channel_participants.is_empty() {
1114            return None;
1115        }
1116
1117        let mut all_user_ids = Vec::new();
1118        let channel_participants = payload.channel_participants;
1119        for entry in &channel_participants {
1120            for user_id in entry.participant_user_ids.iter() {
1121                if let Err(ix) = all_user_ids.binary_search(user_id) {
1122                    all_user_ids.insert(ix, *user_id);
1123                }
1124            }
1125        }
1126
1127        let users = self
1128            .user_store
1129            .update(cx, |user_store, cx| user_store.get_users(all_user_ids, cx));
1130        Some(cx.spawn(async move |this, cx| {
1131            let users = users.await?;
1132
1133            this.update(cx, |this, cx| {
1134                for entry in &channel_participants {
1135                    let mut participants: Vec<_> = entry
1136                        .participant_user_ids
1137                        .iter()
1138                        .filter_map(|user_id| {
1139                            users
1140                                .binary_search_by_key(&user_id, |user| &user.id)
1141                                .ok()
1142                                .map(|ix| users[ix].clone())
1143                        })
1144                        .collect();
1145
1146                    participants.sort_by_key(|u| u.id);
1147
1148                    this.channel_participants
1149                        .insert(ChannelId(entry.channel_id), participants);
1150                }
1151
1152                cx.notify();
1153            })
1154        }))
1155    }
1156}
1157
1158impl ChannelState {
1159    fn set_role(&mut self, role: ChannelRole) {
1160        self.role = Some(role);
1161    }
1162
1163    fn has_channel_buffer_changed(&self) -> bool {
1164        self.latest_notes_version.epoch > self.observed_notes_version.epoch
1165            || (self.latest_notes_version.epoch == self.observed_notes_version.epoch
1166                && self
1167                    .latest_notes_version
1168                    .version
1169                    .changed_since(&self.observed_notes_version.version))
1170    }
1171
1172    fn acknowledge_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1173        if self.observed_notes_version.epoch == epoch {
1174            self.observed_notes_version.version.join(version);
1175        } else {
1176            self.observed_notes_version = NotesVersion {
1177                epoch,
1178                version: version.clone(),
1179            };
1180        }
1181    }
1182
1183    fn update_latest_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1184        if self.latest_notes_version.epoch == epoch {
1185            self.latest_notes_version.version.join(version);
1186        } else {
1187            self.latest_notes_version = NotesVersion {
1188                epoch,
1189                version: version.clone(),
1190            };
1191        }
1192    }
1193}