channel_store.rs

   1mod channel_index;
   2
   3use crate::channel_buffer::ChannelBuffer;
   4use anyhow::{Context as _, Result, anyhow};
   5use channel_index::ChannelIndex;
   6use client::{ChannelId, Client, ClientSettings, Subscription, User, UserId, UserStore};
   7use collections::{HashMap, HashSet};
   8use futures::{Future, FutureExt, StreamExt, channel::mpsc, future::Shared};
   9use gpui::{
  10    App, AppContext as _, AsyncApp, Context, Entity, EventEmitter, Global, SharedString, Task,
  11    WeakEntity,
  12};
  13use language::Capability;
  14use postage::{sink::Sink, watch};
  15use rpc::{
  16    TypedEnvelope,
  17    proto::{self, ChannelRole, ChannelVisibility},
  18};
  19use settings::Settings;
  20use std::{mem, sync::Arc, time::Duration};
  21use util::{ResultExt, maybe};
  22
  23pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
  24
  25pub fn init(client: &Arc<Client>, user_store: Entity<UserStore>, cx: &mut App) {
  26    let channel_store = cx.new(|cx| ChannelStore::new(client.clone(), user_store.clone(), cx));
  27    cx.set_global(GlobalChannelStore(channel_store));
  28}
  29
  30#[derive(Debug, Clone, Default, PartialEq)]
  31struct NotesVersion {
  32    epoch: u64,
  33    version: clock::Global,
  34}
  35
  36pub struct ChannelStore {
  37    pub channel_index: ChannelIndex,
  38    channel_invitations: Vec<Arc<Channel>>,
  39    channel_participants: HashMap<ChannelId, Vec<Arc<User>>>,
  40    channel_states: HashMap<ChannelId, ChannelState>,
  41    favorite_channel_ids: Vec<ChannelId>,
  42    outgoing_invites: HashSet<(ChannelId, UserId)>,
  43    update_channels_tx: mpsc::UnboundedSender<proto::UpdateChannels>,
  44    opened_buffers: HashMap<ChannelId, OpenEntityHandle<ChannelBuffer>>,
  45    client: Arc<Client>,
  46    did_subscribe: bool,
  47    channels_loaded: (watch::Sender<bool>, watch::Receiver<bool>),
  48    user_store: Entity<UserStore>,
  49    _rpc_subscriptions: [Subscription; 2],
  50    _watch_connection_status: Task<Option<()>>,
  51    disconnect_channel_buffers_task: Option<Task<()>>,
  52    _update_channels: Task<()>,
  53}
  54
  55#[derive(Clone, Debug)]
  56pub struct Channel {
  57    pub id: ChannelId,
  58    pub name: SharedString,
  59    pub visibility: proto::ChannelVisibility,
  60    pub parent_path: Vec<ChannelId>,
  61    pub channel_order: i32,
  62}
  63
  64#[derive(Default, Debug)]
  65pub struct ChannelState {
  66    latest_notes_version: NotesVersion,
  67    observed_notes_version: NotesVersion,
  68    role: Option<ChannelRole>,
  69}
  70
  71impl Channel {
  72    pub fn link(&self, cx: &App) -> String {
  73        format!(
  74            "{}/channel/{}-{}",
  75            ClientSettings::get_global(cx).server_url,
  76            Self::slug(&self.name),
  77            self.id
  78        )
  79    }
  80
  81    pub fn notes_link(&self, heading: Option<String>, cx: &App) -> String {
  82        self.link(cx)
  83            + "/notes"
  84            + &heading
  85                .map(|h| format!("#{}", Self::slug(&h)))
  86                .unwrap_or_default()
  87    }
  88
  89    pub fn is_root_channel(&self) -> bool {
  90        self.parent_path.is_empty()
  91    }
  92
  93    pub fn root_id(&self) -> ChannelId {
  94        self.parent_path.first().copied().unwrap_or(self.id)
  95    }
  96
  97    pub fn slug(str: &str) -> String {
  98        let slug: String = str
  99            .chars()
 100            .map(|c| if c.is_alphanumeric() { c } else { '-' })
 101            .collect();
 102
 103        slug.trim_matches(|c| c == '-').to_string()
 104    }
 105}
 106
 107#[derive(Debug)]
 108pub struct ChannelMembership {
 109    pub user: Arc<User>,
 110    pub kind: proto::channel_member::Kind,
 111    pub role: proto::ChannelRole,
 112}
 113impl ChannelMembership {
 114    pub fn sort_key(&self) -> MembershipSortKey<'_> {
 115        MembershipSortKey {
 116            role_order: match self.role {
 117                proto::ChannelRole::Admin => 0,
 118                proto::ChannelRole::Member => 1,
 119                proto::ChannelRole::Banned => 2,
 120                proto::ChannelRole::Talker => 3,
 121                proto::ChannelRole::Guest => 4,
 122            },
 123            kind_order: match self.kind {
 124                proto::channel_member::Kind::Member => 0,
 125                proto::channel_member::Kind::Invitee => 1,
 126            },
 127            username_order: &self.user.github_login,
 128        }
 129    }
 130}
 131
 132#[derive(PartialOrd, Ord, PartialEq, Eq)]
 133pub struct MembershipSortKey<'a> {
 134    role_order: u8,
 135    kind_order: u8,
 136    username_order: &'a str,
 137}
 138
 139pub enum ChannelEvent {
 140    ChannelCreated(ChannelId),
 141    ChannelRenamed(ChannelId),
 142}
 143
 144impl EventEmitter<ChannelEvent> for ChannelStore {}
 145
 146enum OpenEntityHandle<E> {
 147    Open(WeakEntity<E>),
 148    Loading(Shared<Task<Result<Entity<E>, Arc<anyhow::Error>>>>),
 149}
 150
 151struct GlobalChannelStore(Entity<ChannelStore>);
 152
 153impl Global for GlobalChannelStore {}
 154
 155impl ChannelStore {
 156    pub fn global(cx: &App) -> Entity<Self> {
 157        cx.global::<GlobalChannelStore>().0.clone()
 158    }
 159
 160    pub fn try_global(cx: &App) -> Option<Entity<Self>> {
 161        cx.try_global::<GlobalChannelStore>().map(|g| g.0.clone())
 162    }
 163
 164    pub fn favorite_channel_ids(&self) -> &[ChannelId] {
 165        &self.favorite_channel_ids
 166    }
 167
 168    pub fn is_channel_favorited(&self, channel_id: ChannelId) -> bool {
 169        self.favorite_channel_ids.contains(&channel_id)
 170    }
 171
 172    pub fn toggle_favorite_channel(&mut self, channel_id: ChannelId, cx: &mut Context<Self>) {
 173        if let Some(ix) = self
 174            .favorite_channel_ids
 175            .iter()
 176            .position(|id| *id == channel_id)
 177        {
 178            self.favorite_channel_ids.remove(ix);
 179        } else {
 180            self.favorite_channel_ids.push(channel_id);
 181        }
 182        cx.notify();
 183    }
 184
 185    pub fn set_favorite_channel_ids(&mut self, ids: Vec<ChannelId>, cx: &mut Context<Self>) {
 186        self.favorite_channel_ids = ids;
 187        cx.notify();
 188    }
 189
 190    pub fn new(client: Arc<Client>, user_store: Entity<UserStore>, cx: &mut Context<Self>) -> Self {
 191        let rpc_subscriptions = [
 192            client.add_message_handler(cx.weak_entity(), Self::handle_update_channels),
 193            client.add_message_handler(cx.weak_entity(), Self::handle_update_user_channels),
 194        ];
 195
 196        let mut connection_status = client.status();
 197        let (update_channels_tx, mut update_channels_rx) = mpsc::unbounded();
 198        let watch_connection_status = cx.spawn(async move |this, cx| {
 199            while let Some(status) = connection_status.next().await {
 200                let this = this.upgrade()?;
 201                match status {
 202                    client::Status::Connected { .. } => {
 203                        this.update(cx, |this, cx| this.handle_connect(cx))
 204                            .await
 205                            .log_err()?;
 206                    }
 207                    client::Status::SignedOut | client::Status::UpgradeRequired => {
 208                        this.update(cx, |this, cx| this.handle_disconnect(false, cx));
 209                    }
 210                    _ => {
 211                        this.update(cx, |this, cx| this.handle_disconnect(true, cx));
 212                    }
 213                }
 214            }
 215            Some(())
 216        });
 217
 218        Self {
 219            channel_invitations: Vec::default(),
 220            channel_index: ChannelIndex::default(),
 221            channel_participants: Default::default(),
 222            outgoing_invites: Default::default(),
 223            opened_buffers: Default::default(),
 224            update_channels_tx,
 225            client,
 226            user_store,
 227            _rpc_subscriptions: rpc_subscriptions,
 228            _watch_connection_status: watch_connection_status,
 229            disconnect_channel_buffers_task: None,
 230            _update_channels: cx.spawn(async move |this, cx| {
 231                maybe!(async move {
 232                    while let Some(update_channels) = update_channels_rx.next().await {
 233                        if let Some(this) = this.upgrade() {
 234                            let update_task = this
 235                                .update(cx, |this, cx| this.update_channels(update_channels, cx));
 236                            if let Some(update_task) = update_task {
 237                                update_task.await.log_err();
 238                            }
 239                        }
 240                    }
 241                    anyhow::Ok(())
 242                })
 243                .await
 244                .log_err();
 245            }),
 246            channel_states: Default::default(),
 247            favorite_channel_ids: Vec::default(),
 248            did_subscribe: false,
 249            channels_loaded: watch::channel_with(false),
 250        }
 251    }
 252
 253    pub fn initialize(&mut self) {
 254        if !self.did_subscribe
 255            && self
 256                .client
 257                .send(proto::SubscribeToChannels {})
 258                .log_err()
 259                .is_some()
 260        {
 261            self.did_subscribe = true;
 262        }
 263    }
 264
 265    pub fn wait_for_channels(
 266        &mut self,
 267        timeout: Duration,
 268        cx: &mut Context<Self>,
 269    ) -> Task<Result<()>> {
 270        let mut channels_loaded_rx = self.channels_loaded.1.clone();
 271        if *channels_loaded_rx.borrow() {
 272            return Task::ready(Ok(()));
 273        }
 274
 275        let mut status_receiver = self.client.status();
 276        if status_receiver.borrow().is_connected() {
 277            self.initialize();
 278        }
 279
 280        let mut timer = cx.background_executor().timer(timeout).fuse();
 281        cx.spawn(async move |this, cx| {
 282            loop {
 283                futures::select_biased! {
 284                    channels_loaded = channels_loaded_rx.next().fuse() => {
 285                        if let Some(true) = channels_loaded {
 286                            return Ok(());
 287                        }
 288                    }
 289                    status = status_receiver.next().fuse() => {
 290                        if let Some(status) = status
 291                            && status.is_connected() {
 292                                this.update(cx, |this, _cx| {
 293                                    this.initialize();
 294                                }).ok();
 295                            }
 296                        continue;
 297                    }
 298                    _ = timer => {
 299                        return Err(anyhow!("{:?} elapsed without receiving channels", timeout));
 300                    }
 301                }
 302            }
 303        })
 304    }
 305
 306    pub fn client(&self) -> Arc<Client> {
 307        self.client.clone()
 308    }
 309
 310    /// Returns the number of unique channels in the store
 311    pub fn channel_count(&self) -> usize {
 312        self.channel_index.by_id().len()
 313    }
 314
 315    /// Returns the index of a channel ID in the list of unique channels
 316    pub fn index_of_channel(&self, channel_id: ChannelId) -> Option<usize> {
 317        self.channel_index
 318            .by_id()
 319            .keys()
 320            .position(|id| *id == channel_id)
 321    }
 322
 323    /// Returns an iterator over all unique channels
 324    pub fn channels(&self) -> impl '_ + Iterator<Item = &Arc<Channel>> {
 325        self.channel_index.by_id().values()
 326    }
 327
 328    /// Iterate over all entries in the channel DAG
 329    pub fn ordered_channels(&self) -> impl '_ + Iterator<Item = (usize, &Arc<Channel>)> {
 330        self.channel_index
 331            .ordered_channels()
 332            .iter()
 333            .filter_map(move |id| {
 334                let channel = self.channel_index.by_id().get(id)?;
 335                Some((channel.parent_path.len(), channel))
 336            })
 337    }
 338
 339    pub fn channel_at_index(&self, ix: usize) -> Option<&Arc<Channel>> {
 340        let channel_id = self.channel_index.ordered_channels().get(ix)?;
 341        self.channel_index.by_id().get(channel_id)
 342    }
 343
 344    pub fn channel_at(&self, ix: usize) -> Option<&Arc<Channel>> {
 345        self.channel_index.by_id().values().nth(ix)
 346    }
 347
 348    pub fn has_channel_invitation(&self, channel_id: ChannelId) -> bool {
 349        self.channel_invitations
 350            .iter()
 351            .any(|channel| channel.id == channel_id)
 352    }
 353
 354    pub fn channel_invitations(&self) -> &[Arc<Channel>] {
 355        &self.channel_invitations
 356    }
 357
 358    pub fn channel_for_id(&self, channel_id: ChannelId) -> Option<&Arc<Channel>> {
 359        self.channel_index.by_id().get(&channel_id)
 360    }
 361
 362    pub fn has_open_channel_buffer(&self, channel_id: ChannelId, _cx: &App) -> bool {
 363        if let Some(buffer) = self.opened_buffers.get(&channel_id)
 364            && let OpenEntityHandle::Open(buffer) = buffer
 365        {
 366            return buffer.upgrade().is_some();
 367        }
 368        false
 369    }
 370
 371    pub fn open_channel_buffer(
 372        &mut self,
 373        channel_id: ChannelId,
 374        cx: &mut Context<Self>,
 375    ) -> Task<Result<Entity<ChannelBuffer>>> {
 376        let client = self.client.clone();
 377        let user_store = self.user_store.clone();
 378        let channel_store = cx.entity();
 379        self.open_channel_resource(
 380            channel_id,
 381            "notes",
 382            |this| &mut this.opened_buffers,
 383            async move |channel, cx| {
 384                ChannelBuffer::new(channel, client, user_store, channel_store, cx).await
 385            },
 386            cx,
 387        )
 388    }
 389
 390    pub fn has_channel_buffer_changed(&self, channel_id: ChannelId) -> bool {
 391        self.channel_states
 392            .get(&channel_id)
 393            .is_some_and(|state| state.has_channel_buffer_changed())
 394    }
 395
 396    pub fn acknowledge_notes_version(
 397        &mut self,
 398        channel_id: ChannelId,
 399        epoch: u64,
 400        version: &clock::Global,
 401        cx: &mut Context<Self>,
 402    ) {
 403        self.channel_states
 404            .entry(channel_id)
 405            .or_default()
 406            .acknowledge_notes_version(epoch, version);
 407        cx.notify()
 408    }
 409
 410    pub fn update_latest_notes_version(
 411        &mut self,
 412        channel_id: ChannelId,
 413        epoch: u64,
 414        version: &clock::Global,
 415        cx: &mut Context<Self>,
 416    ) {
 417        self.channel_states
 418            .entry(channel_id)
 419            .or_default()
 420            .update_latest_notes_version(epoch, version);
 421        cx.notify()
 422    }
 423
 424    /// Asynchronously open a given resource associated with a channel.
 425    ///
 426    /// Make sure that the resource is only opened once, even if this method
 427    /// is called multiple times with the same channel id while the first task
 428    /// is still running.
 429    fn open_channel_resource<T, F>(
 430        &mut self,
 431        channel_id: ChannelId,
 432        resource_name: &'static str,
 433        get_map: fn(&mut Self) -> &mut HashMap<ChannelId, OpenEntityHandle<T>>,
 434        load: F,
 435        cx: &mut Context<Self>,
 436    ) -> Task<Result<Entity<T>>>
 437    where
 438        F: AsyncFnOnce(Arc<Channel>, &mut AsyncApp) -> Result<Entity<T>> + 'static,
 439        T: 'static,
 440    {
 441        let task = loop {
 442            match get_map(self).get(&channel_id) {
 443                Some(OpenEntityHandle::Open(entity)) => {
 444                    if let Some(entity) = entity.upgrade() {
 445                        break Task::ready(Ok(entity)).shared();
 446                    } else {
 447                        get_map(self).remove(&channel_id);
 448                        continue;
 449                    }
 450                }
 451                Some(OpenEntityHandle::Loading(task)) => break task.clone(),
 452                None => {}
 453            }
 454
 455            let channels_ready = self.wait_for_channels(Duration::from_secs(10), cx);
 456            let task = cx
 457                .spawn(async move |this, cx| {
 458                    channels_ready.await?;
 459                    let channel = this.read_with(cx, |this, _| {
 460                        this.channel_for_id(channel_id)
 461                            .cloned()
 462                            .ok_or_else(|| Arc::new(anyhow!("no channel for id: {channel_id}")))
 463                    })??;
 464
 465                    load(channel, cx).await.map_err(Arc::new)
 466                })
 467                .shared();
 468
 469            get_map(self).insert(channel_id, OpenEntityHandle::Loading(task.clone()));
 470            let task = cx.spawn({
 471                async move |this, cx| {
 472                    let result = task.await;
 473                    this.update(cx, |this, _| match &result {
 474                        Ok(model) => {
 475                            get_map(this)
 476                                .insert(channel_id, OpenEntityHandle::Open(model.downgrade()));
 477                        }
 478                        Err(_) => {
 479                            get_map(this).remove(&channel_id);
 480                        }
 481                    })?;
 482                    result
 483                }
 484            });
 485            break task.shared();
 486        };
 487        cx.background_spawn(async move {
 488            task.await.map_err(|error| {
 489                anyhow!("{error}").context(format!("failed to open channel {resource_name}"))
 490            })
 491        })
 492    }
 493
 494    pub fn is_channel_admin(&self, channel_id: ChannelId) -> bool {
 495        self.channel_role(channel_id) == proto::ChannelRole::Admin
 496    }
 497
 498    pub fn is_root_channel(&self, channel_id: ChannelId) -> bool {
 499        self.channel_index
 500            .by_id()
 501            .get(&channel_id)
 502            .is_some_and(|channel| channel.is_root_channel())
 503    }
 504
 505    pub fn is_public_channel(&self, channel_id: ChannelId) -> bool {
 506        self.channel_index
 507            .by_id()
 508            .get(&channel_id)
 509            .is_some_and(|channel| channel.visibility == ChannelVisibility::Public)
 510    }
 511
 512    pub fn channel_capability(&self, channel_id: ChannelId) -> Capability {
 513        match self.channel_role(channel_id) {
 514            ChannelRole::Admin | ChannelRole::Member => Capability::ReadWrite,
 515            _ => Capability::ReadOnly,
 516        }
 517    }
 518
 519    pub fn channel_role(&self, channel_id: ChannelId) -> proto::ChannelRole {
 520        maybe!({
 521            let mut channel = self.channel_for_id(channel_id)?;
 522            if !channel.is_root_channel() {
 523                channel = self.channel_for_id(channel.root_id())?;
 524            }
 525            let root_channel_state = self.channel_states.get(&channel.id);
 526            root_channel_state?.role
 527        })
 528        .unwrap_or(proto::ChannelRole::Guest)
 529    }
 530
 531    pub fn channel_participants(&self, channel_id: ChannelId) -> &[Arc<User>] {
 532        self.channel_participants
 533            .get(&channel_id)
 534            .map_or(&[], |v| v.as_slice())
 535    }
 536
 537    pub fn create_channel(
 538        &self,
 539        name: &str,
 540        parent_id: Option<ChannelId>,
 541        cx: &mut Context<Self>,
 542    ) -> Task<Result<ChannelId>> {
 543        let client = self.client.clone();
 544        let name = name.trim_start_matches('#').to_owned();
 545        cx.spawn(async move |this, cx| {
 546            let response = client
 547                .request(proto::CreateChannel {
 548                    name,
 549                    parent_id: parent_id.map(|cid| cid.0),
 550                })
 551                .await?;
 552
 553            let channel = response.channel.context("missing channel in response")?;
 554            let channel_id = ChannelId(channel.id);
 555
 556            this.update(cx, |this, cx| {
 557                let task = this.update_channels(
 558                    proto::UpdateChannels {
 559                        channels: vec![channel],
 560                        ..Default::default()
 561                    },
 562                    cx,
 563                );
 564                assert!(task.is_none());
 565
 566                // This event is emitted because the collab panel wants to clear the pending edit state
 567                // before this frame is rendered. But we can't guarantee that the collab panel's future
 568                // will resolve before this flush_effects finishes. Synchronously emitting this event
 569                // ensures that the collab panel will observe this creation before the frame completes
 570                cx.emit(ChannelEvent::ChannelCreated(channel_id));
 571            })?;
 572
 573            Ok(channel_id)
 574        })
 575    }
 576
 577    pub fn move_channel(
 578        &mut self,
 579        channel_id: ChannelId,
 580        to: ChannelId,
 581        cx: &mut Context<Self>,
 582    ) -> Task<Result<()>> {
 583        let client = self.client.clone();
 584        cx.spawn(async move |_, _| {
 585            let _ = client
 586                .request(proto::MoveChannel {
 587                    channel_id: channel_id.0,
 588                    to: to.0,
 589                })
 590                .await?;
 591            Ok(())
 592        })
 593    }
 594
 595    pub fn reorder_channel(
 596        &mut self,
 597        channel_id: ChannelId,
 598        direction: proto::reorder_channel::Direction,
 599        cx: &mut Context<Self>,
 600    ) -> Task<Result<()>> {
 601        let client = self.client.clone();
 602        cx.spawn(async move |_, _| {
 603            client
 604                .request(proto::ReorderChannel {
 605                    channel_id: channel_id.0,
 606                    direction: direction.into(),
 607                })
 608                .await?;
 609            Ok(())
 610        })
 611    }
 612
 613    pub fn set_channel_visibility(
 614        &mut self,
 615        channel_id: ChannelId,
 616        visibility: ChannelVisibility,
 617        cx: &mut Context<Self>,
 618    ) -> Task<Result<()>> {
 619        let client = self.client.clone();
 620        cx.spawn(async move |_, _| {
 621            let _ = client
 622                .request(proto::SetChannelVisibility {
 623                    channel_id: channel_id.0,
 624                    visibility: visibility.into(),
 625                })
 626                .await?;
 627
 628            Ok(())
 629        })
 630    }
 631
 632    pub fn invite_member(
 633        &mut self,
 634        channel_id: ChannelId,
 635        user_id: UserId,
 636        role: proto::ChannelRole,
 637        cx: &mut Context<Self>,
 638    ) -> Task<Result<()>> {
 639        if !self.outgoing_invites.insert((channel_id, user_id)) {
 640            return Task::ready(Err(anyhow!("invite request already in progress")));
 641        }
 642
 643        cx.notify();
 644        let client = self.client.clone();
 645        cx.spawn(async move |this, cx| {
 646            let result = client
 647                .request(proto::InviteChannelMember {
 648                    channel_id: channel_id.0,
 649                    user_id,
 650                    role: role.into(),
 651                })
 652                .await;
 653
 654            this.update(cx, |this, cx| {
 655                this.outgoing_invites.remove(&(channel_id, user_id));
 656                cx.notify();
 657            })?;
 658
 659            result?;
 660
 661            Ok(())
 662        })
 663    }
 664
 665    pub fn remove_member(
 666        &mut self,
 667        channel_id: ChannelId,
 668        user_id: u64,
 669        cx: &mut Context<Self>,
 670    ) -> Task<Result<()>> {
 671        if !self.outgoing_invites.insert((channel_id, user_id)) {
 672            return Task::ready(Err(anyhow!("invite request already in progress")));
 673        }
 674
 675        cx.notify();
 676        let client = self.client.clone();
 677        cx.spawn(async move |this, cx| {
 678            let result = client
 679                .request(proto::RemoveChannelMember {
 680                    channel_id: channel_id.0,
 681                    user_id,
 682                })
 683                .await;
 684
 685            this.update(cx, |this, cx| {
 686                this.outgoing_invites.remove(&(channel_id, user_id));
 687                cx.notify();
 688            })?;
 689            result?;
 690            Ok(())
 691        })
 692    }
 693
 694    pub fn set_member_role(
 695        &mut self,
 696        channel_id: ChannelId,
 697        user_id: UserId,
 698        role: proto::ChannelRole,
 699        cx: &mut Context<Self>,
 700    ) -> Task<Result<()>> {
 701        if !self.outgoing_invites.insert((channel_id, user_id)) {
 702            return Task::ready(Err(anyhow!("member request already in progress")));
 703        }
 704
 705        cx.notify();
 706        let client = self.client.clone();
 707        cx.spawn(async move |this, cx| {
 708            let result = client
 709                .request(proto::SetChannelMemberRole {
 710                    channel_id: channel_id.0,
 711                    user_id,
 712                    role: role.into(),
 713                })
 714                .await;
 715
 716            this.update(cx, |this, cx| {
 717                this.outgoing_invites.remove(&(channel_id, user_id));
 718                cx.notify();
 719            })?;
 720
 721            result?;
 722            Ok(())
 723        })
 724    }
 725
 726    pub fn rename(
 727        &mut self,
 728        channel_id: ChannelId,
 729        new_name: &str,
 730        cx: &mut Context<Self>,
 731    ) -> Task<Result<()>> {
 732        let client = self.client.clone();
 733        let name = new_name.to_string();
 734        cx.spawn(async move |this, cx| {
 735            let channel = client
 736                .request(proto::RenameChannel {
 737                    channel_id: channel_id.0,
 738                    name,
 739                })
 740                .await?
 741                .channel
 742                .context("missing channel in response")?;
 743            this.update(cx, |this, cx| {
 744                let task = this.update_channels(
 745                    proto::UpdateChannels {
 746                        channels: vec![channel],
 747                        ..Default::default()
 748                    },
 749                    cx,
 750                );
 751                assert!(task.is_none());
 752
 753                // This event is emitted because the collab panel wants to clear the pending edit state
 754                // before this frame is rendered. But we can't guarantee that the collab panel's future
 755                // will resolve before this flush_effects finishes. Synchronously emitting this event
 756                // ensures that the collab panel will observe this creation before the frame complete
 757                cx.emit(ChannelEvent::ChannelRenamed(channel_id))
 758            })?;
 759            Ok(())
 760        })
 761    }
 762
 763    pub fn respond_to_channel_invite(
 764        &mut self,
 765        channel_id: ChannelId,
 766        accept: bool,
 767        cx: &mut Context<Self>,
 768    ) -> Task<Result<()>> {
 769        let client = self.client.clone();
 770        cx.background_spawn(async move {
 771            client
 772                .request(proto::RespondToChannelInvite {
 773                    channel_id: channel_id.0,
 774                    accept,
 775                })
 776                .await?;
 777            Ok(())
 778        })
 779    }
 780    pub fn fuzzy_search_members(
 781        &self,
 782        channel_id: ChannelId,
 783        query: String,
 784        limit: u16,
 785        cx: &mut Context<Self>,
 786    ) -> Task<Result<Vec<ChannelMembership>>> {
 787        let client = self.client.clone();
 788        let user_store = self.user_store.downgrade();
 789        cx.spawn(async move |_, cx| {
 790            let response = client
 791                .request(proto::GetChannelMembers {
 792                    channel_id: channel_id.0,
 793                    query,
 794                    limit: limit as u64,
 795                })
 796                .await?;
 797            user_store.update(cx, |user_store, _| {
 798                user_store.insert(response.users);
 799                response
 800                    .members
 801                    .into_iter()
 802                    .filter_map(|member| {
 803                        Some(ChannelMembership {
 804                            user: user_store.get_cached_user(member.user_id)?,
 805                            role: member.role(),
 806                            kind: member.kind(),
 807                        })
 808                    })
 809                    .collect()
 810            })
 811        })
 812    }
 813
 814    pub fn remove_channel(
 815        &self,
 816        channel_id: ChannelId,
 817    ) -> impl Future<Output = Result<()>> + use<> {
 818        let client = self.client.clone();
 819        async move {
 820            client
 821                .request(proto::DeleteChannel {
 822                    channel_id: channel_id.0,
 823                })
 824                .await?;
 825            Ok(())
 826        }
 827    }
 828
 829    pub fn has_pending_channel_invite_response(&self, _: &Arc<Channel>) -> bool {
 830        false
 831    }
 832
 833    pub fn has_pending_channel_invite(&self, channel_id: ChannelId, user_id: UserId) -> bool {
 834        self.outgoing_invites.contains(&(channel_id, user_id))
 835    }
 836
 837    async fn handle_update_channels(
 838        this: Entity<Self>,
 839        message: TypedEnvelope<proto::UpdateChannels>,
 840        cx: AsyncApp,
 841    ) -> Result<()> {
 842        this.read_with(&cx, |this, _| {
 843            this.update_channels_tx
 844                .unbounded_send(message.payload)
 845                .unwrap();
 846        });
 847        Ok(())
 848    }
 849
 850    async fn handle_update_user_channels(
 851        this: Entity<Self>,
 852        message: TypedEnvelope<proto::UpdateUserChannels>,
 853        mut cx: AsyncApp,
 854    ) -> Result<()> {
 855        this.update(&mut cx, |this, cx| {
 856            for buffer_version in message.payload.observed_channel_buffer_version {
 857                let version = language::proto::deserialize_version(&buffer_version.version);
 858                this.acknowledge_notes_version(
 859                    ChannelId(buffer_version.channel_id),
 860                    buffer_version.epoch,
 861                    &version,
 862                    cx,
 863                );
 864            }
 865            for membership in message.payload.channel_memberships {
 866                if let Some(role) = ChannelRole::from_i32(membership.role) {
 867                    this.channel_states
 868                        .entry(ChannelId(membership.channel_id))
 869                        .or_default()
 870                        .set_role(role)
 871                }
 872            }
 873        });
 874        Ok(())
 875    }
 876
 877    fn handle_connect(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
 878        self.channel_index.clear();
 879        self.channel_invitations.clear();
 880        self.channel_participants.clear();
 881        self.channel_index.clear();
 882        self.outgoing_invites.clear();
 883        self.disconnect_channel_buffers_task.take();
 884
 885        let mut buffer_versions = Vec::new();
 886        for buffer in self.opened_buffers.values() {
 887            if let OpenEntityHandle::Open(buffer) = buffer
 888                && let Some(buffer) = buffer.upgrade()
 889            {
 890                buffer.update(cx, |channel_buffer, cx| {
 891                    // Block on_buffer_update from sending UpdateChannelBuffer messages
 892                    // until the rejoin completes. This prevents a race condition where
 893                    // edits made during the rejoin async gap could inflate the server
 894                    // version, causing offline edits to be filtered out by serialize_ops.
 895                    channel_buffer.set_rejoining(true);
 896                    let inner_buffer = channel_buffer.buffer().read(cx);
 897                    buffer_versions.push(proto::ChannelBufferVersion {
 898                        channel_id: channel_buffer.channel_id.0,
 899                        epoch: channel_buffer.epoch(),
 900                        version: language::proto::serialize_version(&inner_buffer.version()),
 901                    });
 902                });
 903            }
 904        }
 905
 906        if buffer_versions.is_empty() {
 907            return Task::ready(Ok(()));
 908        }
 909
 910        let response = self.client.request(proto::RejoinChannelBuffers {
 911            buffers: buffer_versions,
 912        });
 913
 914        cx.spawn(async move |this, cx| {
 915            let response = match response.await {
 916                Ok(response) => response,
 917                Err(err) => {
 918                    // Clear rejoining flag on all buffers since the rejoin failed
 919                    this.update(cx, |this, cx| {
 920                        for buffer in this.opened_buffers.values() {
 921                            if let OpenEntityHandle::Open(buffer) = buffer {
 922                                if let Some(buffer) = buffer.upgrade() {
 923                                    buffer.update(cx, |channel_buffer, _| {
 924                                        channel_buffer.set_rejoining(false);
 925                                    });
 926                                }
 927                            }
 928                        }
 929                    })
 930                    .ok();
 931                    return Err(err);
 932                }
 933            };
 934            let mut response = response;
 935
 936            this.update(cx, |this, cx| {
 937                this.opened_buffers.retain(|_, buffer| match buffer {
 938                    OpenEntityHandle::Open(channel_buffer) => {
 939                        let Some(channel_buffer) = channel_buffer.upgrade() else {
 940                            return false;
 941                        };
 942
 943                        channel_buffer.update(cx, |channel_buffer, cx| {
 944                            let channel_id = channel_buffer.channel_id;
 945                            if let Some(remote_buffer) = response
 946                                .buffers
 947                                .iter_mut()
 948                                .find(|buffer| buffer.channel_id == channel_id.0)
 949                            {
 950                                let channel_id = channel_buffer.channel_id;
 951                                let remote_version =
 952                                    language::proto::deserialize_version(&remote_buffer.version);
 953
 954                                channel_buffer.replace_collaborators(
 955                                    mem::take(&mut remote_buffer.collaborators),
 956                                    cx,
 957                                );
 958
 959                                let operations = channel_buffer
 960                                    .buffer()
 961                                    .update(cx, |buffer, cx| {
 962                                        let outgoing_operations =
 963                                            buffer.serialize_ops(Some(remote_version), cx);
 964                                        let incoming_operations =
 965                                            mem::take(&mut remote_buffer.operations)
 966                                                .into_iter()
 967                                                .map(language::proto::deserialize_operation)
 968                                                .collect::<Result<Vec<_>>>()?;
 969                                        buffer.apply_ops(incoming_operations, cx);
 970                                        anyhow::Ok(outgoing_operations)
 971                                    })
 972                                    .log_err();
 973
 974                                if let Some(operations) = operations {
 975                                    channel_buffer.connected(cx);
 976                                    let client = this.client.clone();
 977                                    cx.background_spawn(async move {
 978                                        let operations = operations.await;
 979                                        for chunk in language::proto::split_operations(operations) {
 980                                            client
 981                                                .send(proto::UpdateChannelBuffer {
 982                                                    channel_id: channel_id.0,
 983                                                    operations: chunk,
 984                                                })
 985                                                .ok();
 986                                        }
 987                                    })
 988                                    .detach();
 989                                    return true;
 990                                }
 991                            }
 992
 993                            channel_buffer.disconnect(cx);
 994                            false
 995                        })
 996                    }
 997                    OpenEntityHandle::Loading(_) => true,
 998                });
 999            })
1000            .ok();
1001            anyhow::Ok(())
1002        })
1003    }
1004
1005    fn handle_disconnect(&mut self, wait_for_reconnect: bool, cx: &mut Context<Self>) {
1006        cx.notify();
1007        self.did_subscribe = false;
1008
1009        // If we're waiting for reconnect, set rejoining=true on all buffers immediately.
1010        // This prevents operations from being sent during the reconnection window,
1011        // before handle_connect has a chance to run and capture the version.
1012        if wait_for_reconnect {
1013            for buffer in self.opened_buffers.values() {
1014                if let OpenEntityHandle::Open(buffer) = buffer {
1015                    if let Some(buffer) = buffer.upgrade() {
1016                        buffer.update(cx, |channel_buffer, _| {
1017                            channel_buffer.set_rejoining(true);
1018                        });
1019                    }
1020                }
1021            }
1022        }
1023
1024        self.disconnect_channel_buffers_task.get_or_insert_with(|| {
1025            cx.spawn(async move |this, cx| {
1026                if wait_for_reconnect {
1027                    cx.background_executor().timer(RECONNECT_TIMEOUT).await;
1028                }
1029
1030                if let Some(this) = this.upgrade() {
1031                    this.update(cx, |this, cx| {
1032                        for buffer in this.opened_buffers.values() {
1033                            if let OpenEntityHandle::Open(buffer) = &buffer
1034                                && let Some(buffer) = buffer.upgrade()
1035                            {
1036                                buffer.update(cx, |buffer, cx| buffer.disconnect(cx));
1037                            }
1038                        }
1039                    });
1040                }
1041            })
1042        });
1043    }
1044
1045    #[cfg(any(test, feature = "test-support"))]
1046    pub fn reset(&mut self) {
1047        self.channel_invitations.clear();
1048        self.channel_index.clear();
1049        self.channel_participants.clear();
1050        self.outgoing_invites.clear();
1051        self.opened_buffers.clear();
1052        self.disconnect_channel_buffers_task = None;
1053        self.channel_states.clear();
1054    }
1055
1056    pub(crate) fn update_channels(
1057        &mut self,
1058        payload: proto::UpdateChannels,
1059        cx: &mut Context<ChannelStore>,
1060    ) -> Option<Task<Result<()>>> {
1061        if !payload.remove_channel_invitations.is_empty() {
1062            self.channel_invitations
1063                .retain(|channel| !payload.remove_channel_invitations.contains(&channel.id.0));
1064        }
1065        for channel in payload.channel_invitations {
1066            match self
1067                .channel_invitations
1068                .binary_search_by_key(&channel.id, |c| c.id.0)
1069            {
1070                Ok(ix) => {
1071                    Arc::make_mut(&mut self.channel_invitations[ix]).name = channel.name.into()
1072                }
1073                Err(ix) => self.channel_invitations.insert(
1074                    ix,
1075                    Arc::new(Channel {
1076                        id: ChannelId(channel.id),
1077                        visibility: channel.visibility(),
1078                        name: channel.name.into(),
1079                        parent_path: channel.parent_path.into_iter().map(ChannelId).collect(),
1080                        channel_order: channel.channel_order,
1081                    }),
1082                ),
1083            }
1084        }
1085
1086        let channels_changed = !payload.channels.is_empty()
1087            || !payload.delete_channels.is_empty()
1088            || !payload.latest_channel_buffer_versions.is_empty();
1089
1090        if channels_changed {
1091            if !payload.delete_channels.is_empty() {
1092                let delete_channels: Vec<ChannelId> =
1093                    payload.delete_channels.into_iter().map(ChannelId).collect();
1094                self.channel_index.delete_channels(&delete_channels);
1095                self.channel_participants
1096                    .retain(|channel_id, _| !delete_channels.contains(channel_id));
1097                self.favorite_channel_ids
1098                    .retain(|channel_id| !delete_channels.contains(channel_id));
1099
1100                for channel_id in &delete_channels {
1101                    let channel_id = *channel_id;
1102                    if payload
1103                        .channels
1104                        .iter()
1105                        .any(|channel| channel.id == channel_id.0)
1106                    {
1107                        continue;
1108                    }
1109                    if let Some(OpenEntityHandle::Open(buffer)) =
1110                        self.opened_buffers.remove(&channel_id)
1111                        && let Some(buffer) = buffer.upgrade()
1112                    {
1113                        buffer.update(cx, ChannelBuffer::disconnect);
1114                    }
1115                }
1116            }
1117
1118            let mut index = self.channel_index.bulk_insert();
1119            for channel in payload.channels {
1120                let id = ChannelId(channel.id);
1121                let channel_changed = index.insert(channel);
1122
1123                if channel_changed
1124                    && let Some(OpenEntityHandle::Open(buffer)) = self.opened_buffers.get(&id)
1125                    && let Some(buffer) = buffer.upgrade()
1126                {
1127                    buffer.update(cx, ChannelBuffer::channel_changed);
1128                }
1129            }
1130
1131            for latest_buffer_version in payload.latest_channel_buffer_versions {
1132                let version = language::proto::deserialize_version(&latest_buffer_version.version);
1133                self.channel_states
1134                    .entry(ChannelId(latest_buffer_version.channel_id))
1135                    .or_default()
1136                    .update_latest_notes_version(latest_buffer_version.epoch, &version)
1137            }
1138
1139            self.channels_loaded.0.try_send(true).log_err();
1140        }
1141
1142        cx.notify();
1143        if payload.channel_participants.is_empty() {
1144            return None;
1145        }
1146
1147        let mut all_user_ids = Vec::new();
1148        let channel_participants = payload.channel_participants;
1149        for entry in &channel_participants {
1150            for user_id in entry.participant_user_ids.iter() {
1151                if let Err(ix) = all_user_ids.binary_search(user_id) {
1152                    all_user_ids.insert(ix, *user_id);
1153                }
1154            }
1155        }
1156
1157        let users = self
1158            .user_store
1159            .update(cx, |user_store, cx| user_store.get_users(all_user_ids, cx));
1160        Some(cx.spawn(async move |this, cx| {
1161            let users = users.await?;
1162
1163            this.update(cx, |this, cx| {
1164                for entry in &channel_participants {
1165                    let mut participants: Vec<_> = entry
1166                        .participant_user_ids
1167                        .iter()
1168                        .filter_map(|user_id| {
1169                            users
1170                                .binary_search_by_key(&user_id, |user| &user.id)
1171                                .ok()
1172                                .map(|ix| users[ix].clone())
1173                        })
1174                        .collect();
1175
1176                    participants.sort_by_key(|u| u.id);
1177
1178                    this.channel_participants
1179                        .insert(ChannelId(entry.channel_id), participants);
1180                }
1181
1182                cx.notify();
1183            })
1184        }))
1185    }
1186}
1187
1188impl ChannelState {
1189    fn set_role(&mut self, role: ChannelRole) {
1190        self.role = Some(role);
1191    }
1192
1193    fn has_channel_buffer_changed(&self) -> bool {
1194        self.latest_notes_version.epoch > self.observed_notes_version.epoch
1195            || (self.latest_notes_version.epoch == self.observed_notes_version.epoch
1196                && self
1197                    .latest_notes_version
1198                    .version
1199                    .changed_since(&self.observed_notes_version.version))
1200    }
1201
1202    fn acknowledge_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1203        if self.observed_notes_version.epoch == epoch {
1204            self.observed_notes_version.version.join(version);
1205        } else {
1206            self.observed_notes_version = NotesVersion {
1207                epoch,
1208                version: version.clone(),
1209            };
1210        }
1211    }
1212
1213    fn update_latest_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1214        if self.latest_notes_version.epoch == epoch {
1215            self.latest_notes_version.version.join(version);
1216        } else {
1217            self.latest_notes_version = NotesVersion {
1218                epoch,
1219                version: version.clone(),
1220            };
1221        }
1222    }
1223}