channel_store.rs

   1mod channel_index;
   2
   3use crate::channel_buffer::ChannelBuffer;
   4use anyhow::{Context as _, Result, anyhow};
   5use channel_index::ChannelIndex;
   6use client::{ChannelId, Client, ClientSettings, Subscription, User, UserId, UserStore};
   7use collections::{HashMap, HashSet};
   8use futures::{Future, FutureExt, StreamExt, channel::mpsc, future::Shared};
   9use gpui::{
  10    App, AppContext as _, AsyncApp, Context, Entity, EventEmitter, Global, SharedString, Task,
  11    WeakEntity,
  12};
  13use language::Capability;
  14use postage::{sink::Sink, watch};
  15use rpc::{
  16    TypedEnvelope,
  17    proto::{self, ChannelRole, ChannelVisibility},
  18};
  19use settings::Settings;
  20use std::{mem, sync::Arc, time::Duration};
  21use util::{ResultExt, maybe};
  22
  23pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
  24
  25pub fn init(client: &Arc<Client>, user_store: Entity<UserStore>, cx: &mut App) {
  26    let channel_store = cx.new(|cx| ChannelStore::new(client.clone(), user_store.clone(), cx));
  27    cx.set_global(GlobalChannelStore(channel_store));
  28}
  29
  30#[derive(Debug, Clone, Default, PartialEq)]
  31struct NotesVersion {
  32    epoch: u64,
  33    version: clock::Global,
  34}
  35
  36pub struct ChannelStore {
  37    pub channel_index: ChannelIndex,
  38    channel_invitations: Vec<Arc<Channel>>,
  39    channel_participants: HashMap<ChannelId, Vec<Arc<User>>>,
  40    channel_states: HashMap<ChannelId, ChannelState>,
  41    outgoing_invites: HashSet<(ChannelId, UserId)>,
  42    update_channels_tx: mpsc::UnboundedSender<proto::UpdateChannels>,
  43    opened_buffers: HashMap<ChannelId, OpenEntityHandle<ChannelBuffer>>,
  44    client: Arc<Client>,
  45    did_subscribe: bool,
  46    channels_loaded: (watch::Sender<bool>, watch::Receiver<bool>),
  47    user_store: Entity<UserStore>,
  48    _rpc_subscriptions: [Subscription; 2],
  49    _watch_connection_status: Task<Option<()>>,
  50    disconnect_channel_buffers_task: Option<Task<()>>,
  51    _update_channels: Task<()>,
  52}
  53
  54#[derive(Clone, Debug)]
  55pub struct Channel {
  56    pub id: ChannelId,
  57    pub name: SharedString,
  58    pub visibility: proto::ChannelVisibility,
  59    pub parent_path: Vec<ChannelId>,
  60    pub channel_order: i32,
  61}
  62
  63#[derive(Default, Debug)]
  64pub struct ChannelState {
  65    latest_notes_version: NotesVersion,
  66    observed_notes_version: NotesVersion,
  67    role: Option<ChannelRole>,
  68}
  69
  70impl Channel {
  71    pub fn link(&self, cx: &App) -> String {
  72        format!(
  73            "{}/channel/{}-{}",
  74            ClientSettings::get_global(cx).server_url,
  75            Self::slug(&self.name),
  76            self.id
  77        )
  78    }
  79
  80    pub fn notes_link(&self, heading: Option<String>, cx: &App) -> String {
  81        self.link(cx)
  82            + "/notes"
  83            + &heading
  84                .map(|h| format!("#{}", Self::slug(&h)))
  85                .unwrap_or_default()
  86    }
  87
  88    pub fn is_root_channel(&self) -> bool {
  89        self.parent_path.is_empty()
  90    }
  91
  92    pub fn root_id(&self) -> ChannelId {
  93        self.parent_path.first().copied().unwrap_or(self.id)
  94    }
  95
  96    pub fn slug(str: &str) -> String {
  97        let slug: String = str
  98            .chars()
  99            .map(|c| if c.is_alphanumeric() { c } else { '-' })
 100            .collect();
 101
 102        slug.trim_matches(|c| c == '-').to_string()
 103    }
 104}
 105
 106#[derive(Debug)]
 107pub struct ChannelMembership {
 108    pub user: Arc<User>,
 109    pub kind: proto::channel_member::Kind,
 110    pub role: proto::ChannelRole,
 111}
 112impl ChannelMembership {
 113    pub fn sort_key(&self) -> MembershipSortKey<'_> {
 114        MembershipSortKey {
 115            role_order: match self.role {
 116                proto::ChannelRole::Admin => 0,
 117                proto::ChannelRole::Member => 1,
 118                proto::ChannelRole::Banned => 2,
 119                proto::ChannelRole::Talker => 3,
 120                proto::ChannelRole::Guest => 4,
 121            },
 122            kind_order: match self.kind {
 123                proto::channel_member::Kind::Member => 0,
 124                proto::channel_member::Kind::Invitee => 1,
 125            },
 126            username_order: &self.user.github_login,
 127        }
 128    }
 129}
 130
 131#[derive(PartialOrd, Ord, PartialEq, Eq)]
 132pub struct MembershipSortKey<'a> {
 133    role_order: u8,
 134    kind_order: u8,
 135    username_order: &'a str,
 136}
 137
 138pub enum ChannelEvent {
 139    ChannelCreated(ChannelId),
 140    ChannelRenamed(ChannelId),
 141}
 142
 143impl EventEmitter<ChannelEvent> for ChannelStore {}
 144
 145enum OpenEntityHandle<E> {
 146    Open(WeakEntity<E>),
 147    Loading(Shared<Task<Result<Entity<E>, Arc<anyhow::Error>>>>),
 148}
 149
 150struct GlobalChannelStore(Entity<ChannelStore>);
 151
 152impl Global for GlobalChannelStore {}
 153
 154impl ChannelStore {
 155    pub fn global(cx: &App) -> Entity<Self> {
 156        cx.global::<GlobalChannelStore>().0.clone()
 157    }
 158
 159    pub fn new(client: Arc<Client>, user_store: Entity<UserStore>, cx: &mut Context<Self>) -> Self {
 160        let rpc_subscriptions = [
 161            client.add_message_handler(cx.weak_entity(), Self::handle_update_channels),
 162            client.add_message_handler(cx.weak_entity(), Self::handle_update_user_channels),
 163        ];
 164
 165        let mut connection_status = client.status();
 166        let (update_channels_tx, mut update_channels_rx) = mpsc::unbounded();
 167        let watch_connection_status = cx.spawn(async move |this, cx| {
 168            while let Some(status) = connection_status.next().await {
 169                let this = this.upgrade()?;
 170                match status {
 171                    client::Status::Connected { .. } => {
 172                        this.update(cx, |this, cx| this.handle_connect(cx))
 173                            .ok()?
 174                            .await
 175                            .log_err()?;
 176                    }
 177                    client::Status::SignedOut | client::Status::UpgradeRequired => {
 178                        this.update(cx, |this, cx| this.handle_disconnect(false, cx))
 179                            .ok();
 180                    }
 181                    _ => {
 182                        this.update(cx, |this, cx| this.handle_disconnect(true, cx))
 183                            .ok();
 184                    }
 185                }
 186            }
 187            Some(())
 188        });
 189
 190        Self {
 191            channel_invitations: Vec::default(),
 192            channel_index: ChannelIndex::default(),
 193            channel_participants: Default::default(),
 194            outgoing_invites: Default::default(),
 195            opened_buffers: Default::default(),
 196            update_channels_tx,
 197            client,
 198            user_store,
 199            _rpc_subscriptions: rpc_subscriptions,
 200            _watch_connection_status: watch_connection_status,
 201            disconnect_channel_buffers_task: None,
 202            _update_channels: cx.spawn(async move |this, cx| {
 203                maybe!(async move {
 204                    while let Some(update_channels) = update_channels_rx.next().await {
 205                        if let Some(this) = this.upgrade() {
 206                            let update_task = this
 207                                .update(cx, |this, cx| this.update_channels(update_channels, cx))?;
 208                            if let Some(update_task) = update_task {
 209                                update_task.await.log_err();
 210                            }
 211                        }
 212                    }
 213                    anyhow::Ok(())
 214                })
 215                .await
 216                .log_err();
 217            }),
 218            channel_states: Default::default(),
 219            did_subscribe: false,
 220            channels_loaded: watch::channel_with(false),
 221        }
 222    }
 223
 224    pub fn initialize(&mut self) {
 225        if !self.did_subscribe
 226            && self
 227                .client
 228                .send(proto::SubscribeToChannels {})
 229                .log_err()
 230                .is_some()
 231        {
 232            self.did_subscribe = true;
 233        }
 234    }
 235
 236    pub fn wait_for_channels(
 237        &mut self,
 238        timeout: Duration,
 239        cx: &mut Context<Self>,
 240    ) -> Task<Result<()>> {
 241        let mut channels_loaded_rx = self.channels_loaded.1.clone();
 242        if *channels_loaded_rx.borrow() {
 243            return Task::ready(Ok(()));
 244        }
 245
 246        let mut status_receiver = self.client.status();
 247        if status_receiver.borrow().is_connected() {
 248            self.initialize();
 249        }
 250
 251        let mut timer = cx.background_executor().timer(timeout).fuse();
 252        cx.spawn(async move |this, cx| {
 253            loop {
 254                futures::select_biased! {
 255                    channels_loaded = channels_loaded_rx.next().fuse() => {
 256                        if let Some(true) = channels_loaded {
 257                            return Ok(());
 258                        }
 259                    }
 260                    status = status_receiver.next().fuse() => {
 261                        if let Some(status) = status
 262                            && status.is_connected() {
 263                                this.update(cx, |this, _cx| {
 264                                    this.initialize();
 265                                }).ok();
 266                            }
 267                        continue;
 268                    }
 269                    _ = timer => {
 270                        return Err(anyhow!("{:?} elapsed without receiving channels", timeout));
 271                    }
 272                }
 273            }
 274        })
 275    }
 276
 277    pub fn client(&self) -> Arc<Client> {
 278        self.client.clone()
 279    }
 280
 281    /// Returns the number of unique channels in the store
 282    pub fn channel_count(&self) -> usize {
 283        self.channel_index.by_id().len()
 284    }
 285
 286    /// Returns the index of a channel ID in the list of unique channels
 287    pub fn index_of_channel(&self, channel_id: ChannelId) -> Option<usize> {
 288        self.channel_index
 289            .by_id()
 290            .keys()
 291            .position(|id| *id == channel_id)
 292    }
 293
 294    /// Returns an iterator over all unique channels
 295    pub fn channels(&self) -> impl '_ + Iterator<Item = &Arc<Channel>> {
 296        self.channel_index.by_id().values()
 297    }
 298
 299    /// Iterate over all entries in the channel DAG
 300    pub fn ordered_channels(&self) -> impl '_ + Iterator<Item = (usize, &Arc<Channel>)> {
 301        self.channel_index
 302            .ordered_channels()
 303            .iter()
 304            .filter_map(move |id| {
 305                let channel = self.channel_index.by_id().get(id)?;
 306                Some((channel.parent_path.len(), channel))
 307            })
 308    }
 309
 310    pub fn channel_at_index(&self, ix: usize) -> Option<&Arc<Channel>> {
 311        let channel_id = self.channel_index.ordered_channels().get(ix)?;
 312        self.channel_index.by_id().get(channel_id)
 313    }
 314
 315    pub fn channel_at(&self, ix: usize) -> Option<&Arc<Channel>> {
 316        self.channel_index.by_id().values().nth(ix)
 317    }
 318
 319    pub fn has_channel_invitation(&self, channel_id: ChannelId) -> bool {
 320        self.channel_invitations
 321            .iter()
 322            .any(|channel| channel.id == channel_id)
 323    }
 324
 325    pub fn channel_invitations(&self) -> &[Arc<Channel>] {
 326        &self.channel_invitations
 327    }
 328
 329    pub fn channel_for_id(&self, channel_id: ChannelId) -> Option<&Arc<Channel>> {
 330        self.channel_index.by_id().get(&channel_id)
 331    }
 332
 333    pub fn has_open_channel_buffer(&self, channel_id: ChannelId, _cx: &App) -> bool {
 334        if let Some(buffer) = self.opened_buffers.get(&channel_id)
 335            && let OpenEntityHandle::Open(buffer) = buffer
 336        {
 337            return buffer.upgrade().is_some();
 338        }
 339        false
 340    }
 341
 342    pub fn open_channel_buffer(
 343        &mut self,
 344        channel_id: ChannelId,
 345        cx: &mut Context<Self>,
 346    ) -> Task<Result<Entity<ChannelBuffer>>> {
 347        let client = self.client.clone();
 348        let user_store = self.user_store.clone();
 349        let channel_store = cx.entity();
 350        self.open_channel_resource(
 351            channel_id,
 352            "notes",
 353            |this| &mut this.opened_buffers,
 354            async move |channel, cx| {
 355                ChannelBuffer::new(channel, client, user_store, channel_store, cx).await
 356            },
 357            cx,
 358        )
 359    }
 360
 361    pub fn has_channel_buffer_changed(&self, channel_id: ChannelId) -> bool {
 362        self.channel_states
 363            .get(&channel_id)
 364            .is_some_and(|state| state.has_channel_buffer_changed())
 365    }
 366
 367    pub fn acknowledge_notes_version(
 368        &mut self,
 369        channel_id: ChannelId,
 370        epoch: u64,
 371        version: &clock::Global,
 372        cx: &mut Context<Self>,
 373    ) {
 374        self.channel_states
 375            .entry(channel_id)
 376            .or_default()
 377            .acknowledge_notes_version(epoch, version);
 378        cx.notify()
 379    }
 380
 381    pub fn update_latest_notes_version(
 382        &mut self,
 383        channel_id: ChannelId,
 384        epoch: u64,
 385        version: &clock::Global,
 386        cx: &mut Context<Self>,
 387    ) {
 388        self.channel_states
 389            .entry(channel_id)
 390            .or_default()
 391            .update_latest_notes_version(epoch, version);
 392        cx.notify()
 393    }
 394
 395    /// Asynchronously open a given resource associated with a channel.
 396    ///
 397    /// Make sure that the resource is only opened once, even if this method
 398    /// is called multiple times with the same channel id while the first task
 399    /// is still running.
 400    fn open_channel_resource<T, F>(
 401        &mut self,
 402        channel_id: ChannelId,
 403        resource_name: &'static str,
 404        get_map: fn(&mut Self) -> &mut HashMap<ChannelId, OpenEntityHandle<T>>,
 405        load: F,
 406        cx: &mut Context<Self>,
 407    ) -> Task<Result<Entity<T>>>
 408    where
 409        F: AsyncFnOnce(Arc<Channel>, &mut AsyncApp) -> Result<Entity<T>> + 'static,
 410        T: 'static,
 411    {
 412        let task = loop {
 413            match get_map(self).get(&channel_id) {
 414                Some(OpenEntityHandle::Open(entity)) => {
 415                    if let Some(entity) = entity.upgrade() {
 416                        break Task::ready(Ok(entity)).shared();
 417                    } else {
 418                        get_map(self).remove(&channel_id);
 419                        continue;
 420                    }
 421                }
 422                Some(OpenEntityHandle::Loading(task)) => break task.clone(),
 423                None => {}
 424            }
 425
 426            let channels_ready = self.wait_for_channels(Duration::from_secs(10), cx);
 427            let task = cx
 428                .spawn(async move |this, cx| {
 429                    channels_ready.await?;
 430                    let channel = this.read_with(cx, |this, _| {
 431                        this.channel_for_id(channel_id)
 432                            .cloned()
 433                            .ok_or_else(|| Arc::new(anyhow!("no channel for id: {channel_id}")))
 434                    })??;
 435
 436                    load(channel, cx).await.map_err(Arc::new)
 437                })
 438                .shared();
 439
 440            get_map(self).insert(channel_id, OpenEntityHandle::Loading(task.clone()));
 441            let task = cx.spawn({
 442                async move |this, cx| {
 443                    let result = task.await;
 444                    this.update(cx, |this, _| match &result {
 445                        Ok(model) => {
 446                            get_map(this)
 447                                .insert(channel_id, OpenEntityHandle::Open(model.downgrade()));
 448                        }
 449                        Err(_) => {
 450                            get_map(this).remove(&channel_id);
 451                        }
 452                    })?;
 453                    result
 454                }
 455            });
 456            break task.shared();
 457        };
 458        cx.background_spawn(async move {
 459            task.await.map_err(|error| {
 460                anyhow!("{error}").context(format!("failed to open channel {resource_name}"))
 461            })
 462        })
 463    }
 464
 465    pub fn is_channel_admin(&self, channel_id: ChannelId) -> bool {
 466        self.channel_role(channel_id) == proto::ChannelRole::Admin
 467    }
 468
 469    pub fn is_root_channel(&self, channel_id: ChannelId) -> bool {
 470        self.channel_index
 471            .by_id()
 472            .get(&channel_id)
 473            .is_some_and(|channel| channel.is_root_channel())
 474    }
 475
 476    pub fn is_public_channel(&self, channel_id: ChannelId) -> bool {
 477        self.channel_index
 478            .by_id()
 479            .get(&channel_id)
 480            .is_some_and(|channel| channel.visibility == ChannelVisibility::Public)
 481    }
 482
 483    pub fn channel_capability(&self, channel_id: ChannelId) -> Capability {
 484        match self.channel_role(channel_id) {
 485            ChannelRole::Admin | ChannelRole::Member => Capability::ReadWrite,
 486            _ => Capability::ReadOnly,
 487        }
 488    }
 489
 490    pub fn channel_role(&self, channel_id: ChannelId) -> proto::ChannelRole {
 491        maybe!({
 492            let mut channel = self.channel_for_id(channel_id)?;
 493            if !channel.is_root_channel() {
 494                channel = self.channel_for_id(channel.root_id())?;
 495            }
 496            let root_channel_state = self.channel_states.get(&channel.id);
 497            root_channel_state?.role
 498        })
 499        .unwrap_or(proto::ChannelRole::Guest)
 500    }
 501
 502    pub fn channel_participants(&self, channel_id: ChannelId) -> &[Arc<User>] {
 503        self.channel_participants
 504            .get(&channel_id)
 505            .map_or(&[], |v| v.as_slice())
 506    }
 507
 508    pub fn create_channel(
 509        &self,
 510        name: &str,
 511        parent_id: Option<ChannelId>,
 512        cx: &mut Context<Self>,
 513    ) -> Task<Result<ChannelId>> {
 514        let client = self.client.clone();
 515        let name = name.trim_start_matches('#').to_owned();
 516        cx.spawn(async move |this, cx| {
 517            let response = client
 518                .request(proto::CreateChannel {
 519                    name,
 520                    parent_id: parent_id.map(|cid| cid.0),
 521                })
 522                .await?;
 523
 524            let channel = response.channel.context("missing channel in response")?;
 525            let channel_id = ChannelId(channel.id);
 526
 527            this.update(cx, |this, cx| {
 528                let task = this.update_channels(
 529                    proto::UpdateChannels {
 530                        channels: vec![channel],
 531                        ..Default::default()
 532                    },
 533                    cx,
 534                );
 535                assert!(task.is_none());
 536
 537                // This event is emitted because the collab panel wants to clear the pending edit state
 538                // before this frame is rendered. But we can't guarantee that the collab panel's future
 539                // will resolve before this flush_effects finishes. Synchronously emitting this event
 540                // ensures that the collab panel will observe this creation before the frame completes
 541                cx.emit(ChannelEvent::ChannelCreated(channel_id));
 542            })?;
 543
 544            Ok(channel_id)
 545        })
 546    }
 547
 548    pub fn move_channel(
 549        &mut self,
 550        channel_id: ChannelId,
 551        to: ChannelId,
 552        cx: &mut Context<Self>,
 553    ) -> Task<Result<()>> {
 554        let client = self.client.clone();
 555        cx.spawn(async move |_, _| {
 556            let _ = client
 557                .request(proto::MoveChannel {
 558                    channel_id: channel_id.0,
 559                    to: to.0,
 560                })
 561                .await?;
 562            Ok(())
 563        })
 564    }
 565
 566    pub fn reorder_channel(
 567        &mut self,
 568        channel_id: ChannelId,
 569        direction: proto::reorder_channel::Direction,
 570        cx: &mut Context<Self>,
 571    ) -> Task<Result<()>> {
 572        let client = self.client.clone();
 573        cx.spawn(async move |_, _| {
 574            client
 575                .request(proto::ReorderChannel {
 576                    channel_id: channel_id.0,
 577                    direction: direction.into(),
 578                })
 579                .await?;
 580            Ok(())
 581        })
 582    }
 583
 584    pub fn set_channel_visibility(
 585        &mut self,
 586        channel_id: ChannelId,
 587        visibility: ChannelVisibility,
 588        cx: &mut Context<Self>,
 589    ) -> Task<Result<()>> {
 590        let client = self.client.clone();
 591        cx.spawn(async move |_, _| {
 592            let _ = client
 593                .request(proto::SetChannelVisibility {
 594                    channel_id: channel_id.0,
 595                    visibility: visibility.into(),
 596                })
 597                .await?;
 598
 599            Ok(())
 600        })
 601    }
 602
 603    pub fn invite_member(
 604        &mut self,
 605        channel_id: ChannelId,
 606        user_id: UserId,
 607        role: proto::ChannelRole,
 608        cx: &mut Context<Self>,
 609    ) -> Task<Result<()>> {
 610        if !self.outgoing_invites.insert((channel_id, user_id)) {
 611            return Task::ready(Err(anyhow!("invite request already in progress")));
 612        }
 613
 614        cx.notify();
 615        let client = self.client.clone();
 616        cx.spawn(async move |this, cx| {
 617            let result = client
 618                .request(proto::InviteChannelMember {
 619                    channel_id: channel_id.0,
 620                    user_id,
 621                    role: role.into(),
 622                })
 623                .await;
 624
 625            this.update(cx, |this, cx| {
 626                this.outgoing_invites.remove(&(channel_id, user_id));
 627                cx.notify();
 628            })?;
 629
 630            result?;
 631
 632            Ok(())
 633        })
 634    }
 635
 636    pub fn remove_member(
 637        &mut self,
 638        channel_id: ChannelId,
 639        user_id: u64,
 640        cx: &mut Context<Self>,
 641    ) -> Task<Result<()>> {
 642        if !self.outgoing_invites.insert((channel_id, user_id)) {
 643            return Task::ready(Err(anyhow!("invite request already in progress")));
 644        }
 645
 646        cx.notify();
 647        let client = self.client.clone();
 648        cx.spawn(async move |this, cx| {
 649            let result = client
 650                .request(proto::RemoveChannelMember {
 651                    channel_id: channel_id.0,
 652                    user_id,
 653                })
 654                .await;
 655
 656            this.update(cx, |this, cx| {
 657                this.outgoing_invites.remove(&(channel_id, user_id));
 658                cx.notify();
 659            })?;
 660            result?;
 661            Ok(())
 662        })
 663    }
 664
 665    pub fn set_member_role(
 666        &mut self,
 667        channel_id: ChannelId,
 668        user_id: UserId,
 669        role: proto::ChannelRole,
 670        cx: &mut Context<Self>,
 671    ) -> Task<Result<()>> {
 672        if !self.outgoing_invites.insert((channel_id, user_id)) {
 673            return Task::ready(Err(anyhow!("member request already in progress")));
 674        }
 675
 676        cx.notify();
 677        let client = self.client.clone();
 678        cx.spawn(async move |this, cx| {
 679            let result = client
 680                .request(proto::SetChannelMemberRole {
 681                    channel_id: channel_id.0,
 682                    user_id,
 683                    role: role.into(),
 684                })
 685                .await;
 686
 687            this.update(cx, |this, cx| {
 688                this.outgoing_invites.remove(&(channel_id, user_id));
 689                cx.notify();
 690            })?;
 691
 692            result?;
 693            Ok(())
 694        })
 695    }
 696
 697    pub fn rename(
 698        &mut self,
 699        channel_id: ChannelId,
 700        new_name: &str,
 701        cx: &mut Context<Self>,
 702    ) -> Task<Result<()>> {
 703        let client = self.client.clone();
 704        let name = new_name.to_string();
 705        cx.spawn(async move |this, cx| {
 706            let channel = client
 707                .request(proto::RenameChannel {
 708                    channel_id: channel_id.0,
 709                    name,
 710                })
 711                .await?
 712                .channel
 713                .context("missing channel in response")?;
 714            this.update(cx, |this, cx| {
 715                let task = this.update_channels(
 716                    proto::UpdateChannels {
 717                        channels: vec![channel],
 718                        ..Default::default()
 719                    },
 720                    cx,
 721                );
 722                assert!(task.is_none());
 723
 724                // This event is emitted because the collab panel wants to clear the pending edit state
 725                // before this frame is rendered. But we can't guarantee that the collab panel's future
 726                // will resolve before this flush_effects finishes. Synchronously emitting this event
 727                // ensures that the collab panel will observe this creation before the frame complete
 728                cx.emit(ChannelEvent::ChannelRenamed(channel_id))
 729            })?;
 730            Ok(())
 731        })
 732    }
 733
 734    pub fn respond_to_channel_invite(
 735        &mut self,
 736        channel_id: ChannelId,
 737        accept: bool,
 738        cx: &mut Context<Self>,
 739    ) -> Task<Result<()>> {
 740        let client = self.client.clone();
 741        cx.background_spawn(async move {
 742            client
 743                .request(proto::RespondToChannelInvite {
 744                    channel_id: channel_id.0,
 745                    accept,
 746                })
 747                .await?;
 748            Ok(())
 749        })
 750    }
 751    pub fn fuzzy_search_members(
 752        &self,
 753        channel_id: ChannelId,
 754        query: String,
 755        limit: u16,
 756        cx: &mut Context<Self>,
 757    ) -> Task<Result<Vec<ChannelMembership>>> {
 758        let client = self.client.clone();
 759        let user_store = self.user_store.downgrade();
 760        cx.spawn(async move |_, cx| {
 761            let response = client
 762                .request(proto::GetChannelMembers {
 763                    channel_id: channel_id.0,
 764                    query,
 765                    limit: limit as u64,
 766                })
 767                .await?;
 768            user_store.update(cx, |user_store, _| {
 769                user_store.insert(response.users);
 770                response
 771                    .members
 772                    .into_iter()
 773                    .filter_map(|member| {
 774                        Some(ChannelMembership {
 775                            user: user_store.get_cached_user(member.user_id)?,
 776                            role: member.role(),
 777                            kind: member.kind(),
 778                        })
 779                    })
 780                    .collect()
 781            })
 782        })
 783    }
 784
 785    pub fn remove_channel(
 786        &self,
 787        channel_id: ChannelId,
 788    ) -> impl Future<Output = Result<()>> + use<> {
 789        let client = self.client.clone();
 790        async move {
 791            client
 792                .request(proto::DeleteChannel {
 793                    channel_id: channel_id.0,
 794                })
 795                .await?;
 796            Ok(())
 797        }
 798    }
 799
 800    pub fn has_pending_channel_invite_response(&self, _: &Arc<Channel>) -> bool {
 801        false
 802    }
 803
 804    pub fn has_pending_channel_invite(&self, channel_id: ChannelId, user_id: UserId) -> bool {
 805        self.outgoing_invites.contains(&(channel_id, user_id))
 806    }
 807
 808    async fn handle_update_channels(
 809        this: Entity<Self>,
 810        message: TypedEnvelope<proto::UpdateChannels>,
 811        cx: AsyncApp,
 812    ) -> Result<()> {
 813        this.read_with(&cx, |this, _| {
 814            this.update_channels_tx
 815                .unbounded_send(message.payload)
 816                .unwrap();
 817        })?;
 818        Ok(())
 819    }
 820
 821    async fn handle_update_user_channels(
 822        this: Entity<Self>,
 823        message: TypedEnvelope<proto::UpdateUserChannels>,
 824        mut cx: AsyncApp,
 825    ) -> Result<()> {
 826        this.update(&mut cx, |this, cx| {
 827            for buffer_version in message.payload.observed_channel_buffer_version {
 828                let version = language::proto::deserialize_version(&buffer_version.version);
 829                this.acknowledge_notes_version(
 830                    ChannelId(buffer_version.channel_id),
 831                    buffer_version.epoch,
 832                    &version,
 833                    cx,
 834                );
 835            }
 836            for membership in message.payload.channel_memberships {
 837                if let Some(role) = ChannelRole::from_i32(membership.role) {
 838                    this.channel_states
 839                        .entry(ChannelId(membership.channel_id))
 840                        .or_default()
 841                        .set_role(role)
 842                }
 843            }
 844        })
 845    }
 846
 847    fn handle_connect(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
 848        self.channel_index.clear();
 849        self.channel_invitations.clear();
 850        self.channel_participants.clear();
 851        self.channel_index.clear();
 852        self.outgoing_invites.clear();
 853        self.disconnect_channel_buffers_task.take();
 854
 855        let mut buffer_versions = Vec::new();
 856        for buffer in self.opened_buffers.values() {
 857            if let OpenEntityHandle::Open(buffer) = buffer
 858                && let Some(buffer) = buffer.upgrade()
 859            {
 860                let channel_buffer = buffer.read(cx);
 861                let buffer = channel_buffer.buffer().read(cx);
 862                buffer_versions.push(proto::ChannelBufferVersion {
 863                    channel_id: channel_buffer.channel_id.0,
 864                    epoch: channel_buffer.epoch(),
 865                    version: language::proto::serialize_version(&buffer.version()),
 866                });
 867            }
 868        }
 869
 870        if buffer_versions.is_empty() {
 871            return Task::ready(Ok(()));
 872        }
 873
 874        let response = self.client.request(proto::RejoinChannelBuffers {
 875            buffers: buffer_versions,
 876        });
 877
 878        cx.spawn(async move |this, cx| {
 879            let mut response = response.await?;
 880
 881            this.update(cx, |this, cx| {
 882                this.opened_buffers.retain(|_, buffer| match buffer {
 883                    OpenEntityHandle::Open(channel_buffer) => {
 884                        let Some(channel_buffer) = channel_buffer.upgrade() else {
 885                            return false;
 886                        };
 887
 888                        channel_buffer.update(cx, |channel_buffer, cx| {
 889                            let channel_id = channel_buffer.channel_id;
 890                            if let Some(remote_buffer) = response
 891                                .buffers
 892                                .iter_mut()
 893                                .find(|buffer| buffer.channel_id == channel_id.0)
 894                            {
 895                                let channel_id = channel_buffer.channel_id;
 896                                let remote_version =
 897                                    language::proto::deserialize_version(&remote_buffer.version);
 898
 899                                channel_buffer.replace_collaborators(
 900                                    mem::take(&mut remote_buffer.collaborators),
 901                                    cx,
 902                                );
 903
 904                                let operations = channel_buffer
 905                                    .buffer()
 906                                    .update(cx, |buffer, cx| {
 907                                        let outgoing_operations =
 908                                            buffer.serialize_ops(Some(remote_version), cx);
 909                                        let incoming_operations =
 910                                            mem::take(&mut remote_buffer.operations)
 911                                                .into_iter()
 912                                                .map(language::proto::deserialize_operation)
 913                                                .collect::<Result<Vec<_>>>()?;
 914                                        buffer.apply_ops(incoming_operations, cx);
 915                                        anyhow::Ok(outgoing_operations)
 916                                    })
 917                                    .log_err();
 918
 919                                if let Some(operations) = operations {
 920                                    channel_buffer.connected(cx);
 921                                    let client = this.client.clone();
 922                                    cx.background_spawn(async move {
 923                                        let operations = operations.await;
 924                                        for chunk in language::proto::split_operations(operations) {
 925                                            client
 926                                                .send(proto::UpdateChannelBuffer {
 927                                                    channel_id: channel_id.0,
 928                                                    operations: chunk,
 929                                                })
 930                                                .ok();
 931                                        }
 932                                    })
 933                                    .detach();
 934                                    return true;
 935                                }
 936                            }
 937
 938                            channel_buffer.disconnect(cx);
 939                            false
 940                        })
 941                    }
 942                    OpenEntityHandle::Loading(_) => true,
 943                });
 944            })
 945            .ok();
 946            anyhow::Ok(())
 947        })
 948    }
 949
 950    fn handle_disconnect(&mut self, wait_for_reconnect: bool, cx: &mut Context<Self>) {
 951        cx.notify();
 952        self.did_subscribe = false;
 953        self.disconnect_channel_buffers_task.get_or_insert_with(|| {
 954            cx.spawn(async move |this, cx| {
 955                if wait_for_reconnect {
 956                    cx.background_executor().timer(RECONNECT_TIMEOUT).await;
 957                }
 958
 959                if let Some(this) = this.upgrade() {
 960                    this.update(cx, |this, cx| {
 961                        for buffer in this.opened_buffers.values() {
 962                            if let OpenEntityHandle::Open(buffer) = &buffer
 963                                && let Some(buffer) = buffer.upgrade()
 964                            {
 965                                buffer.update(cx, |buffer, cx| buffer.disconnect(cx));
 966                            }
 967                        }
 968                    })
 969                    .ok();
 970                }
 971            })
 972        });
 973    }
 974
 975    #[cfg(any(test, feature = "test-support"))]
 976    pub fn reset(&mut self) {
 977        self.channel_invitations.clear();
 978        self.channel_index.clear();
 979        self.channel_participants.clear();
 980        self.outgoing_invites.clear();
 981        self.opened_buffers.clear();
 982        self.disconnect_channel_buffers_task = None;
 983        self.channel_states.clear();
 984    }
 985
 986    pub(crate) fn update_channels(
 987        &mut self,
 988        payload: proto::UpdateChannels,
 989        cx: &mut Context<ChannelStore>,
 990    ) -> Option<Task<Result<()>>> {
 991        if !payload.remove_channel_invitations.is_empty() {
 992            self.channel_invitations
 993                .retain(|channel| !payload.remove_channel_invitations.contains(&channel.id.0));
 994        }
 995        for channel in payload.channel_invitations {
 996            match self
 997                .channel_invitations
 998                .binary_search_by_key(&channel.id, |c| c.id.0)
 999            {
1000                Ok(ix) => {
1001                    Arc::make_mut(&mut self.channel_invitations[ix]).name = channel.name.into()
1002                }
1003                Err(ix) => self.channel_invitations.insert(
1004                    ix,
1005                    Arc::new(Channel {
1006                        id: ChannelId(channel.id),
1007                        visibility: channel.visibility(),
1008                        name: channel.name.into(),
1009                        parent_path: channel.parent_path.into_iter().map(ChannelId).collect(),
1010                        channel_order: channel.channel_order,
1011                    }),
1012                ),
1013            }
1014        }
1015
1016        let channels_changed = !payload.channels.is_empty()
1017            || !payload.delete_channels.is_empty()
1018            || !payload.latest_channel_buffer_versions.is_empty();
1019
1020        if channels_changed {
1021            if !payload.delete_channels.is_empty() {
1022                let delete_channels: Vec<ChannelId> =
1023                    payload.delete_channels.into_iter().map(ChannelId).collect();
1024                self.channel_index.delete_channels(&delete_channels);
1025                self.channel_participants
1026                    .retain(|channel_id, _| !delete_channels.contains(channel_id));
1027
1028                for channel_id in &delete_channels {
1029                    let channel_id = *channel_id;
1030                    if payload
1031                        .channels
1032                        .iter()
1033                        .any(|channel| channel.id == channel_id.0)
1034                    {
1035                        continue;
1036                    }
1037                    if let Some(OpenEntityHandle::Open(buffer)) =
1038                        self.opened_buffers.remove(&channel_id)
1039                        && let Some(buffer) = buffer.upgrade()
1040                    {
1041                        buffer.update(cx, ChannelBuffer::disconnect);
1042                    }
1043                }
1044            }
1045
1046            let mut index = self.channel_index.bulk_insert();
1047            for channel in payload.channels {
1048                let id = ChannelId(channel.id);
1049                let channel_changed = index.insert(channel);
1050
1051                if channel_changed
1052                    && let Some(OpenEntityHandle::Open(buffer)) = self.opened_buffers.get(&id)
1053                    && let Some(buffer) = buffer.upgrade()
1054                {
1055                    buffer.update(cx, ChannelBuffer::channel_changed);
1056                }
1057            }
1058
1059            for latest_buffer_version in payload.latest_channel_buffer_versions {
1060                let version = language::proto::deserialize_version(&latest_buffer_version.version);
1061                self.channel_states
1062                    .entry(ChannelId(latest_buffer_version.channel_id))
1063                    .or_default()
1064                    .update_latest_notes_version(latest_buffer_version.epoch, &version)
1065            }
1066
1067            self.channels_loaded.0.try_send(true).log_err();
1068        }
1069
1070        cx.notify();
1071        if payload.channel_participants.is_empty() {
1072            return None;
1073        }
1074
1075        let mut all_user_ids = Vec::new();
1076        let channel_participants = payload.channel_participants;
1077        for entry in &channel_participants {
1078            for user_id in entry.participant_user_ids.iter() {
1079                if let Err(ix) = all_user_ids.binary_search(user_id) {
1080                    all_user_ids.insert(ix, *user_id);
1081                }
1082            }
1083        }
1084
1085        let users = self
1086            .user_store
1087            .update(cx, |user_store, cx| user_store.get_users(all_user_ids, cx));
1088        Some(cx.spawn(async move |this, cx| {
1089            let users = users.await?;
1090
1091            this.update(cx, |this, cx| {
1092                for entry in &channel_participants {
1093                    let mut participants: Vec<_> = entry
1094                        .participant_user_ids
1095                        .iter()
1096                        .filter_map(|user_id| {
1097                            users
1098                                .binary_search_by_key(&user_id, |user| &user.id)
1099                                .ok()
1100                                .map(|ix| users[ix].clone())
1101                        })
1102                        .collect();
1103
1104                    participants.sort_by_key(|u| u.id);
1105
1106                    this.channel_participants
1107                        .insert(ChannelId(entry.channel_id), participants);
1108                }
1109
1110                cx.notify();
1111            })
1112        }))
1113    }
1114}
1115
1116impl ChannelState {
1117    fn set_role(&mut self, role: ChannelRole) {
1118        self.role = Some(role);
1119    }
1120
1121    fn has_channel_buffer_changed(&self) -> bool {
1122        self.latest_notes_version.epoch > self.observed_notes_version.epoch
1123            || (self.latest_notes_version.epoch == self.observed_notes_version.epoch
1124                && self
1125                    .latest_notes_version
1126                    .version
1127                    .changed_since(&self.observed_notes_version.version))
1128    }
1129
1130    fn acknowledge_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1131        if self.observed_notes_version.epoch == epoch {
1132            self.observed_notes_version.version.join(version);
1133        } else {
1134            self.observed_notes_version = NotesVersion {
1135                epoch,
1136                version: version.clone(),
1137            };
1138        }
1139    }
1140
1141    fn update_latest_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1142        if self.latest_notes_version.epoch == epoch {
1143            self.latest_notes_version.version.join(version);
1144        } else {
1145            self.latest_notes_version = NotesVersion {
1146                epoch,
1147                version: version.clone(),
1148            };
1149        }
1150    }
1151}