channel_store.rs

   1mod channel_index;
   2
   3use crate::{ChannelMessage, channel_buffer::ChannelBuffer, channel_chat::ChannelChat};
   4use anyhow::{Context as _, Result, anyhow};
   5use channel_index::ChannelIndex;
   6use client::{ChannelId, Client, ClientSettings, Subscription, User, UserId, UserStore};
   7use collections::{HashMap, HashSet};
   8use futures::{Future, FutureExt, StreamExt, channel::mpsc, future::Shared};
   9use gpui::{
  10    App, AppContext as _, AsyncApp, Context, Entity, EventEmitter, Global, SharedString, Task,
  11    WeakEntity,
  12};
  13use language::Capability;
  14use postage::{sink::Sink, watch};
  15use rpc::{
  16    TypedEnvelope,
  17    proto::{self, ChannelRole, ChannelVisibility},
  18};
  19use settings::Settings;
  20use std::{mem, sync::Arc, time::Duration};
  21use util::{ResultExt, maybe};
  22
  23pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
  24
  25pub fn init(client: &Arc<Client>, user_store: Entity<UserStore>, cx: &mut App) {
  26    let channel_store = cx.new(|cx| ChannelStore::new(client.clone(), user_store.clone(), cx));
  27    cx.set_global(GlobalChannelStore(channel_store));
  28}
  29
  30#[derive(Debug, Clone, Default, PartialEq)]
  31struct NotesVersion {
  32    epoch: u64,
  33    version: clock::Global,
  34}
  35
  36pub struct ChannelStore {
  37    pub channel_index: ChannelIndex,
  38    channel_invitations: Vec<Arc<Channel>>,
  39    channel_participants: HashMap<ChannelId, Vec<Arc<User>>>,
  40    channel_states: HashMap<ChannelId, ChannelState>,
  41    outgoing_invites: HashSet<(ChannelId, UserId)>,
  42    update_channels_tx: mpsc::UnboundedSender<proto::UpdateChannels>,
  43    opened_buffers: HashMap<ChannelId, OpenEntityHandle<ChannelBuffer>>,
  44    opened_chats: HashMap<ChannelId, OpenEntityHandle<ChannelChat>>,
  45    client: Arc<Client>,
  46    did_subscribe: bool,
  47    channels_loaded: (watch::Sender<bool>, watch::Receiver<bool>),
  48    user_store: Entity<UserStore>,
  49    _rpc_subscriptions: [Subscription; 2],
  50    _watch_connection_status: Task<Option<()>>,
  51    disconnect_channel_buffers_task: Option<Task<()>>,
  52    _update_channels: Task<()>,
  53}
  54
  55#[derive(Clone, Debug)]
  56pub struct Channel {
  57    pub id: ChannelId,
  58    pub name: SharedString,
  59    pub visibility: proto::ChannelVisibility,
  60    pub parent_path: Vec<ChannelId>,
  61    pub channel_order: i32,
  62}
  63
  64#[derive(Default, Debug)]
  65pub struct ChannelState {
  66    latest_chat_message: Option<u64>,
  67    latest_notes_version: NotesVersion,
  68    observed_notes_version: NotesVersion,
  69    observed_chat_message: Option<u64>,
  70    role: Option<ChannelRole>,
  71}
  72
  73impl Channel {
  74    pub fn link(&self, cx: &App) -> String {
  75        format!(
  76            "{}/channel/{}-{}",
  77            ClientSettings::get_global(cx).server_url,
  78            Self::slug(&self.name),
  79            self.id
  80        )
  81    }
  82
  83    pub fn notes_link(&self, heading: Option<String>, cx: &App) -> String {
  84        self.link(cx)
  85            + "/notes"
  86            + &heading
  87                .map(|h| format!("#{}", Self::slug(&h)))
  88                .unwrap_or_default()
  89    }
  90
  91    pub fn is_root_channel(&self) -> bool {
  92        self.parent_path.is_empty()
  93    }
  94
  95    pub fn root_id(&self) -> ChannelId {
  96        self.parent_path.first().copied().unwrap_or(self.id)
  97    }
  98
  99    pub fn slug(str: &str) -> String {
 100        let slug: String = str
 101            .chars()
 102            .map(|c| if c.is_alphanumeric() { c } else { '-' })
 103            .collect();
 104
 105        slug.trim_matches(|c| c == '-').to_string()
 106    }
 107}
 108
 109#[derive(Debug)]
 110pub struct ChannelMembership {
 111    pub user: Arc<User>,
 112    pub kind: proto::channel_member::Kind,
 113    pub role: proto::ChannelRole,
 114}
 115impl ChannelMembership {
 116    pub fn sort_key(&self) -> MembershipSortKey<'_> {
 117        MembershipSortKey {
 118            role_order: match self.role {
 119                proto::ChannelRole::Admin => 0,
 120                proto::ChannelRole::Member => 1,
 121                proto::ChannelRole::Banned => 2,
 122                proto::ChannelRole::Talker => 3,
 123                proto::ChannelRole::Guest => 4,
 124            },
 125            kind_order: match self.kind {
 126                proto::channel_member::Kind::Member => 0,
 127                proto::channel_member::Kind::Invitee => 1,
 128            },
 129            username_order: &self.user.github_login,
 130        }
 131    }
 132}
 133
 134#[derive(PartialOrd, Ord, PartialEq, Eq)]
 135pub struct MembershipSortKey<'a> {
 136    role_order: u8,
 137    kind_order: u8,
 138    username_order: &'a str,
 139}
 140
 141pub enum ChannelEvent {
 142    ChannelCreated(ChannelId),
 143    ChannelRenamed(ChannelId),
 144}
 145
 146impl EventEmitter<ChannelEvent> for ChannelStore {}
 147
 148enum OpenEntityHandle<E> {
 149    Open(WeakEntity<E>),
 150    Loading(Shared<Task<Result<Entity<E>, Arc<anyhow::Error>>>>),
 151}
 152
 153struct GlobalChannelStore(Entity<ChannelStore>);
 154
 155impl Global for GlobalChannelStore {}
 156
 157impl ChannelStore {
 158    pub fn global(cx: &App) -> Entity<Self> {
 159        cx.global::<GlobalChannelStore>().0.clone()
 160    }
 161
 162    pub fn new(client: Arc<Client>, user_store: Entity<UserStore>, cx: &mut Context<Self>) -> Self {
 163        let rpc_subscriptions = [
 164            client.add_message_handler(cx.weak_entity(), Self::handle_update_channels),
 165            client.add_message_handler(cx.weak_entity(), Self::handle_update_user_channels),
 166        ];
 167
 168        let mut connection_status = client.status();
 169        let (update_channels_tx, mut update_channels_rx) = mpsc::unbounded();
 170        let watch_connection_status = cx.spawn(async move |this, cx| {
 171            while let Some(status) = connection_status.next().await {
 172                let this = this.upgrade()?;
 173                match status {
 174                    client::Status::Connected { .. } => {
 175                        this.update(cx, |this, cx| this.handle_connect(cx))
 176                            .ok()?
 177                            .await
 178                            .log_err()?;
 179                    }
 180                    client::Status::SignedOut | client::Status::UpgradeRequired => {
 181                        this.update(cx, |this, cx| this.handle_disconnect(false, cx))
 182                            .ok();
 183                    }
 184                    _ => {
 185                        this.update(cx, |this, cx| this.handle_disconnect(true, cx))
 186                            .ok();
 187                    }
 188                }
 189            }
 190            Some(())
 191        });
 192
 193        Self {
 194            channel_invitations: Vec::default(),
 195            channel_index: ChannelIndex::default(),
 196            channel_participants: Default::default(),
 197            outgoing_invites: Default::default(),
 198            opened_buffers: Default::default(),
 199            opened_chats: Default::default(),
 200            update_channels_tx,
 201            client,
 202            user_store,
 203            _rpc_subscriptions: rpc_subscriptions,
 204            _watch_connection_status: watch_connection_status,
 205            disconnect_channel_buffers_task: None,
 206            _update_channels: cx.spawn(async move |this, cx| {
 207                maybe!(async move {
 208                    while let Some(update_channels) = update_channels_rx.next().await {
 209                        if let Some(this) = this.upgrade() {
 210                            let update_task = this
 211                                .update(cx, |this, cx| this.update_channels(update_channels, cx))?;
 212                            if let Some(update_task) = update_task {
 213                                update_task.await.log_err();
 214                            }
 215                        }
 216                    }
 217                    anyhow::Ok(())
 218                })
 219                .await
 220                .log_err();
 221            }),
 222            channel_states: Default::default(),
 223            did_subscribe: false,
 224            channels_loaded: watch::channel_with(false),
 225        }
 226    }
 227
 228    pub fn initialize(&mut self) {
 229        if !self.did_subscribe
 230            && self
 231                .client
 232                .send(proto::SubscribeToChannels {})
 233                .log_err()
 234                .is_some()
 235        {
 236            self.did_subscribe = true;
 237        }
 238    }
 239
 240    pub fn wait_for_channels(
 241        &mut self,
 242        timeout: Duration,
 243        cx: &mut Context<Self>,
 244    ) -> Task<Result<()>> {
 245        let mut channels_loaded_rx = self.channels_loaded.1.clone();
 246        if *channels_loaded_rx.borrow() {
 247            return Task::ready(Ok(()));
 248        }
 249
 250        let mut status_receiver = self.client.status();
 251        if status_receiver.borrow().is_connected() {
 252            self.initialize();
 253        }
 254
 255        let mut timer = cx.background_executor().timer(timeout).fuse();
 256        cx.spawn(async move |this, cx| {
 257            loop {
 258                futures::select_biased! {
 259                    channels_loaded = channels_loaded_rx.next().fuse() => {
 260                        if let Some(true) = channels_loaded {
 261                            return Ok(());
 262                        }
 263                    }
 264                    status = status_receiver.next().fuse() => {
 265                        if let Some(status) = status
 266                            && status.is_connected() {
 267                                this.update(cx, |this, _cx| {
 268                                    this.initialize();
 269                                }).ok();
 270                            }
 271                        continue;
 272                    }
 273                    _ = timer => {
 274                        return Err(anyhow!("{:?} elapsed without receiving channels", timeout));
 275                    }
 276                }
 277            }
 278        })
 279    }
 280
 281    pub fn client(&self) -> Arc<Client> {
 282        self.client.clone()
 283    }
 284
 285    /// Returns the number of unique channels in the store
 286    pub fn channel_count(&self) -> usize {
 287        self.channel_index.by_id().len()
 288    }
 289
 290    /// Returns the index of a channel ID in the list of unique channels
 291    pub fn index_of_channel(&self, channel_id: ChannelId) -> Option<usize> {
 292        self.channel_index
 293            .by_id()
 294            .keys()
 295            .position(|id| *id == channel_id)
 296    }
 297
 298    /// Returns an iterator over all unique channels
 299    pub fn channels(&self) -> impl '_ + Iterator<Item = &Arc<Channel>> {
 300        self.channel_index.by_id().values()
 301    }
 302
 303    /// Iterate over all entries in the channel DAG
 304    pub fn ordered_channels(&self) -> impl '_ + Iterator<Item = (usize, &Arc<Channel>)> {
 305        self.channel_index
 306            .ordered_channels()
 307            .iter()
 308            .filter_map(move |id| {
 309                let channel = self.channel_index.by_id().get(id)?;
 310                Some((channel.parent_path.len(), channel))
 311            })
 312    }
 313
 314    pub fn channel_at_index(&self, ix: usize) -> Option<&Arc<Channel>> {
 315        let channel_id = self.channel_index.ordered_channels().get(ix)?;
 316        self.channel_index.by_id().get(channel_id)
 317    }
 318
 319    pub fn channel_at(&self, ix: usize) -> Option<&Arc<Channel>> {
 320        self.channel_index.by_id().values().nth(ix)
 321    }
 322
 323    pub fn has_channel_invitation(&self, channel_id: ChannelId) -> bool {
 324        self.channel_invitations
 325            .iter()
 326            .any(|channel| channel.id == channel_id)
 327    }
 328
 329    pub fn channel_invitations(&self) -> &[Arc<Channel>] {
 330        &self.channel_invitations
 331    }
 332
 333    pub fn channel_for_id(&self, channel_id: ChannelId) -> Option<&Arc<Channel>> {
 334        self.channel_index.by_id().get(&channel_id)
 335    }
 336
 337    pub fn has_open_channel_buffer(&self, channel_id: ChannelId, _cx: &App) -> bool {
 338        if let Some(buffer) = self.opened_buffers.get(&channel_id)
 339            && let OpenEntityHandle::Open(buffer) = buffer
 340        {
 341            return buffer.upgrade().is_some();
 342        }
 343        false
 344    }
 345
 346    pub fn open_channel_buffer(
 347        &mut self,
 348        channel_id: ChannelId,
 349        cx: &mut Context<Self>,
 350    ) -> Task<Result<Entity<ChannelBuffer>>> {
 351        let client = self.client.clone();
 352        let user_store = self.user_store.clone();
 353        let channel_store = cx.entity();
 354        self.open_channel_resource(
 355            channel_id,
 356            "notes",
 357            |this| &mut this.opened_buffers,
 358            async move |channel, cx| {
 359                ChannelBuffer::new(channel, client, user_store, channel_store, cx).await
 360            },
 361            cx,
 362        )
 363    }
 364
 365    pub fn fetch_channel_messages(
 366        &self,
 367        message_ids: Vec<u64>,
 368        cx: &mut Context<Self>,
 369    ) -> Task<Result<Vec<ChannelMessage>>> {
 370        let request = if message_ids.is_empty() {
 371            None
 372        } else {
 373            Some(
 374                self.client
 375                    .request(proto::GetChannelMessagesById { message_ids }),
 376            )
 377        };
 378        cx.spawn(async move |this, cx| {
 379            if let Some(request) = request {
 380                let response = request.await?;
 381                let this = this.upgrade().context("channel store dropped")?;
 382                let user_store = this.read_with(cx, |this, _| this.user_store.clone())?;
 383                ChannelMessage::from_proto_vec(response.messages, &user_store, cx).await
 384            } else {
 385                Ok(Vec::new())
 386            }
 387        })
 388    }
 389
 390    pub fn has_channel_buffer_changed(&self, channel_id: ChannelId) -> bool {
 391        self.channel_states
 392            .get(&channel_id)
 393            .is_some_and(|state| state.has_channel_buffer_changed())
 394    }
 395
 396    pub fn has_new_messages(&self, channel_id: ChannelId) -> bool {
 397        self.channel_states
 398            .get(&channel_id)
 399            .is_some_and(|state| state.has_new_messages())
 400    }
 401
 402    pub fn set_acknowledged_message_id(&mut self, channel_id: ChannelId, message_id: Option<u64>) {
 403        if let Some(state) = self.channel_states.get_mut(&channel_id) {
 404            state.latest_chat_message = message_id;
 405        }
 406    }
 407
 408    pub fn last_acknowledge_message_id(&self, channel_id: ChannelId) -> Option<u64> {
 409        self.channel_states.get(&channel_id).and_then(|state| {
 410            if let Some(last_message_id) = state.latest_chat_message
 411                && state
 412                    .last_acknowledged_message_id()
 413                    .is_some_and(|id| id < last_message_id)
 414            {
 415                return state.last_acknowledged_message_id();
 416            }
 417
 418            None
 419        })
 420    }
 421
 422    pub fn acknowledge_message_id(
 423        &mut self,
 424        channel_id: ChannelId,
 425        message_id: u64,
 426        cx: &mut Context<Self>,
 427    ) {
 428        self.channel_states
 429            .entry(channel_id)
 430            .or_default()
 431            .acknowledge_message_id(message_id);
 432        cx.notify();
 433    }
 434
 435    pub fn update_latest_message_id(
 436        &mut self,
 437        channel_id: ChannelId,
 438        message_id: u64,
 439        cx: &mut Context<Self>,
 440    ) {
 441        self.channel_states
 442            .entry(channel_id)
 443            .or_default()
 444            .update_latest_message_id(message_id);
 445        cx.notify();
 446    }
 447
 448    pub fn acknowledge_notes_version(
 449        &mut self,
 450        channel_id: ChannelId,
 451        epoch: u64,
 452        version: &clock::Global,
 453        cx: &mut Context<Self>,
 454    ) {
 455        self.channel_states
 456            .entry(channel_id)
 457            .or_default()
 458            .acknowledge_notes_version(epoch, version);
 459        cx.notify()
 460    }
 461
 462    pub fn update_latest_notes_version(
 463        &mut self,
 464        channel_id: ChannelId,
 465        epoch: u64,
 466        version: &clock::Global,
 467        cx: &mut Context<Self>,
 468    ) {
 469        self.channel_states
 470            .entry(channel_id)
 471            .or_default()
 472            .update_latest_notes_version(epoch, version);
 473        cx.notify()
 474    }
 475
 476    pub fn open_channel_chat(
 477        &mut self,
 478        channel_id: ChannelId,
 479        cx: &mut Context<Self>,
 480    ) -> Task<Result<Entity<ChannelChat>>> {
 481        let client = self.client.clone();
 482        let user_store = self.user_store.clone();
 483        let this = cx.entity();
 484        self.open_channel_resource(
 485            channel_id,
 486            "chat",
 487            |this| &mut this.opened_chats,
 488            async move |channel, cx| ChannelChat::new(channel, this, user_store, client, cx).await,
 489            cx,
 490        )
 491    }
 492
 493    /// Asynchronously open a given resource associated with a channel.
 494    ///
 495    /// Make sure that the resource is only opened once, even if this method
 496    /// is called multiple times with the same channel id while the first task
 497    /// is still running.
 498    fn open_channel_resource<T, F>(
 499        &mut self,
 500        channel_id: ChannelId,
 501        resource_name: &'static str,
 502        get_map: fn(&mut Self) -> &mut HashMap<ChannelId, OpenEntityHandle<T>>,
 503        load: F,
 504        cx: &mut Context<Self>,
 505    ) -> Task<Result<Entity<T>>>
 506    where
 507        F: AsyncFnOnce(Arc<Channel>, &mut AsyncApp) -> Result<Entity<T>> + 'static,
 508        T: 'static,
 509    {
 510        let task = loop {
 511            match get_map(self).get(&channel_id) {
 512                Some(OpenEntityHandle::Open(entity)) => {
 513                    if let Some(entity) = entity.upgrade() {
 514                        break Task::ready(Ok(entity)).shared();
 515                    } else {
 516                        get_map(self).remove(&channel_id);
 517                        continue;
 518                    }
 519                }
 520                Some(OpenEntityHandle::Loading(task)) => break task.clone(),
 521                None => {}
 522            }
 523
 524            let channels_ready = self.wait_for_channels(Duration::from_secs(10), cx);
 525            let task = cx
 526                .spawn(async move |this, cx| {
 527                    channels_ready.await?;
 528                    let channel = this.read_with(cx, |this, _| {
 529                        this.channel_for_id(channel_id)
 530                            .cloned()
 531                            .ok_or_else(|| Arc::new(anyhow!("no channel for id: {channel_id}")))
 532                    })??;
 533
 534                    load(channel, cx).await.map_err(Arc::new)
 535                })
 536                .shared();
 537
 538            get_map(self).insert(channel_id, OpenEntityHandle::Loading(task.clone()));
 539            let task = cx.spawn({
 540                async move |this, cx| {
 541                    let result = task.await;
 542                    this.update(cx, |this, _| match &result {
 543                        Ok(model) => {
 544                            get_map(this)
 545                                .insert(channel_id, OpenEntityHandle::Open(model.downgrade()));
 546                        }
 547                        Err(_) => {
 548                            get_map(this).remove(&channel_id);
 549                        }
 550                    })?;
 551                    result
 552                }
 553            });
 554            break task.shared();
 555        };
 556        cx.background_spawn(async move {
 557            task.await.map_err(|error| {
 558                anyhow!("{error}").context(format!("failed to open channel {resource_name}"))
 559            })
 560        })
 561    }
 562
 563    pub fn is_channel_admin(&self, channel_id: ChannelId) -> bool {
 564        self.channel_role(channel_id) == proto::ChannelRole::Admin
 565    }
 566
 567    pub fn is_root_channel(&self, channel_id: ChannelId) -> bool {
 568        self.channel_index
 569            .by_id()
 570            .get(&channel_id)
 571            .is_some_and(|channel| channel.is_root_channel())
 572    }
 573
 574    pub fn is_public_channel(&self, channel_id: ChannelId) -> bool {
 575        self.channel_index
 576            .by_id()
 577            .get(&channel_id)
 578            .is_some_and(|channel| channel.visibility == ChannelVisibility::Public)
 579    }
 580
 581    pub fn channel_capability(&self, channel_id: ChannelId) -> Capability {
 582        match self.channel_role(channel_id) {
 583            ChannelRole::Admin | ChannelRole::Member => Capability::ReadWrite,
 584            _ => Capability::ReadOnly,
 585        }
 586    }
 587
 588    pub fn channel_role(&self, channel_id: ChannelId) -> proto::ChannelRole {
 589        maybe!({
 590            let mut channel = self.channel_for_id(channel_id)?;
 591            if !channel.is_root_channel() {
 592                channel = self.channel_for_id(channel.root_id())?;
 593            }
 594            let root_channel_state = self.channel_states.get(&channel.id);
 595            root_channel_state?.role
 596        })
 597        .unwrap_or(proto::ChannelRole::Guest)
 598    }
 599
 600    pub fn channel_participants(&self, channel_id: ChannelId) -> &[Arc<User>] {
 601        self.channel_participants
 602            .get(&channel_id)
 603            .map_or(&[], |v| v.as_slice())
 604    }
 605
 606    pub fn create_channel(
 607        &self,
 608        name: &str,
 609        parent_id: Option<ChannelId>,
 610        cx: &mut Context<Self>,
 611    ) -> Task<Result<ChannelId>> {
 612        let client = self.client.clone();
 613        let name = name.trim_start_matches('#').to_owned();
 614        cx.spawn(async move |this, cx| {
 615            let response = client
 616                .request(proto::CreateChannel {
 617                    name,
 618                    parent_id: parent_id.map(|cid| cid.0),
 619                })
 620                .await?;
 621
 622            let channel = response.channel.context("missing channel in response")?;
 623            let channel_id = ChannelId(channel.id);
 624
 625            this.update(cx, |this, cx| {
 626                let task = this.update_channels(
 627                    proto::UpdateChannels {
 628                        channels: vec![channel],
 629                        ..Default::default()
 630                    },
 631                    cx,
 632                );
 633                assert!(task.is_none());
 634
 635                // This event is emitted because the collab panel wants to clear the pending edit state
 636                // before this frame is rendered. But we can't guarantee that the collab panel's future
 637                // will resolve before this flush_effects finishes. Synchronously emitting this event
 638                // ensures that the collab panel will observe this creation before the frame completes
 639                cx.emit(ChannelEvent::ChannelCreated(channel_id));
 640            })?;
 641
 642            Ok(channel_id)
 643        })
 644    }
 645
 646    pub fn move_channel(
 647        &mut self,
 648        channel_id: ChannelId,
 649        to: ChannelId,
 650        cx: &mut Context<Self>,
 651    ) -> Task<Result<()>> {
 652        let client = self.client.clone();
 653        cx.spawn(async move |_, _| {
 654            let _ = client
 655                .request(proto::MoveChannel {
 656                    channel_id: channel_id.0,
 657                    to: to.0,
 658                })
 659                .await?;
 660            Ok(())
 661        })
 662    }
 663
 664    pub fn reorder_channel(
 665        &mut self,
 666        channel_id: ChannelId,
 667        direction: proto::reorder_channel::Direction,
 668        cx: &mut Context<Self>,
 669    ) -> Task<Result<()>> {
 670        let client = self.client.clone();
 671        cx.spawn(async move |_, _| {
 672            client
 673                .request(proto::ReorderChannel {
 674                    channel_id: channel_id.0,
 675                    direction: direction.into(),
 676                })
 677                .await?;
 678            Ok(())
 679        })
 680    }
 681
 682    pub fn set_channel_visibility(
 683        &mut self,
 684        channel_id: ChannelId,
 685        visibility: ChannelVisibility,
 686        cx: &mut Context<Self>,
 687    ) -> Task<Result<()>> {
 688        let client = self.client.clone();
 689        cx.spawn(async move |_, _| {
 690            let _ = client
 691                .request(proto::SetChannelVisibility {
 692                    channel_id: channel_id.0,
 693                    visibility: visibility.into(),
 694                })
 695                .await?;
 696
 697            Ok(())
 698        })
 699    }
 700
 701    pub fn invite_member(
 702        &mut self,
 703        channel_id: ChannelId,
 704        user_id: UserId,
 705        role: proto::ChannelRole,
 706        cx: &mut Context<Self>,
 707    ) -> Task<Result<()>> {
 708        if !self.outgoing_invites.insert((channel_id, user_id)) {
 709            return Task::ready(Err(anyhow!("invite request already in progress")));
 710        }
 711
 712        cx.notify();
 713        let client = self.client.clone();
 714        cx.spawn(async move |this, cx| {
 715            let result = client
 716                .request(proto::InviteChannelMember {
 717                    channel_id: channel_id.0,
 718                    user_id,
 719                    role: role.into(),
 720                })
 721                .await;
 722
 723            this.update(cx, |this, cx| {
 724                this.outgoing_invites.remove(&(channel_id, user_id));
 725                cx.notify();
 726            })?;
 727
 728            result?;
 729
 730            Ok(())
 731        })
 732    }
 733
 734    pub fn remove_member(
 735        &mut self,
 736        channel_id: ChannelId,
 737        user_id: u64,
 738        cx: &mut Context<Self>,
 739    ) -> Task<Result<()>> {
 740        if !self.outgoing_invites.insert((channel_id, user_id)) {
 741            return Task::ready(Err(anyhow!("invite request already in progress")));
 742        }
 743
 744        cx.notify();
 745        let client = self.client.clone();
 746        cx.spawn(async move |this, cx| {
 747            let result = client
 748                .request(proto::RemoveChannelMember {
 749                    channel_id: channel_id.0,
 750                    user_id,
 751                })
 752                .await;
 753
 754            this.update(cx, |this, cx| {
 755                this.outgoing_invites.remove(&(channel_id, user_id));
 756                cx.notify();
 757            })?;
 758            result?;
 759            Ok(())
 760        })
 761    }
 762
 763    pub fn set_member_role(
 764        &mut self,
 765        channel_id: ChannelId,
 766        user_id: UserId,
 767        role: proto::ChannelRole,
 768        cx: &mut Context<Self>,
 769    ) -> Task<Result<()>> {
 770        if !self.outgoing_invites.insert((channel_id, user_id)) {
 771            return Task::ready(Err(anyhow!("member request already in progress")));
 772        }
 773
 774        cx.notify();
 775        let client = self.client.clone();
 776        cx.spawn(async move |this, cx| {
 777            let result = client
 778                .request(proto::SetChannelMemberRole {
 779                    channel_id: channel_id.0,
 780                    user_id,
 781                    role: role.into(),
 782                })
 783                .await;
 784
 785            this.update(cx, |this, cx| {
 786                this.outgoing_invites.remove(&(channel_id, user_id));
 787                cx.notify();
 788            })?;
 789
 790            result?;
 791            Ok(())
 792        })
 793    }
 794
 795    pub fn rename(
 796        &mut self,
 797        channel_id: ChannelId,
 798        new_name: &str,
 799        cx: &mut Context<Self>,
 800    ) -> Task<Result<()>> {
 801        let client = self.client.clone();
 802        let name = new_name.to_string();
 803        cx.spawn(async move |this, cx| {
 804            let channel = client
 805                .request(proto::RenameChannel {
 806                    channel_id: channel_id.0,
 807                    name,
 808                })
 809                .await?
 810                .channel
 811                .context("missing channel in response")?;
 812            this.update(cx, |this, cx| {
 813                let task = this.update_channels(
 814                    proto::UpdateChannels {
 815                        channels: vec![channel],
 816                        ..Default::default()
 817                    },
 818                    cx,
 819                );
 820                assert!(task.is_none());
 821
 822                // This event is emitted because the collab panel wants to clear the pending edit state
 823                // before this frame is rendered. But we can't guarantee that the collab panel's future
 824                // will resolve before this flush_effects finishes. Synchronously emitting this event
 825                // ensures that the collab panel will observe this creation before the frame complete
 826                cx.emit(ChannelEvent::ChannelRenamed(channel_id))
 827            })?;
 828            Ok(())
 829        })
 830    }
 831
 832    pub fn respond_to_channel_invite(
 833        &mut self,
 834        channel_id: ChannelId,
 835        accept: bool,
 836        cx: &mut Context<Self>,
 837    ) -> Task<Result<()>> {
 838        let client = self.client.clone();
 839        cx.background_spawn(async move {
 840            client
 841                .request(proto::RespondToChannelInvite {
 842                    channel_id: channel_id.0,
 843                    accept,
 844                })
 845                .await?;
 846            Ok(())
 847        })
 848    }
 849    pub fn fuzzy_search_members(
 850        &self,
 851        channel_id: ChannelId,
 852        query: String,
 853        limit: u16,
 854        cx: &mut Context<Self>,
 855    ) -> Task<Result<Vec<ChannelMembership>>> {
 856        let client = self.client.clone();
 857        let user_store = self.user_store.downgrade();
 858        cx.spawn(async move |_, cx| {
 859            let response = client
 860                .request(proto::GetChannelMembers {
 861                    channel_id: channel_id.0,
 862                    query,
 863                    limit: limit as u64,
 864                })
 865                .await?;
 866            user_store.update(cx, |user_store, _| {
 867                user_store.insert(response.users);
 868                response
 869                    .members
 870                    .into_iter()
 871                    .filter_map(|member| {
 872                        Some(ChannelMembership {
 873                            user: user_store.get_cached_user(member.user_id)?,
 874                            role: member.role(),
 875                            kind: member.kind(),
 876                        })
 877                    })
 878                    .collect()
 879            })
 880        })
 881    }
 882
 883    pub fn remove_channel(
 884        &self,
 885        channel_id: ChannelId,
 886    ) -> impl Future<Output = Result<()>> + use<> {
 887        let client = self.client.clone();
 888        async move {
 889            client
 890                .request(proto::DeleteChannel {
 891                    channel_id: channel_id.0,
 892                })
 893                .await?;
 894            Ok(())
 895        }
 896    }
 897
 898    pub fn has_pending_channel_invite_response(&self, _: &Arc<Channel>) -> bool {
 899        false
 900    }
 901
 902    pub fn has_pending_channel_invite(&self, channel_id: ChannelId, user_id: UserId) -> bool {
 903        self.outgoing_invites.contains(&(channel_id, user_id))
 904    }
 905
 906    async fn handle_update_channels(
 907        this: Entity<Self>,
 908        message: TypedEnvelope<proto::UpdateChannels>,
 909        cx: AsyncApp,
 910    ) -> Result<()> {
 911        this.read_with(&cx, |this, _| {
 912            this.update_channels_tx
 913                .unbounded_send(message.payload)
 914                .unwrap();
 915        })?;
 916        Ok(())
 917    }
 918
 919    async fn handle_update_user_channels(
 920        this: Entity<Self>,
 921        message: TypedEnvelope<proto::UpdateUserChannels>,
 922        mut cx: AsyncApp,
 923    ) -> Result<()> {
 924        this.update(&mut cx, |this, cx| {
 925            for buffer_version in message.payload.observed_channel_buffer_version {
 926                let version = language::proto::deserialize_version(&buffer_version.version);
 927                this.acknowledge_notes_version(
 928                    ChannelId(buffer_version.channel_id),
 929                    buffer_version.epoch,
 930                    &version,
 931                    cx,
 932                );
 933            }
 934            for message_id in message.payload.observed_channel_message_id {
 935                this.acknowledge_message_id(
 936                    ChannelId(message_id.channel_id),
 937                    message_id.message_id,
 938                    cx,
 939                );
 940            }
 941            for membership in message.payload.channel_memberships {
 942                if let Some(role) = ChannelRole::from_i32(membership.role) {
 943                    this.channel_states
 944                        .entry(ChannelId(membership.channel_id))
 945                        .or_default()
 946                        .set_role(role)
 947                }
 948            }
 949        })
 950    }
 951
 952    fn handle_connect(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
 953        self.channel_index.clear();
 954        self.channel_invitations.clear();
 955        self.channel_participants.clear();
 956        self.channel_index.clear();
 957        self.outgoing_invites.clear();
 958        self.disconnect_channel_buffers_task.take();
 959
 960        for chat in self.opened_chats.values() {
 961            if let OpenEntityHandle::Open(chat) = chat
 962                && let Some(chat) = chat.upgrade()
 963            {
 964                chat.update(cx, |chat, cx| {
 965                    chat.rejoin(cx);
 966                });
 967            }
 968        }
 969
 970        let mut buffer_versions = Vec::new();
 971        for buffer in self.opened_buffers.values() {
 972            if let OpenEntityHandle::Open(buffer) = buffer
 973                && let Some(buffer) = buffer.upgrade()
 974            {
 975                let channel_buffer = buffer.read(cx);
 976                let buffer = channel_buffer.buffer().read(cx);
 977                buffer_versions.push(proto::ChannelBufferVersion {
 978                    channel_id: channel_buffer.channel_id.0,
 979                    epoch: channel_buffer.epoch(),
 980                    version: language::proto::serialize_version(&buffer.version()),
 981                });
 982            }
 983        }
 984
 985        if buffer_versions.is_empty() {
 986            return Task::ready(Ok(()));
 987        }
 988
 989        let response = self.client.request(proto::RejoinChannelBuffers {
 990            buffers: buffer_versions,
 991        });
 992
 993        cx.spawn(async move |this, cx| {
 994            let mut response = response.await?;
 995
 996            this.update(cx, |this, cx| {
 997                this.opened_buffers.retain(|_, buffer| match buffer {
 998                    OpenEntityHandle::Open(channel_buffer) => {
 999                        let Some(channel_buffer) = channel_buffer.upgrade() else {
1000                            return false;
1001                        };
1002
1003                        channel_buffer.update(cx, |channel_buffer, cx| {
1004                            let channel_id = channel_buffer.channel_id;
1005                            if let Some(remote_buffer) = response
1006                                .buffers
1007                                .iter_mut()
1008                                .find(|buffer| buffer.channel_id == channel_id.0)
1009                            {
1010                                let channel_id = channel_buffer.channel_id;
1011                                let remote_version =
1012                                    language::proto::deserialize_version(&remote_buffer.version);
1013
1014                                channel_buffer.replace_collaborators(
1015                                    mem::take(&mut remote_buffer.collaborators),
1016                                    cx,
1017                                );
1018
1019                                let operations = channel_buffer
1020                                    .buffer()
1021                                    .update(cx, |buffer, cx| {
1022                                        let outgoing_operations =
1023                                            buffer.serialize_ops(Some(remote_version), cx);
1024                                        let incoming_operations =
1025                                            mem::take(&mut remote_buffer.operations)
1026                                                .into_iter()
1027                                                .map(language::proto::deserialize_operation)
1028                                                .collect::<Result<Vec<_>>>()?;
1029                                        buffer.apply_ops(incoming_operations, cx);
1030                                        anyhow::Ok(outgoing_operations)
1031                                    })
1032                                    .log_err();
1033
1034                                if let Some(operations) = operations {
1035                                    channel_buffer.connected(cx);
1036                                    let client = this.client.clone();
1037                                    cx.background_spawn(async move {
1038                                        let operations = operations.await;
1039                                        for chunk in language::proto::split_operations(operations) {
1040                                            client
1041                                                .send(proto::UpdateChannelBuffer {
1042                                                    channel_id: channel_id.0,
1043                                                    operations: chunk,
1044                                                })
1045                                                .ok();
1046                                        }
1047                                    })
1048                                    .detach();
1049                                    return true;
1050                                }
1051                            }
1052
1053                            channel_buffer.disconnect(cx);
1054                            false
1055                        })
1056                    }
1057                    OpenEntityHandle::Loading(_) => true,
1058                });
1059            })
1060            .ok();
1061            anyhow::Ok(())
1062        })
1063    }
1064
1065    fn handle_disconnect(&mut self, wait_for_reconnect: bool, cx: &mut Context<Self>) {
1066        cx.notify();
1067        self.did_subscribe = false;
1068        self.disconnect_channel_buffers_task.get_or_insert_with(|| {
1069            cx.spawn(async move |this, cx| {
1070                if wait_for_reconnect {
1071                    cx.background_executor().timer(RECONNECT_TIMEOUT).await;
1072                }
1073
1074                if let Some(this) = this.upgrade() {
1075                    this.update(cx, |this, cx| {
1076                        for buffer in this.opened_buffers.values() {
1077                            if let OpenEntityHandle::Open(buffer) = &buffer
1078                                && let Some(buffer) = buffer.upgrade()
1079                            {
1080                                buffer.update(cx, |buffer, cx| buffer.disconnect(cx));
1081                            }
1082                        }
1083                    })
1084                    .ok();
1085                }
1086            })
1087        });
1088    }
1089
1090    #[cfg(any(test, feature = "test-support"))]
1091    pub fn reset(&mut self) {
1092        self.channel_invitations.clear();
1093        self.channel_index.clear();
1094        self.channel_participants.clear();
1095        self.outgoing_invites.clear();
1096        self.opened_buffers.clear();
1097        self.opened_chats.clear();
1098        self.disconnect_channel_buffers_task = None;
1099        self.channel_states.clear();
1100    }
1101
1102    pub(crate) fn update_channels(
1103        &mut self,
1104        payload: proto::UpdateChannels,
1105        cx: &mut Context<ChannelStore>,
1106    ) -> Option<Task<Result<()>>> {
1107        if !payload.remove_channel_invitations.is_empty() {
1108            self.channel_invitations
1109                .retain(|channel| !payload.remove_channel_invitations.contains(&channel.id.0));
1110        }
1111        for channel in payload.channel_invitations {
1112            match self
1113                .channel_invitations
1114                .binary_search_by_key(&channel.id, |c| c.id.0)
1115            {
1116                Ok(ix) => {
1117                    Arc::make_mut(&mut self.channel_invitations[ix]).name = channel.name.into()
1118                }
1119                Err(ix) => self.channel_invitations.insert(
1120                    ix,
1121                    Arc::new(Channel {
1122                        id: ChannelId(channel.id),
1123                        visibility: channel.visibility(),
1124                        name: channel.name.into(),
1125                        parent_path: channel.parent_path.into_iter().map(ChannelId).collect(),
1126                        channel_order: channel.channel_order,
1127                    }),
1128                ),
1129            }
1130        }
1131
1132        let channels_changed = !payload.channels.is_empty()
1133            || !payload.delete_channels.is_empty()
1134            || !payload.latest_channel_message_ids.is_empty()
1135            || !payload.latest_channel_buffer_versions.is_empty();
1136
1137        if channels_changed {
1138            if !payload.delete_channels.is_empty() {
1139                let delete_channels: Vec<ChannelId> =
1140                    payload.delete_channels.into_iter().map(ChannelId).collect();
1141                self.channel_index.delete_channels(&delete_channels);
1142                self.channel_participants
1143                    .retain(|channel_id, _| !delete_channels.contains(channel_id));
1144
1145                for channel_id in &delete_channels {
1146                    let channel_id = *channel_id;
1147                    if payload
1148                        .channels
1149                        .iter()
1150                        .any(|channel| channel.id == channel_id.0)
1151                    {
1152                        continue;
1153                    }
1154                    if let Some(OpenEntityHandle::Open(buffer)) =
1155                        self.opened_buffers.remove(&channel_id)
1156                        && let Some(buffer) = buffer.upgrade()
1157                    {
1158                        buffer.update(cx, ChannelBuffer::disconnect);
1159                    }
1160                }
1161            }
1162
1163            let mut index = self.channel_index.bulk_insert();
1164            for channel in payload.channels {
1165                let id = ChannelId(channel.id);
1166                let channel_changed = index.insert(channel);
1167
1168                if channel_changed
1169                    && let Some(OpenEntityHandle::Open(buffer)) = self.opened_buffers.get(&id)
1170                    && let Some(buffer) = buffer.upgrade()
1171                {
1172                    buffer.update(cx, ChannelBuffer::channel_changed);
1173                }
1174            }
1175
1176            for latest_buffer_version in payload.latest_channel_buffer_versions {
1177                let version = language::proto::deserialize_version(&latest_buffer_version.version);
1178                self.channel_states
1179                    .entry(ChannelId(latest_buffer_version.channel_id))
1180                    .or_default()
1181                    .update_latest_notes_version(latest_buffer_version.epoch, &version)
1182            }
1183
1184            for latest_channel_message in payload.latest_channel_message_ids {
1185                self.channel_states
1186                    .entry(ChannelId(latest_channel_message.channel_id))
1187                    .or_default()
1188                    .update_latest_message_id(latest_channel_message.message_id);
1189            }
1190
1191            self.channels_loaded.0.try_send(true).log_err();
1192        }
1193
1194        cx.notify();
1195        if payload.channel_participants.is_empty() {
1196            return None;
1197        }
1198
1199        let mut all_user_ids = Vec::new();
1200        let channel_participants = payload.channel_participants;
1201        for entry in &channel_participants {
1202            for user_id in entry.participant_user_ids.iter() {
1203                if let Err(ix) = all_user_ids.binary_search(user_id) {
1204                    all_user_ids.insert(ix, *user_id);
1205                }
1206            }
1207        }
1208
1209        let users = self
1210            .user_store
1211            .update(cx, |user_store, cx| user_store.get_users(all_user_ids, cx));
1212        Some(cx.spawn(async move |this, cx| {
1213            let users = users.await?;
1214
1215            this.update(cx, |this, cx| {
1216                for entry in &channel_participants {
1217                    let mut participants: Vec<_> = entry
1218                        .participant_user_ids
1219                        .iter()
1220                        .filter_map(|user_id| {
1221                            users
1222                                .binary_search_by_key(&user_id, |user| &user.id)
1223                                .ok()
1224                                .map(|ix| users[ix].clone())
1225                        })
1226                        .collect();
1227
1228                    participants.sort_by_key(|u| u.id);
1229
1230                    this.channel_participants
1231                        .insert(ChannelId(entry.channel_id), participants);
1232                }
1233
1234                cx.notify();
1235            })
1236        }))
1237    }
1238}
1239
1240impl ChannelState {
1241    fn set_role(&mut self, role: ChannelRole) {
1242        self.role = Some(role);
1243    }
1244
1245    fn has_channel_buffer_changed(&self) -> bool {
1246        self.latest_notes_version.epoch > self.observed_notes_version.epoch
1247            || (self.latest_notes_version.epoch == self.observed_notes_version.epoch
1248                && self
1249                    .latest_notes_version
1250                    .version
1251                    .changed_since(&self.observed_notes_version.version))
1252    }
1253
1254    fn has_new_messages(&self) -> bool {
1255        let latest_message_id = self.latest_chat_message;
1256        let observed_message_id = self.observed_chat_message;
1257
1258        latest_message_id.is_some_and(|latest_message_id| {
1259            latest_message_id > observed_message_id.unwrap_or_default()
1260        })
1261    }
1262
1263    fn last_acknowledged_message_id(&self) -> Option<u64> {
1264        self.observed_chat_message
1265    }
1266
1267    fn acknowledge_message_id(&mut self, message_id: u64) {
1268        let observed = self.observed_chat_message.get_or_insert(message_id);
1269        *observed = (*observed).max(message_id);
1270    }
1271
1272    fn update_latest_message_id(&mut self, message_id: u64) {
1273        self.latest_chat_message =
1274            Some(message_id.max(self.latest_chat_message.unwrap_or_default()));
1275    }
1276
1277    fn acknowledge_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1278        if self.observed_notes_version.epoch == epoch {
1279            self.observed_notes_version.version.join(version);
1280        } else {
1281            self.observed_notes_version = NotesVersion {
1282                epoch,
1283                version: version.clone(),
1284            };
1285        }
1286    }
1287
1288    fn update_latest_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1289        if self.latest_notes_version.epoch == epoch {
1290            self.latest_notes_version.version.join(version);
1291        } else {
1292            self.latest_notes_version = NotesVersion {
1293                epoch,
1294                version: version.clone(),
1295            };
1296        }
1297    }
1298}