channel_store.rs

   1mod channel_index;
   2
   3use crate::{ChannelMessage, channel_buffer::ChannelBuffer, channel_chat::ChannelChat};
   4use anyhow::{Context as _, Result, anyhow};
   5use channel_index::ChannelIndex;
   6use client::{ChannelId, Client, ClientSettings, Subscription, User, UserId, UserStore};
   7use collections::{HashMap, HashSet};
   8use futures::{Future, FutureExt, StreamExt, channel::mpsc, future::Shared};
   9use gpui::{
  10    App, AppContext as _, AsyncApp, Context, Entity, EventEmitter, Global, SharedString, Task,
  11    WeakEntity,
  12};
  13use language::Capability;
  14use postage::{sink::Sink, watch};
  15use rpc::{
  16    TypedEnvelope,
  17    proto::{self, ChannelRole, ChannelVisibility},
  18};
  19use settings::Settings;
  20use std::{mem, sync::Arc, time::Duration};
  21use util::{ResultExt, maybe};
  22
  23pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
  24
  25pub fn init(client: &Arc<Client>, user_store: Entity<UserStore>, cx: &mut App) {
  26    let channel_store = cx.new(|cx| ChannelStore::new(client.clone(), user_store.clone(), cx));
  27    cx.set_global(GlobalChannelStore(channel_store));
  28}
  29
  30#[derive(Debug, Clone, Default, PartialEq)]
  31struct NotesVersion {
  32    epoch: u64,
  33    version: clock::Global,
  34}
  35
  36pub struct ChannelStore {
  37    pub channel_index: ChannelIndex,
  38    channel_invitations: Vec<Arc<Channel>>,
  39    channel_participants: HashMap<ChannelId, Vec<Arc<User>>>,
  40    channel_states: HashMap<ChannelId, ChannelState>,
  41    outgoing_invites: HashSet<(ChannelId, UserId)>,
  42    update_channels_tx: mpsc::UnboundedSender<proto::UpdateChannels>,
  43    opened_buffers: HashMap<ChannelId, OpenEntityHandle<ChannelBuffer>>,
  44    opened_chats: HashMap<ChannelId, OpenEntityHandle<ChannelChat>>,
  45    client: Arc<Client>,
  46    did_subscribe: bool,
  47    channels_loaded: (watch::Sender<bool>, watch::Receiver<bool>),
  48    user_store: Entity<UserStore>,
  49    _rpc_subscriptions: [Subscription; 2],
  50    _watch_connection_status: Task<Option<()>>,
  51    disconnect_channel_buffers_task: Option<Task<()>>,
  52    _update_channels: Task<()>,
  53}
  54
  55#[derive(Clone, Debug)]
  56pub struct Channel {
  57    pub id: ChannelId,
  58    pub name: SharedString,
  59    pub visibility: proto::ChannelVisibility,
  60    pub parent_path: Vec<ChannelId>,
  61    pub channel_order: i32,
  62}
  63
  64#[derive(Default, Debug)]
  65pub struct ChannelState {
  66    latest_chat_message: Option<u64>,
  67    latest_notes_version: NotesVersion,
  68    observed_notes_version: NotesVersion,
  69    observed_chat_message: Option<u64>,
  70    role: Option<ChannelRole>,
  71}
  72
  73impl Channel {
  74    pub fn link(&self, cx: &App) -> String {
  75        format!(
  76            "{}/channel/{}-{}",
  77            ClientSettings::get_global(cx).server_url,
  78            Self::slug(&self.name),
  79            self.id
  80        )
  81    }
  82
  83    pub fn notes_link(&self, heading: Option<String>, cx: &App) -> String {
  84        self.link(cx)
  85            + "/notes"
  86            + &heading
  87                .map(|h| format!("#{}", Self::slug(&h)))
  88                .unwrap_or_default()
  89    }
  90
  91    pub fn is_root_channel(&self) -> bool {
  92        self.parent_path.is_empty()
  93    }
  94
  95    pub fn root_id(&self) -> ChannelId {
  96        self.parent_path.first().copied().unwrap_or(self.id)
  97    }
  98
  99    pub fn slug(str: &str) -> String {
 100        let slug: String = str
 101            .chars()
 102            .map(|c| if c.is_alphanumeric() { c } else { '-' })
 103            .collect();
 104
 105        slug.trim_matches(|c| c == '-').to_string()
 106    }
 107}
 108
 109#[derive(Debug)]
 110pub struct ChannelMembership {
 111    pub user: Arc<User>,
 112    pub kind: proto::channel_member::Kind,
 113    pub role: proto::ChannelRole,
 114}
 115impl ChannelMembership {
 116    pub fn sort_key(&self) -> MembershipSortKey<'_> {
 117        MembershipSortKey {
 118            role_order: match self.role {
 119                proto::ChannelRole::Admin => 0,
 120                proto::ChannelRole::Member => 1,
 121                proto::ChannelRole::Banned => 2,
 122                proto::ChannelRole::Talker => 3,
 123                proto::ChannelRole::Guest => 4,
 124            },
 125            kind_order: match self.kind {
 126                proto::channel_member::Kind::Member => 0,
 127                proto::channel_member::Kind::Invitee => 1,
 128            },
 129            username_order: &self.user.github_login,
 130        }
 131    }
 132}
 133
 134#[derive(PartialOrd, Ord, PartialEq, Eq)]
 135pub struct MembershipSortKey<'a> {
 136    role_order: u8,
 137    kind_order: u8,
 138    username_order: &'a str,
 139}
 140
 141pub enum ChannelEvent {
 142    ChannelCreated(ChannelId),
 143    ChannelRenamed(ChannelId),
 144}
 145
 146impl EventEmitter<ChannelEvent> for ChannelStore {}
 147
 148enum OpenEntityHandle<E> {
 149    Open(WeakEntity<E>),
 150    Loading(Shared<Task<Result<Entity<E>, Arc<anyhow::Error>>>>),
 151}
 152
 153struct GlobalChannelStore(Entity<ChannelStore>);
 154
 155impl Global for GlobalChannelStore {}
 156
 157impl ChannelStore {
 158    pub fn global(cx: &App) -> Entity<Self> {
 159        cx.global::<GlobalChannelStore>().0.clone()
 160    }
 161
 162    pub fn new(client: Arc<Client>, user_store: Entity<UserStore>, cx: &mut Context<Self>) -> Self {
 163        let rpc_subscriptions = [
 164            client.add_message_handler(cx.weak_entity(), Self::handle_update_channels),
 165            client.add_message_handler(cx.weak_entity(), Self::handle_update_user_channels),
 166        ];
 167
 168        let mut connection_status = client.status();
 169        let (update_channels_tx, mut update_channels_rx) = mpsc::unbounded();
 170        let watch_connection_status = cx.spawn(async move |this, cx| {
 171            while let Some(status) = connection_status.next().await {
 172                let this = this.upgrade()?;
 173                match status {
 174                    client::Status::Connected { .. } => {
 175                        this.update(cx, |this, cx| this.handle_connect(cx))
 176                            .ok()?
 177                            .await
 178                            .log_err()?;
 179                    }
 180                    client::Status::SignedOut | client::Status::UpgradeRequired => {
 181                        this.update(cx, |this, cx| this.handle_disconnect(false, cx))
 182                            .ok();
 183                    }
 184                    _ => {
 185                        this.update(cx, |this, cx| this.handle_disconnect(true, cx))
 186                            .ok();
 187                    }
 188                }
 189            }
 190            Some(())
 191        });
 192
 193        Self {
 194            channel_invitations: Vec::default(),
 195            channel_index: ChannelIndex::default(),
 196            channel_participants: Default::default(),
 197            outgoing_invites: Default::default(),
 198            opened_buffers: Default::default(),
 199            opened_chats: Default::default(),
 200            update_channels_tx,
 201            client,
 202            user_store,
 203            _rpc_subscriptions: rpc_subscriptions,
 204            _watch_connection_status: watch_connection_status,
 205            disconnect_channel_buffers_task: None,
 206            _update_channels: cx.spawn(async move |this, cx| {
 207                maybe!(async move {
 208                    while let Some(update_channels) = update_channels_rx.next().await {
 209                        if let Some(this) = this.upgrade() {
 210                            let update_task = this
 211                                .update(cx, |this, cx| this.update_channels(update_channels, cx))?;
 212                            if let Some(update_task) = update_task {
 213                                update_task.await.log_err();
 214                            }
 215                        }
 216                    }
 217                    anyhow::Ok(())
 218                })
 219                .await
 220                .log_err();
 221            }),
 222            channel_states: Default::default(),
 223            did_subscribe: false,
 224            channels_loaded: watch::channel_with(false),
 225        }
 226    }
 227
 228    pub fn initialize(&mut self) {
 229        if !self.did_subscribe
 230            && self
 231                .client
 232                .send(proto::SubscribeToChannels {})
 233                .log_err()
 234                .is_some()
 235        {
 236            self.did_subscribe = true;
 237        }
 238    }
 239
 240    pub fn wait_for_channels(
 241        &mut self,
 242        timeout: Duration,
 243        cx: &mut Context<Self>,
 244    ) -> Task<Result<()>> {
 245        let mut channels_loaded_rx = self.channels_loaded.1.clone();
 246        if *channels_loaded_rx.borrow() {
 247            return Task::ready(Ok(()));
 248        }
 249
 250        let mut status_receiver = self.client.status();
 251        if status_receiver.borrow().is_connected() {
 252            self.initialize();
 253        }
 254
 255        let mut timer = cx.background_executor().timer(timeout).fuse();
 256        cx.spawn(async move |this, cx| {
 257            loop {
 258                futures::select_biased! {
 259                    channels_loaded = channels_loaded_rx.next().fuse() => {
 260                        if let Some(true) = channels_loaded {
 261                            return Ok(());
 262                        }
 263                    }
 264                    status = status_receiver.next().fuse() => {
 265                        if let Some(status) = status {
 266                            if status.is_connected() {
 267                                this.update(cx, |this, _cx| {
 268                                    this.initialize();
 269                                }).ok();
 270                            }
 271                        }
 272                        continue;
 273                    }
 274                    _ = timer => {
 275                        return Err(anyhow!("{:?} elapsed without receiving channels", timeout));
 276                    }
 277                }
 278            }
 279        })
 280    }
 281
 282    pub fn client(&self) -> Arc<Client> {
 283        self.client.clone()
 284    }
 285
 286    /// Returns the number of unique channels in the store
 287    pub fn channel_count(&self) -> usize {
 288        self.channel_index.by_id().len()
 289    }
 290
 291    /// Returns the index of a channel ID in the list of unique channels
 292    pub fn index_of_channel(&self, channel_id: ChannelId) -> Option<usize> {
 293        self.channel_index
 294            .by_id()
 295            .keys()
 296            .position(|id| *id == channel_id)
 297    }
 298
 299    /// Returns an iterator over all unique channels
 300    pub fn channels(&self) -> impl '_ + Iterator<Item = &Arc<Channel>> {
 301        self.channel_index.by_id().values()
 302    }
 303
 304    /// Iterate over all entries in the channel DAG
 305    pub fn ordered_channels(&self) -> impl '_ + Iterator<Item = (usize, &Arc<Channel>)> {
 306        self.channel_index
 307            .ordered_channels()
 308            .iter()
 309            .filter_map(move |id| {
 310                let channel = self.channel_index.by_id().get(id)?;
 311                Some((channel.parent_path.len(), channel))
 312            })
 313    }
 314
 315    pub fn channel_at_index(&self, ix: usize) -> Option<&Arc<Channel>> {
 316        let channel_id = self.channel_index.ordered_channels().get(ix)?;
 317        self.channel_index.by_id().get(channel_id)
 318    }
 319
 320    pub fn channel_at(&self, ix: usize) -> Option<&Arc<Channel>> {
 321        self.channel_index.by_id().values().nth(ix)
 322    }
 323
 324    pub fn has_channel_invitation(&self, channel_id: ChannelId) -> bool {
 325        self.channel_invitations
 326            .iter()
 327            .any(|channel| channel.id == channel_id)
 328    }
 329
 330    pub fn channel_invitations(&self) -> &[Arc<Channel>] {
 331        &self.channel_invitations
 332    }
 333
 334    pub fn channel_for_id(&self, channel_id: ChannelId) -> Option<&Arc<Channel>> {
 335        self.channel_index.by_id().get(&channel_id)
 336    }
 337
 338    pub fn has_open_channel_buffer(&self, channel_id: ChannelId, _cx: &App) -> bool {
 339        if let Some(buffer) = self.opened_buffers.get(&channel_id) {
 340            if let OpenEntityHandle::Open(buffer) = buffer {
 341                return buffer.upgrade().is_some();
 342            }
 343        }
 344        false
 345    }
 346
 347    pub fn open_channel_buffer(
 348        &mut self,
 349        channel_id: ChannelId,
 350        cx: &mut Context<Self>,
 351    ) -> Task<Result<Entity<ChannelBuffer>>> {
 352        let client = self.client.clone();
 353        let user_store = self.user_store.clone();
 354        let channel_store = cx.entity();
 355        self.open_channel_resource(
 356            channel_id,
 357            "notes",
 358            |this| &mut this.opened_buffers,
 359            async move |channel, cx| {
 360                ChannelBuffer::new(channel, client, user_store, channel_store, cx).await
 361            },
 362            cx,
 363        )
 364    }
 365
 366    pub fn fetch_channel_messages(
 367        &self,
 368        message_ids: Vec<u64>,
 369        cx: &mut Context<Self>,
 370    ) -> Task<Result<Vec<ChannelMessage>>> {
 371        let request = if message_ids.is_empty() {
 372            None
 373        } else {
 374            Some(
 375                self.client
 376                    .request(proto::GetChannelMessagesById { message_ids }),
 377            )
 378        };
 379        cx.spawn(async move |this, cx| {
 380            if let Some(request) = request {
 381                let response = request.await?;
 382                let this = this.upgrade().context("channel store dropped")?;
 383                let user_store = this.read_with(cx, |this, _| this.user_store.clone())?;
 384                ChannelMessage::from_proto_vec(response.messages, &user_store, cx).await
 385            } else {
 386                Ok(Vec::new())
 387            }
 388        })
 389    }
 390
 391    pub fn has_channel_buffer_changed(&self, channel_id: ChannelId) -> bool {
 392        self.channel_states
 393            .get(&channel_id)
 394            .is_some_and(|state| state.has_channel_buffer_changed())
 395    }
 396
 397    pub fn has_new_messages(&self, channel_id: ChannelId) -> bool {
 398        self.channel_states
 399            .get(&channel_id)
 400            .is_some_and(|state| state.has_new_messages())
 401    }
 402
 403    pub fn set_acknowledged_message_id(&mut self, channel_id: ChannelId, message_id: Option<u64>) {
 404        if let Some(state) = self.channel_states.get_mut(&channel_id) {
 405            state.latest_chat_message = message_id;
 406        }
 407    }
 408
 409    pub fn last_acknowledge_message_id(&self, channel_id: ChannelId) -> Option<u64> {
 410        self.channel_states.get(&channel_id).and_then(|state| {
 411            if let Some(last_message_id) = state.latest_chat_message {
 412                if state
 413                    .last_acknowledged_message_id()
 414                    .is_some_and(|id| id < last_message_id)
 415                {
 416                    return state.last_acknowledged_message_id();
 417                }
 418            }
 419
 420            None
 421        })
 422    }
 423
 424    pub fn acknowledge_message_id(
 425        &mut self,
 426        channel_id: ChannelId,
 427        message_id: u64,
 428        cx: &mut Context<Self>,
 429    ) {
 430        self.channel_states
 431            .entry(channel_id)
 432            .or_default()
 433            .acknowledge_message_id(message_id);
 434        cx.notify();
 435    }
 436
 437    pub fn update_latest_message_id(
 438        &mut self,
 439        channel_id: ChannelId,
 440        message_id: u64,
 441        cx: &mut Context<Self>,
 442    ) {
 443        self.channel_states
 444            .entry(channel_id)
 445            .or_default()
 446            .update_latest_message_id(message_id);
 447        cx.notify();
 448    }
 449
 450    pub fn acknowledge_notes_version(
 451        &mut self,
 452        channel_id: ChannelId,
 453        epoch: u64,
 454        version: &clock::Global,
 455        cx: &mut Context<Self>,
 456    ) {
 457        self.channel_states
 458            .entry(channel_id)
 459            .or_default()
 460            .acknowledge_notes_version(epoch, version);
 461        cx.notify()
 462    }
 463
 464    pub fn update_latest_notes_version(
 465        &mut self,
 466        channel_id: ChannelId,
 467        epoch: u64,
 468        version: &clock::Global,
 469        cx: &mut Context<Self>,
 470    ) {
 471        self.channel_states
 472            .entry(channel_id)
 473            .or_default()
 474            .update_latest_notes_version(epoch, version);
 475        cx.notify()
 476    }
 477
 478    pub fn open_channel_chat(
 479        &mut self,
 480        channel_id: ChannelId,
 481        cx: &mut Context<Self>,
 482    ) -> Task<Result<Entity<ChannelChat>>> {
 483        let client = self.client.clone();
 484        let user_store = self.user_store.clone();
 485        let this = cx.entity();
 486        self.open_channel_resource(
 487            channel_id,
 488            "chat",
 489            |this| &mut this.opened_chats,
 490            async move |channel, cx| ChannelChat::new(channel, this, user_store, client, cx).await,
 491            cx,
 492        )
 493    }
 494
 495    /// Asynchronously open a given resource associated with a channel.
 496    ///
 497    /// Make sure that the resource is only opened once, even if this method
 498    /// is called multiple times with the same channel id while the first task
 499    /// is still running.
 500    fn open_channel_resource<T, F>(
 501        &mut self,
 502        channel_id: ChannelId,
 503        resource_name: &'static str,
 504        get_map: fn(&mut Self) -> &mut HashMap<ChannelId, OpenEntityHandle<T>>,
 505        load: F,
 506        cx: &mut Context<Self>,
 507    ) -> Task<Result<Entity<T>>>
 508    where
 509        F: AsyncFnOnce(Arc<Channel>, &mut AsyncApp) -> Result<Entity<T>> + 'static,
 510        T: 'static,
 511    {
 512        let task = loop {
 513            match get_map(self).get(&channel_id) {
 514                Some(OpenEntityHandle::Open(entity)) => {
 515                    if let Some(entity) = entity.upgrade() {
 516                        break Task::ready(Ok(entity)).shared();
 517                    } else {
 518                        get_map(self).remove(&channel_id);
 519                        continue;
 520                    }
 521                }
 522                Some(OpenEntityHandle::Loading(task)) => break task.clone(),
 523                None => {}
 524            }
 525
 526            let channels_ready = self.wait_for_channels(Duration::from_secs(10), cx);
 527            let task = cx
 528                .spawn(async move |this, cx| {
 529                    channels_ready.await?;
 530                    let channel = this.read_with(cx, |this, _| {
 531                        this.channel_for_id(channel_id)
 532                            .cloned()
 533                            .ok_or_else(|| Arc::new(anyhow!("no channel for id: {channel_id}")))
 534                    })??;
 535
 536                    load(channel, cx).await.map_err(Arc::new)
 537                })
 538                .shared();
 539
 540            get_map(self).insert(channel_id, OpenEntityHandle::Loading(task.clone()));
 541            let task = cx.spawn({
 542                async move |this, cx| {
 543                    let result = task.await;
 544                    this.update(cx, |this, _| match &result {
 545                        Ok(model) => {
 546                            get_map(this)
 547                                .insert(channel_id, OpenEntityHandle::Open(model.downgrade()));
 548                        }
 549                        Err(_) => {
 550                            get_map(this).remove(&channel_id);
 551                        }
 552                    })?;
 553                    result
 554                }
 555            });
 556            break task.shared();
 557        };
 558        cx.background_spawn(async move {
 559            task.await.map_err(|error| {
 560                anyhow!("{error}").context(format!("failed to open channel {resource_name}"))
 561            })
 562        })
 563    }
 564
 565    pub fn is_channel_admin(&self, channel_id: ChannelId) -> bool {
 566        self.channel_role(channel_id) == proto::ChannelRole::Admin
 567    }
 568
 569    pub fn is_root_channel(&self, channel_id: ChannelId) -> bool {
 570        self.channel_index
 571            .by_id()
 572            .get(&channel_id)
 573            .map_or(false, |channel| channel.is_root_channel())
 574    }
 575
 576    pub fn is_public_channel(&self, channel_id: ChannelId) -> bool {
 577        self.channel_index
 578            .by_id()
 579            .get(&channel_id)
 580            .map_or(false, |channel| {
 581                channel.visibility == ChannelVisibility::Public
 582            })
 583    }
 584
 585    pub fn channel_capability(&self, channel_id: ChannelId) -> Capability {
 586        match self.channel_role(channel_id) {
 587            ChannelRole::Admin | ChannelRole::Member => Capability::ReadWrite,
 588            _ => Capability::ReadOnly,
 589        }
 590    }
 591
 592    pub fn channel_role(&self, channel_id: ChannelId) -> proto::ChannelRole {
 593        maybe!({
 594            let mut channel = self.channel_for_id(channel_id)?;
 595            if !channel.is_root_channel() {
 596                channel = self.channel_for_id(channel.root_id())?;
 597            }
 598            let root_channel_state = self.channel_states.get(&channel.id);
 599            root_channel_state?.role
 600        })
 601        .unwrap_or(proto::ChannelRole::Guest)
 602    }
 603
 604    pub fn channel_participants(&self, channel_id: ChannelId) -> &[Arc<User>] {
 605        self.channel_participants
 606            .get(&channel_id)
 607            .map_or(&[], |v| v.as_slice())
 608    }
 609
 610    pub fn create_channel(
 611        &self,
 612        name: &str,
 613        parent_id: Option<ChannelId>,
 614        cx: &mut Context<Self>,
 615    ) -> Task<Result<ChannelId>> {
 616        let client = self.client.clone();
 617        let name = name.trim_start_matches('#').to_owned();
 618        cx.spawn(async move |this, cx| {
 619            let response = client
 620                .request(proto::CreateChannel {
 621                    name,
 622                    parent_id: parent_id.map(|cid| cid.0),
 623                })
 624                .await?;
 625
 626            let channel = response.channel.context("missing channel in response")?;
 627            let channel_id = ChannelId(channel.id);
 628
 629            this.update(cx, |this, cx| {
 630                let task = this.update_channels(
 631                    proto::UpdateChannels {
 632                        channels: vec![channel],
 633                        ..Default::default()
 634                    },
 635                    cx,
 636                );
 637                assert!(task.is_none());
 638
 639                // This event is emitted because the collab panel wants to clear the pending edit state
 640                // before this frame is rendered. But we can't guarantee that the collab panel's future
 641                // will resolve before this flush_effects finishes. Synchronously emitting this event
 642                // ensures that the collab panel will observe this creation before the frame completes
 643                cx.emit(ChannelEvent::ChannelCreated(channel_id));
 644            })?;
 645
 646            Ok(channel_id)
 647        })
 648    }
 649
 650    pub fn move_channel(
 651        &mut self,
 652        channel_id: ChannelId,
 653        to: ChannelId,
 654        cx: &mut Context<Self>,
 655    ) -> Task<Result<()>> {
 656        let client = self.client.clone();
 657        cx.spawn(async move |_, _| {
 658            let _ = client
 659                .request(proto::MoveChannel {
 660                    channel_id: channel_id.0,
 661                    to: to.0,
 662                })
 663                .await?;
 664            Ok(())
 665        })
 666    }
 667
 668    pub fn reorder_channel(
 669        &mut self,
 670        channel_id: ChannelId,
 671        direction: proto::reorder_channel::Direction,
 672        cx: &mut Context<Self>,
 673    ) -> Task<Result<()>> {
 674        let client = self.client.clone();
 675        cx.spawn(async move |_, _| {
 676            client
 677                .request(proto::ReorderChannel {
 678                    channel_id: channel_id.0,
 679                    direction: direction.into(),
 680                })
 681                .await?;
 682            Ok(())
 683        })
 684    }
 685
 686    pub fn set_channel_visibility(
 687        &mut self,
 688        channel_id: ChannelId,
 689        visibility: ChannelVisibility,
 690        cx: &mut Context<Self>,
 691    ) -> Task<Result<()>> {
 692        let client = self.client.clone();
 693        cx.spawn(async move |_, _| {
 694            let _ = client
 695                .request(proto::SetChannelVisibility {
 696                    channel_id: channel_id.0,
 697                    visibility: visibility.into(),
 698                })
 699                .await?;
 700
 701            Ok(())
 702        })
 703    }
 704
 705    pub fn invite_member(
 706        &mut self,
 707        channel_id: ChannelId,
 708        user_id: UserId,
 709        role: proto::ChannelRole,
 710        cx: &mut Context<Self>,
 711    ) -> Task<Result<()>> {
 712        if !self.outgoing_invites.insert((channel_id, user_id)) {
 713            return Task::ready(Err(anyhow!("invite request already in progress")));
 714        }
 715
 716        cx.notify();
 717        let client = self.client.clone();
 718        cx.spawn(async move |this, cx| {
 719            let result = client
 720                .request(proto::InviteChannelMember {
 721                    channel_id: channel_id.0,
 722                    user_id,
 723                    role: role.into(),
 724                })
 725                .await;
 726
 727            this.update(cx, |this, cx| {
 728                this.outgoing_invites.remove(&(channel_id, user_id));
 729                cx.notify();
 730            })?;
 731
 732            result?;
 733
 734            Ok(())
 735        })
 736    }
 737
 738    pub fn remove_member(
 739        &mut self,
 740        channel_id: ChannelId,
 741        user_id: u64,
 742        cx: &mut Context<Self>,
 743    ) -> Task<Result<()>> {
 744        if !self.outgoing_invites.insert((channel_id, user_id)) {
 745            return Task::ready(Err(anyhow!("invite request already in progress")));
 746        }
 747
 748        cx.notify();
 749        let client = self.client.clone();
 750        cx.spawn(async move |this, cx| {
 751            let result = client
 752                .request(proto::RemoveChannelMember {
 753                    channel_id: channel_id.0,
 754                    user_id,
 755                })
 756                .await;
 757
 758            this.update(cx, |this, cx| {
 759                this.outgoing_invites.remove(&(channel_id, user_id));
 760                cx.notify();
 761            })?;
 762            result?;
 763            Ok(())
 764        })
 765    }
 766
 767    pub fn set_member_role(
 768        &mut self,
 769        channel_id: ChannelId,
 770        user_id: UserId,
 771        role: proto::ChannelRole,
 772        cx: &mut Context<Self>,
 773    ) -> Task<Result<()>> {
 774        if !self.outgoing_invites.insert((channel_id, user_id)) {
 775            return Task::ready(Err(anyhow!("member request already in progress")));
 776        }
 777
 778        cx.notify();
 779        let client = self.client.clone();
 780        cx.spawn(async move |this, cx| {
 781            let result = client
 782                .request(proto::SetChannelMemberRole {
 783                    channel_id: channel_id.0,
 784                    user_id,
 785                    role: role.into(),
 786                })
 787                .await;
 788
 789            this.update(cx, |this, cx| {
 790                this.outgoing_invites.remove(&(channel_id, user_id));
 791                cx.notify();
 792            })?;
 793
 794            result?;
 795            Ok(())
 796        })
 797    }
 798
 799    pub fn rename(
 800        &mut self,
 801        channel_id: ChannelId,
 802        new_name: &str,
 803        cx: &mut Context<Self>,
 804    ) -> Task<Result<()>> {
 805        let client = self.client.clone();
 806        let name = new_name.to_string();
 807        cx.spawn(async move |this, cx| {
 808            let channel = client
 809                .request(proto::RenameChannel {
 810                    channel_id: channel_id.0,
 811                    name,
 812                })
 813                .await?
 814                .channel
 815                .context("missing channel in response")?;
 816            this.update(cx, |this, cx| {
 817                let task = this.update_channels(
 818                    proto::UpdateChannels {
 819                        channels: vec![channel],
 820                        ..Default::default()
 821                    },
 822                    cx,
 823                );
 824                assert!(task.is_none());
 825
 826                // This event is emitted because the collab panel wants to clear the pending edit state
 827                // before this frame is rendered. But we can't guarantee that the collab panel's future
 828                // will resolve before this flush_effects finishes. Synchronously emitting this event
 829                // ensures that the collab panel will observe this creation before the frame complete
 830                cx.emit(ChannelEvent::ChannelRenamed(channel_id))
 831            })?;
 832            Ok(())
 833        })
 834    }
 835
 836    pub fn respond_to_channel_invite(
 837        &mut self,
 838        channel_id: ChannelId,
 839        accept: bool,
 840        cx: &mut Context<Self>,
 841    ) -> Task<Result<()>> {
 842        let client = self.client.clone();
 843        cx.background_spawn(async move {
 844            client
 845                .request(proto::RespondToChannelInvite {
 846                    channel_id: channel_id.0,
 847                    accept,
 848                })
 849                .await?;
 850            Ok(())
 851        })
 852    }
 853    pub fn fuzzy_search_members(
 854        &self,
 855        channel_id: ChannelId,
 856        query: String,
 857        limit: u16,
 858        cx: &mut Context<Self>,
 859    ) -> Task<Result<Vec<ChannelMembership>>> {
 860        let client = self.client.clone();
 861        let user_store = self.user_store.downgrade();
 862        cx.spawn(async move |_, cx| {
 863            let response = client
 864                .request(proto::GetChannelMembers {
 865                    channel_id: channel_id.0,
 866                    query,
 867                    limit: limit as u64,
 868                })
 869                .await?;
 870            user_store.update(cx, |user_store, _| {
 871                user_store.insert(response.users);
 872                response
 873                    .members
 874                    .into_iter()
 875                    .filter_map(|member| {
 876                        Some(ChannelMembership {
 877                            user: user_store.get_cached_user(member.user_id)?,
 878                            role: member.role(),
 879                            kind: member.kind(),
 880                        })
 881                    })
 882                    .collect()
 883            })
 884        })
 885    }
 886
 887    pub fn remove_channel(
 888        &self,
 889        channel_id: ChannelId,
 890    ) -> impl Future<Output = Result<()>> + use<> {
 891        let client = self.client.clone();
 892        async move {
 893            client
 894                .request(proto::DeleteChannel {
 895                    channel_id: channel_id.0,
 896                })
 897                .await?;
 898            Ok(())
 899        }
 900    }
 901
 902    pub fn has_pending_channel_invite_response(&self, _: &Arc<Channel>) -> bool {
 903        false
 904    }
 905
 906    pub fn has_pending_channel_invite(&self, channel_id: ChannelId, user_id: UserId) -> bool {
 907        self.outgoing_invites.contains(&(channel_id, user_id))
 908    }
 909
 910    async fn handle_update_channels(
 911        this: Entity<Self>,
 912        message: TypedEnvelope<proto::UpdateChannels>,
 913        mut cx: AsyncApp,
 914    ) -> Result<()> {
 915        this.read_with(&mut cx, |this, _| {
 916            this.update_channels_tx
 917                .unbounded_send(message.payload)
 918                .unwrap();
 919        })?;
 920        Ok(())
 921    }
 922
 923    async fn handle_update_user_channels(
 924        this: Entity<Self>,
 925        message: TypedEnvelope<proto::UpdateUserChannels>,
 926        mut cx: AsyncApp,
 927    ) -> Result<()> {
 928        this.update(&mut cx, |this, cx| {
 929            for buffer_version in message.payload.observed_channel_buffer_version {
 930                let version = language::proto::deserialize_version(&buffer_version.version);
 931                this.acknowledge_notes_version(
 932                    ChannelId(buffer_version.channel_id),
 933                    buffer_version.epoch,
 934                    &version,
 935                    cx,
 936                );
 937            }
 938            for message_id in message.payload.observed_channel_message_id {
 939                this.acknowledge_message_id(
 940                    ChannelId(message_id.channel_id),
 941                    message_id.message_id,
 942                    cx,
 943                );
 944            }
 945            for membership in message.payload.channel_memberships {
 946                if let Some(role) = ChannelRole::from_i32(membership.role) {
 947                    this.channel_states
 948                        .entry(ChannelId(membership.channel_id))
 949                        .or_default()
 950                        .set_role(role)
 951                }
 952            }
 953        })
 954    }
 955
 956    fn handle_connect(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
 957        self.channel_index.clear();
 958        self.channel_invitations.clear();
 959        self.channel_participants.clear();
 960        self.channel_index.clear();
 961        self.outgoing_invites.clear();
 962        self.disconnect_channel_buffers_task.take();
 963
 964        for chat in self.opened_chats.values() {
 965            if let OpenEntityHandle::Open(chat) = chat {
 966                if let Some(chat) = chat.upgrade() {
 967                    chat.update(cx, |chat, cx| {
 968                        chat.rejoin(cx);
 969                    });
 970                }
 971            }
 972        }
 973
 974        let mut buffer_versions = Vec::new();
 975        for buffer in self.opened_buffers.values() {
 976            if let OpenEntityHandle::Open(buffer) = buffer {
 977                if let Some(buffer) = buffer.upgrade() {
 978                    let channel_buffer = buffer.read(cx);
 979                    let buffer = channel_buffer.buffer().read(cx);
 980                    buffer_versions.push(proto::ChannelBufferVersion {
 981                        channel_id: channel_buffer.channel_id.0,
 982                        epoch: channel_buffer.epoch(),
 983                        version: language::proto::serialize_version(&buffer.version()),
 984                    });
 985                }
 986            }
 987        }
 988
 989        if buffer_versions.is_empty() {
 990            return Task::ready(Ok(()));
 991        }
 992
 993        let response = self.client.request(proto::RejoinChannelBuffers {
 994            buffers: buffer_versions,
 995        });
 996
 997        cx.spawn(async move |this, cx| {
 998            let mut response = response.await?;
 999
1000            this.update(cx, |this, cx| {
1001                this.opened_buffers.retain(|_, buffer| match buffer {
1002                    OpenEntityHandle::Open(channel_buffer) => {
1003                        let Some(channel_buffer) = channel_buffer.upgrade() else {
1004                            return false;
1005                        };
1006
1007                        channel_buffer.update(cx, |channel_buffer, cx| {
1008                            let channel_id = channel_buffer.channel_id;
1009                            if let Some(remote_buffer) = response
1010                                .buffers
1011                                .iter_mut()
1012                                .find(|buffer| buffer.channel_id == channel_id.0)
1013                            {
1014                                let channel_id = channel_buffer.channel_id;
1015                                let remote_version =
1016                                    language::proto::deserialize_version(&remote_buffer.version);
1017
1018                                channel_buffer.replace_collaborators(
1019                                    mem::take(&mut remote_buffer.collaborators),
1020                                    cx,
1021                                );
1022
1023                                let operations = channel_buffer
1024                                    .buffer()
1025                                    .update(cx, |buffer, cx| {
1026                                        let outgoing_operations =
1027                                            buffer.serialize_ops(Some(remote_version), cx);
1028                                        let incoming_operations =
1029                                            mem::take(&mut remote_buffer.operations)
1030                                                .into_iter()
1031                                                .map(language::proto::deserialize_operation)
1032                                                .collect::<Result<Vec<_>>>()?;
1033                                        buffer.apply_ops(incoming_operations, cx);
1034                                        anyhow::Ok(outgoing_operations)
1035                                    })
1036                                    .log_err();
1037
1038                                if let Some(operations) = operations {
1039                                    channel_buffer.connected(cx);
1040                                    let client = this.client.clone();
1041                                    cx.background_spawn(async move {
1042                                        let operations = operations.await;
1043                                        for chunk in language::proto::split_operations(operations) {
1044                                            client
1045                                                .send(proto::UpdateChannelBuffer {
1046                                                    channel_id: channel_id.0,
1047                                                    operations: chunk,
1048                                                })
1049                                                .ok();
1050                                        }
1051                                    })
1052                                    .detach();
1053                                    return true;
1054                                }
1055                            }
1056
1057                            channel_buffer.disconnect(cx);
1058                            false
1059                        })
1060                    }
1061                    OpenEntityHandle::Loading(_) => true,
1062                });
1063            })
1064            .ok();
1065            anyhow::Ok(())
1066        })
1067    }
1068
1069    fn handle_disconnect(&mut self, wait_for_reconnect: bool, cx: &mut Context<Self>) {
1070        cx.notify();
1071        self.did_subscribe = false;
1072        self.disconnect_channel_buffers_task.get_or_insert_with(|| {
1073            cx.spawn(async move |this, cx| {
1074                if wait_for_reconnect {
1075                    cx.background_executor().timer(RECONNECT_TIMEOUT).await;
1076                }
1077
1078                if let Some(this) = this.upgrade() {
1079                    this.update(cx, |this, cx| {
1080                        for (_, buffer) in &this.opened_buffers {
1081                            if let OpenEntityHandle::Open(buffer) = &buffer {
1082                                if let Some(buffer) = buffer.upgrade() {
1083                                    buffer.update(cx, |buffer, cx| buffer.disconnect(cx));
1084                                }
1085                            }
1086                        }
1087                    })
1088                    .ok();
1089                }
1090            })
1091        });
1092    }
1093
1094    #[cfg(any(test, feature = "test-support"))]
1095    pub fn reset(&mut self) {
1096        self.channel_invitations.clear();
1097        self.channel_index.clear();
1098        self.channel_participants.clear();
1099        self.outgoing_invites.clear();
1100        self.opened_buffers.clear();
1101        self.opened_chats.clear();
1102        self.disconnect_channel_buffers_task = None;
1103        self.channel_states.clear();
1104    }
1105
1106    pub(crate) fn update_channels(
1107        &mut self,
1108        payload: proto::UpdateChannels,
1109        cx: &mut Context<ChannelStore>,
1110    ) -> Option<Task<Result<()>>> {
1111        if !payload.remove_channel_invitations.is_empty() {
1112            self.channel_invitations
1113                .retain(|channel| !payload.remove_channel_invitations.contains(&channel.id.0));
1114        }
1115        for channel in payload.channel_invitations {
1116            match self
1117                .channel_invitations
1118                .binary_search_by_key(&channel.id, |c| c.id.0)
1119            {
1120                Ok(ix) => {
1121                    Arc::make_mut(&mut self.channel_invitations[ix]).name = channel.name.into()
1122                }
1123                Err(ix) => self.channel_invitations.insert(
1124                    ix,
1125                    Arc::new(Channel {
1126                        id: ChannelId(channel.id),
1127                        visibility: channel.visibility(),
1128                        name: channel.name.into(),
1129                        parent_path: channel.parent_path.into_iter().map(ChannelId).collect(),
1130                        channel_order: channel.channel_order,
1131                    }),
1132                ),
1133            }
1134        }
1135
1136        let channels_changed = !payload.channels.is_empty()
1137            || !payload.delete_channels.is_empty()
1138            || !payload.latest_channel_message_ids.is_empty()
1139            || !payload.latest_channel_buffer_versions.is_empty();
1140
1141        if channels_changed {
1142            if !payload.delete_channels.is_empty() {
1143                let delete_channels: Vec<ChannelId> =
1144                    payload.delete_channels.into_iter().map(ChannelId).collect();
1145                self.channel_index.delete_channels(&delete_channels);
1146                self.channel_participants
1147                    .retain(|channel_id, _| !delete_channels.contains(channel_id));
1148
1149                for channel_id in &delete_channels {
1150                    let channel_id = *channel_id;
1151                    if payload
1152                        .channels
1153                        .iter()
1154                        .any(|channel| channel.id == channel_id.0)
1155                    {
1156                        continue;
1157                    }
1158                    if let Some(OpenEntityHandle::Open(buffer)) =
1159                        self.opened_buffers.remove(&channel_id)
1160                    {
1161                        if let Some(buffer) = buffer.upgrade() {
1162                            buffer.update(cx, ChannelBuffer::disconnect);
1163                        }
1164                    }
1165                }
1166            }
1167
1168            let mut index = self.channel_index.bulk_insert();
1169            for channel in payload.channels {
1170                let id = ChannelId(channel.id);
1171                let channel_changed = index.insert(channel);
1172
1173                if channel_changed {
1174                    if let Some(OpenEntityHandle::Open(buffer)) = self.opened_buffers.get(&id) {
1175                        if let Some(buffer) = buffer.upgrade() {
1176                            buffer.update(cx, ChannelBuffer::channel_changed);
1177                        }
1178                    }
1179                }
1180            }
1181
1182            for latest_buffer_version in payload.latest_channel_buffer_versions {
1183                let version = language::proto::deserialize_version(&latest_buffer_version.version);
1184                self.channel_states
1185                    .entry(ChannelId(latest_buffer_version.channel_id))
1186                    .or_default()
1187                    .update_latest_notes_version(latest_buffer_version.epoch, &version)
1188            }
1189
1190            for latest_channel_message in payload.latest_channel_message_ids {
1191                self.channel_states
1192                    .entry(ChannelId(latest_channel_message.channel_id))
1193                    .or_default()
1194                    .update_latest_message_id(latest_channel_message.message_id);
1195            }
1196
1197            self.channels_loaded.0.try_send(true).log_err();
1198        }
1199
1200        cx.notify();
1201        if payload.channel_participants.is_empty() {
1202            return None;
1203        }
1204
1205        let mut all_user_ids = Vec::new();
1206        let channel_participants = payload.channel_participants;
1207        for entry in &channel_participants {
1208            for user_id in entry.participant_user_ids.iter() {
1209                if let Err(ix) = all_user_ids.binary_search(user_id) {
1210                    all_user_ids.insert(ix, *user_id);
1211                }
1212            }
1213        }
1214
1215        let users = self
1216            .user_store
1217            .update(cx, |user_store, cx| user_store.get_users(all_user_ids, cx));
1218        Some(cx.spawn(async move |this, cx| {
1219            let users = users.await?;
1220
1221            this.update(cx, |this, cx| {
1222                for entry in &channel_participants {
1223                    let mut participants: Vec<_> = entry
1224                        .participant_user_ids
1225                        .iter()
1226                        .filter_map(|user_id| {
1227                            users
1228                                .binary_search_by_key(&user_id, |user| &user.id)
1229                                .ok()
1230                                .map(|ix| users[ix].clone())
1231                        })
1232                        .collect();
1233
1234                    participants.sort_by_key(|u| u.id);
1235
1236                    this.channel_participants
1237                        .insert(ChannelId(entry.channel_id), participants);
1238                }
1239
1240                cx.notify();
1241            })
1242        }))
1243    }
1244}
1245
1246impl ChannelState {
1247    fn set_role(&mut self, role: ChannelRole) {
1248        self.role = Some(role);
1249    }
1250
1251    fn has_channel_buffer_changed(&self) -> bool {
1252        self.latest_notes_version.epoch > self.observed_notes_version.epoch
1253            || (self.latest_notes_version.epoch == self.observed_notes_version.epoch
1254                && self
1255                    .latest_notes_version
1256                    .version
1257                    .changed_since(&self.observed_notes_version.version))
1258    }
1259
1260    fn has_new_messages(&self) -> bool {
1261        let latest_message_id = self.latest_chat_message;
1262        let observed_message_id = self.observed_chat_message;
1263
1264        latest_message_id.is_some_and(|latest_message_id| {
1265            latest_message_id > observed_message_id.unwrap_or_default()
1266        })
1267    }
1268
1269    fn last_acknowledged_message_id(&self) -> Option<u64> {
1270        self.observed_chat_message
1271    }
1272
1273    fn acknowledge_message_id(&mut self, message_id: u64) {
1274        let observed = self.observed_chat_message.get_or_insert(message_id);
1275        *observed = (*observed).max(message_id);
1276    }
1277
1278    fn update_latest_message_id(&mut self, message_id: u64) {
1279        self.latest_chat_message =
1280            Some(message_id.max(self.latest_chat_message.unwrap_or_default()));
1281    }
1282
1283    fn acknowledge_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1284        if self.observed_notes_version.epoch == epoch {
1285            self.observed_notes_version.version.join(version);
1286        } else {
1287            self.observed_notes_version = NotesVersion {
1288                epoch,
1289                version: version.clone(),
1290            };
1291        }
1292    }
1293
1294    fn update_latest_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1295        if self.latest_notes_version.epoch == epoch {
1296            self.latest_notes_version.version.join(version);
1297        } else {
1298            self.latest_notes_version = NotesVersion {
1299                epoch,
1300                version: version.clone(),
1301            };
1302        }
1303    }
1304}