channel_store.rs

   1mod channel_index;
   2
   3use crate::{ChannelMessage, channel_buffer::ChannelBuffer, channel_chat::ChannelChat};
   4use anyhow::{Context as _, Result, anyhow};
   5use channel_index::ChannelIndex;
   6use client::{ChannelId, Client, ClientSettings, Subscription, User, UserId, UserStore};
   7use collections::{HashMap, HashSet, hash_map};
   8use futures::{Future, FutureExt, StreamExt, channel::mpsc, future::Shared};
   9use gpui::{
  10    App, AppContext as _, AsyncApp, Context, Entity, EventEmitter, Global, SharedString, Task,
  11    WeakEntity,
  12};
  13use language::Capability;
  14use rpc::{
  15    TypedEnvelope,
  16    proto::{self, ChannelRole, ChannelVisibility},
  17};
  18use settings::Settings;
  19use std::{mem, sync::Arc, time::Duration};
  20use util::{ResultExt, maybe};
  21
  22pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
  23
  24pub fn init(client: &Arc<Client>, user_store: Entity<UserStore>, cx: &mut App) {
  25    let channel_store = cx.new(|cx| ChannelStore::new(client.clone(), user_store.clone(), cx));
  26    cx.set_global(GlobalChannelStore(channel_store));
  27}
  28
  29#[derive(Debug, Clone, Default, PartialEq)]
  30struct NotesVersion {
  31    epoch: u64,
  32    version: clock::Global,
  33}
  34
  35pub struct ChannelStore {
  36    pub channel_index: ChannelIndex,
  37    channel_invitations: Vec<Arc<Channel>>,
  38    channel_participants: HashMap<ChannelId, Vec<Arc<User>>>,
  39    channel_states: HashMap<ChannelId, ChannelState>,
  40    outgoing_invites: HashSet<(ChannelId, UserId)>,
  41    update_channels_tx: mpsc::UnboundedSender<proto::UpdateChannels>,
  42    opened_buffers: HashMap<ChannelId, OpenEntityHandle<ChannelBuffer>>,
  43    opened_chats: HashMap<ChannelId, OpenEntityHandle<ChannelChat>>,
  44    client: Arc<Client>,
  45    did_subscribe: bool,
  46    user_store: Entity<UserStore>,
  47    _rpc_subscriptions: [Subscription; 2],
  48    _watch_connection_status: Task<Option<()>>,
  49    disconnect_channel_buffers_task: Option<Task<()>>,
  50    _update_channels: Task<()>,
  51}
  52
  53#[derive(Clone, Debug)]
  54pub struct Channel {
  55    pub id: ChannelId,
  56    pub name: SharedString,
  57    pub visibility: proto::ChannelVisibility,
  58    pub parent_path: Vec<ChannelId>,
  59}
  60
  61#[derive(Default, Debug)]
  62pub struct ChannelState {
  63    latest_chat_message: Option<u64>,
  64    latest_notes_version: NotesVersion,
  65    observed_notes_version: NotesVersion,
  66    observed_chat_message: Option<u64>,
  67    role: Option<ChannelRole>,
  68}
  69
  70impl Channel {
  71    pub fn link(&self, cx: &App) -> String {
  72        format!(
  73            "{}/channel/{}-{}",
  74            ClientSettings::get_global(cx).server_url,
  75            Self::slug(&self.name),
  76            self.id
  77        )
  78    }
  79
  80    pub fn notes_link(&self, heading: Option<String>, cx: &App) -> String {
  81        self.link(cx)
  82            + "/notes"
  83            + &heading
  84                .map(|h| format!("#{}", Self::slug(&h)))
  85                .unwrap_or_default()
  86    }
  87
  88    pub fn is_root_channel(&self) -> bool {
  89        self.parent_path.is_empty()
  90    }
  91
  92    pub fn root_id(&self) -> ChannelId {
  93        self.parent_path.first().copied().unwrap_or(self.id)
  94    }
  95
  96    pub fn slug(str: &str) -> String {
  97        let slug: String = str
  98            .chars()
  99            .map(|c| if c.is_alphanumeric() { c } else { '-' })
 100            .collect();
 101
 102        slug.trim_matches(|c| c == '-').to_string()
 103    }
 104}
 105
 106#[derive(Debug)]
 107pub struct ChannelMembership {
 108    pub user: Arc<User>,
 109    pub kind: proto::channel_member::Kind,
 110    pub role: proto::ChannelRole,
 111}
 112impl ChannelMembership {
 113    pub fn sort_key(&self) -> MembershipSortKey {
 114        MembershipSortKey {
 115            role_order: match self.role {
 116                proto::ChannelRole::Admin => 0,
 117                proto::ChannelRole::Member => 1,
 118                proto::ChannelRole::Banned => 2,
 119                proto::ChannelRole::Talker => 3,
 120                proto::ChannelRole::Guest => 4,
 121            },
 122            kind_order: match self.kind {
 123                proto::channel_member::Kind::Member => 0,
 124                proto::channel_member::Kind::Invitee => 1,
 125            },
 126            username_order: self.user.github_login.as_str(),
 127        }
 128    }
 129}
 130
 131#[derive(PartialOrd, Ord, PartialEq, Eq)]
 132pub struct MembershipSortKey<'a> {
 133    role_order: u8,
 134    kind_order: u8,
 135    username_order: &'a str,
 136}
 137
 138pub enum ChannelEvent {
 139    ChannelCreated(ChannelId),
 140    ChannelRenamed(ChannelId),
 141}
 142
 143impl EventEmitter<ChannelEvent> for ChannelStore {}
 144
 145enum OpenEntityHandle<E> {
 146    Open(WeakEntity<E>),
 147    Loading(Shared<Task<Result<Entity<E>, Arc<anyhow::Error>>>>),
 148}
 149
 150struct GlobalChannelStore(Entity<ChannelStore>);
 151
 152impl Global for GlobalChannelStore {}
 153
 154impl ChannelStore {
 155    pub fn global(cx: &App) -> Entity<Self> {
 156        cx.global::<GlobalChannelStore>().0.clone()
 157    }
 158
 159    pub fn new(client: Arc<Client>, user_store: Entity<UserStore>, cx: &mut Context<Self>) -> Self {
 160        let rpc_subscriptions = [
 161            client.add_message_handler(cx.weak_entity(), Self::handle_update_channels),
 162            client.add_message_handler(cx.weak_entity(), Self::handle_update_user_channels),
 163        ];
 164
 165        let mut connection_status = client.status();
 166        let (update_channels_tx, mut update_channels_rx) = mpsc::unbounded();
 167        let watch_connection_status = cx.spawn(async move |this, cx| {
 168            while let Some(status) = connection_status.next().await {
 169                let this = this.upgrade()?;
 170                match status {
 171                    client::Status::Connected { .. } => {
 172                        this.update(cx, |this, cx| this.handle_connect(cx))
 173                            .ok()?
 174                            .await
 175                            .log_err()?;
 176                    }
 177                    client::Status::SignedOut | client::Status::UpgradeRequired => {
 178                        this.update(cx, |this, cx| this.handle_disconnect(false, cx))
 179                            .ok();
 180                    }
 181                    _ => {
 182                        this.update(cx, |this, cx| this.handle_disconnect(true, cx))
 183                            .ok();
 184                    }
 185                }
 186            }
 187            Some(())
 188        });
 189
 190        Self {
 191            channel_invitations: Vec::default(),
 192            channel_index: ChannelIndex::default(),
 193            channel_participants: Default::default(),
 194            outgoing_invites: Default::default(),
 195            opened_buffers: Default::default(),
 196            opened_chats: Default::default(),
 197            update_channels_tx,
 198            client,
 199            user_store,
 200            _rpc_subscriptions: rpc_subscriptions,
 201            _watch_connection_status: watch_connection_status,
 202            disconnect_channel_buffers_task: None,
 203            _update_channels: cx.spawn(async move |this, cx| {
 204                maybe!(async move {
 205                    while let Some(update_channels) = update_channels_rx.next().await {
 206                        if let Some(this) = this.upgrade() {
 207                            let update_task = this
 208                                .update(cx, |this, cx| this.update_channels(update_channels, cx))?;
 209                            if let Some(update_task) = update_task {
 210                                update_task.await.log_err();
 211                            }
 212                        }
 213                    }
 214                    anyhow::Ok(())
 215                })
 216                .await
 217                .log_err();
 218            }),
 219            channel_states: Default::default(),
 220            did_subscribe: false,
 221        }
 222    }
 223
 224    pub fn initialize(&mut self) {
 225        if !self.did_subscribe
 226            && self
 227                .client
 228                .send(proto::SubscribeToChannels {})
 229                .log_err()
 230                .is_some()
 231        {
 232            self.did_subscribe = true;
 233        }
 234    }
 235
 236    pub fn client(&self) -> Arc<Client> {
 237        self.client.clone()
 238    }
 239
 240    /// Returns the number of unique channels in the store
 241    pub fn channel_count(&self) -> usize {
 242        self.channel_index.by_id().len()
 243    }
 244
 245    /// Returns the index of a channel ID in the list of unique channels
 246    pub fn index_of_channel(&self, channel_id: ChannelId) -> Option<usize> {
 247        self.channel_index
 248            .by_id()
 249            .keys()
 250            .position(|id| *id == channel_id)
 251    }
 252
 253    /// Returns an iterator over all unique channels
 254    pub fn channels(&self) -> impl '_ + Iterator<Item = &Arc<Channel>> {
 255        self.channel_index.by_id().values()
 256    }
 257
 258    /// Iterate over all entries in the channel DAG
 259    pub fn ordered_channels(&self) -> impl '_ + Iterator<Item = (usize, &Arc<Channel>)> {
 260        self.channel_index
 261            .ordered_channels()
 262            .iter()
 263            .filter_map(move |id| {
 264                let channel = self.channel_index.by_id().get(id)?;
 265                Some((channel.parent_path.len(), channel))
 266            })
 267    }
 268
 269    pub fn channel_at_index(&self, ix: usize) -> Option<&Arc<Channel>> {
 270        let channel_id = self.channel_index.ordered_channels().get(ix)?;
 271        self.channel_index.by_id().get(channel_id)
 272    }
 273
 274    pub fn channel_at(&self, ix: usize) -> Option<&Arc<Channel>> {
 275        self.channel_index.by_id().values().nth(ix)
 276    }
 277
 278    pub fn has_channel_invitation(&self, channel_id: ChannelId) -> bool {
 279        self.channel_invitations
 280            .iter()
 281            .any(|channel| channel.id == channel_id)
 282    }
 283
 284    pub fn channel_invitations(&self) -> &[Arc<Channel>] {
 285        &self.channel_invitations
 286    }
 287
 288    pub fn channel_for_id(&self, channel_id: ChannelId) -> Option<&Arc<Channel>> {
 289        self.channel_index.by_id().get(&channel_id)
 290    }
 291
 292    pub fn has_open_channel_buffer(&self, channel_id: ChannelId, _cx: &App) -> bool {
 293        if let Some(buffer) = self.opened_buffers.get(&channel_id) {
 294            if let OpenEntityHandle::Open(buffer) = buffer {
 295                return buffer.upgrade().is_some();
 296            }
 297        }
 298        false
 299    }
 300
 301    pub fn open_channel_buffer(
 302        &mut self,
 303        channel_id: ChannelId,
 304        cx: &mut Context<Self>,
 305    ) -> Task<Result<Entity<ChannelBuffer>>> {
 306        let client = self.client.clone();
 307        let user_store = self.user_store.clone();
 308        let channel_store = cx.entity();
 309        self.open_channel_resource(
 310            channel_id,
 311            |this| &mut this.opened_buffers,
 312            async move |channel, cx| {
 313                ChannelBuffer::new(channel, client, user_store, channel_store, cx).await
 314            },
 315            cx,
 316        )
 317    }
 318
 319    pub fn fetch_channel_messages(
 320        &self,
 321        message_ids: Vec<u64>,
 322        cx: &mut Context<Self>,
 323    ) -> Task<Result<Vec<ChannelMessage>>> {
 324        let request = if message_ids.is_empty() {
 325            None
 326        } else {
 327            Some(
 328                self.client
 329                    .request(proto::GetChannelMessagesById { message_ids }),
 330            )
 331        };
 332        cx.spawn(async move |this, cx| {
 333            if let Some(request) = request {
 334                let response = request.await?;
 335                let this = this.upgrade().context("channel store dropped")?;
 336                let user_store = this.read_with(cx, |this, _| this.user_store.clone())?;
 337                ChannelMessage::from_proto_vec(response.messages, &user_store, cx).await
 338            } else {
 339                Ok(Vec::new())
 340            }
 341        })
 342    }
 343
 344    pub fn has_channel_buffer_changed(&self, channel_id: ChannelId) -> bool {
 345        self.channel_states
 346            .get(&channel_id)
 347            .is_some_and(|state| state.has_channel_buffer_changed())
 348    }
 349
 350    pub fn has_new_messages(&self, channel_id: ChannelId) -> bool {
 351        self.channel_states
 352            .get(&channel_id)
 353            .is_some_and(|state| state.has_new_messages())
 354    }
 355
 356    pub fn set_acknowledged_message_id(&mut self, channel_id: ChannelId, message_id: Option<u64>) {
 357        if let Some(state) = self.channel_states.get_mut(&channel_id) {
 358            state.latest_chat_message = message_id;
 359        }
 360    }
 361
 362    pub fn last_acknowledge_message_id(&self, channel_id: ChannelId) -> Option<u64> {
 363        self.channel_states.get(&channel_id).and_then(|state| {
 364            if let Some(last_message_id) = state.latest_chat_message {
 365                if state
 366                    .last_acknowledged_message_id()
 367                    .is_some_and(|id| id < last_message_id)
 368                {
 369                    return state.last_acknowledged_message_id();
 370                }
 371            }
 372
 373            None
 374        })
 375    }
 376
 377    pub fn acknowledge_message_id(
 378        &mut self,
 379        channel_id: ChannelId,
 380        message_id: u64,
 381        cx: &mut Context<Self>,
 382    ) {
 383        self.channel_states
 384            .entry(channel_id)
 385            .or_default()
 386            .acknowledge_message_id(message_id);
 387        cx.notify();
 388    }
 389
 390    pub fn update_latest_message_id(
 391        &mut self,
 392        channel_id: ChannelId,
 393        message_id: u64,
 394        cx: &mut Context<Self>,
 395    ) {
 396        self.channel_states
 397            .entry(channel_id)
 398            .or_default()
 399            .update_latest_message_id(message_id);
 400        cx.notify();
 401    }
 402
 403    pub fn acknowledge_notes_version(
 404        &mut self,
 405        channel_id: ChannelId,
 406        epoch: u64,
 407        version: &clock::Global,
 408        cx: &mut Context<Self>,
 409    ) {
 410        self.channel_states
 411            .entry(channel_id)
 412            .or_default()
 413            .acknowledge_notes_version(epoch, version);
 414        cx.notify()
 415    }
 416
 417    pub fn update_latest_notes_version(
 418        &mut self,
 419        channel_id: ChannelId,
 420        epoch: u64,
 421        version: &clock::Global,
 422        cx: &mut Context<Self>,
 423    ) {
 424        self.channel_states
 425            .entry(channel_id)
 426            .or_default()
 427            .update_latest_notes_version(epoch, version);
 428        cx.notify()
 429    }
 430
 431    pub fn open_channel_chat(
 432        &mut self,
 433        channel_id: ChannelId,
 434        cx: &mut Context<Self>,
 435    ) -> Task<Result<Entity<ChannelChat>>> {
 436        let client = self.client.clone();
 437        let user_store = self.user_store.clone();
 438        let this = cx.entity();
 439        self.open_channel_resource(
 440            channel_id,
 441            |this| &mut this.opened_chats,
 442            async move |channel, cx| ChannelChat::new(channel, this, user_store, client, cx).await,
 443            cx,
 444        )
 445    }
 446
 447    /// Asynchronously open a given resource associated with a channel.
 448    ///
 449    /// Make sure that the resource is only opened once, even if this method
 450    /// is called multiple times with the same channel id while the first task
 451    /// is still running.
 452    fn open_channel_resource<T, F>(
 453        &mut self,
 454        channel_id: ChannelId,
 455        get_map: fn(&mut Self) -> &mut HashMap<ChannelId, OpenEntityHandle<T>>,
 456        load: F,
 457        cx: &mut Context<Self>,
 458    ) -> Task<Result<Entity<T>>>
 459    where
 460        F: AsyncFnOnce(Arc<Channel>, &mut AsyncApp) -> Result<Entity<T>> + 'static,
 461        T: 'static,
 462    {
 463        let task = loop {
 464            match get_map(self).entry(channel_id) {
 465                hash_map::Entry::Occupied(e) => match e.get() {
 466                    OpenEntityHandle::Open(entity) => {
 467                        if let Some(entity) = entity.upgrade() {
 468                            break Task::ready(Ok(entity)).shared();
 469                        } else {
 470                            get_map(self).remove(&channel_id);
 471                            continue;
 472                        }
 473                    }
 474                    OpenEntityHandle::Loading(task) => {
 475                        break task.clone();
 476                    }
 477                },
 478                hash_map::Entry::Vacant(e) => {
 479                    let task = cx
 480                        .spawn(async move |this, cx| {
 481                            let channel = this.read_with(cx, |this, _| {
 482                                this.channel_for_id(channel_id).cloned().ok_or_else(|| {
 483                                    Arc::new(anyhow!("no channel for id: {channel_id}"))
 484                                })
 485                            })??;
 486
 487                            load(channel, cx).await.map_err(Arc::new)
 488                        })
 489                        .shared();
 490
 491                    e.insert(OpenEntityHandle::Loading(task.clone()));
 492                    cx.spawn({
 493                        let task = task.clone();
 494                        async move |this, cx| {
 495                            let result = task.await;
 496                            this.update(cx, |this, _| match result {
 497                                Ok(model) => {
 498                                    get_map(this).insert(
 499                                        channel_id,
 500                                        OpenEntityHandle::Open(model.downgrade()),
 501                                    );
 502                                }
 503                                Err(_) => {
 504                                    get_map(this).remove(&channel_id);
 505                                }
 506                            })
 507                            .ok();
 508                        }
 509                    })
 510                    .detach();
 511                    break task;
 512                }
 513            }
 514        };
 515        cx.background_spawn(async move { task.await.map_err(|error| anyhow!("{error}")) })
 516    }
 517
 518    pub fn is_channel_admin(&self, channel_id: ChannelId) -> bool {
 519        self.channel_role(channel_id) == proto::ChannelRole::Admin
 520    }
 521
 522    pub fn is_root_channel(&self, channel_id: ChannelId) -> bool {
 523        self.channel_index
 524            .by_id()
 525            .get(&channel_id)
 526            .map_or(false, |channel| channel.is_root_channel())
 527    }
 528
 529    pub fn is_public_channel(&self, channel_id: ChannelId) -> bool {
 530        self.channel_index
 531            .by_id()
 532            .get(&channel_id)
 533            .map_or(false, |channel| {
 534                channel.visibility == ChannelVisibility::Public
 535            })
 536    }
 537
 538    pub fn channel_capability(&self, channel_id: ChannelId) -> Capability {
 539        match self.channel_role(channel_id) {
 540            ChannelRole::Admin | ChannelRole::Member => Capability::ReadWrite,
 541            _ => Capability::ReadOnly,
 542        }
 543    }
 544
 545    pub fn channel_role(&self, channel_id: ChannelId) -> proto::ChannelRole {
 546        maybe!({
 547            let mut channel = self.channel_for_id(channel_id)?;
 548            if !channel.is_root_channel() {
 549                channel = self.channel_for_id(channel.root_id())?;
 550            }
 551            let root_channel_state = self.channel_states.get(&channel.id);
 552            root_channel_state?.role
 553        })
 554        .unwrap_or(proto::ChannelRole::Guest)
 555    }
 556
 557    pub fn channel_participants(&self, channel_id: ChannelId) -> &[Arc<User>] {
 558        self.channel_participants
 559            .get(&channel_id)
 560            .map_or(&[], |v| v.as_slice())
 561    }
 562
 563    pub fn create_channel(
 564        &self,
 565        name: &str,
 566        parent_id: Option<ChannelId>,
 567        cx: &mut Context<Self>,
 568    ) -> Task<Result<ChannelId>> {
 569        let client = self.client.clone();
 570        let name = name.trim_start_matches('#').to_owned();
 571        cx.spawn(async move |this, cx| {
 572            let response = client
 573                .request(proto::CreateChannel {
 574                    name,
 575                    parent_id: parent_id.map(|cid| cid.0),
 576                })
 577                .await?;
 578
 579            let channel = response.channel.context("missing channel in response")?;
 580            let channel_id = ChannelId(channel.id);
 581
 582            this.update(cx, |this, cx| {
 583                let task = this.update_channels(
 584                    proto::UpdateChannels {
 585                        channels: vec![channel],
 586                        ..Default::default()
 587                    },
 588                    cx,
 589                );
 590                assert!(task.is_none());
 591
 592                // This event is emitted because the collab panel wants to clear the pending edit state
 593                // before this frame is rendered. But we can't guarantee that the collab panel's future
 594                // will resolve before this flush_effects finishes. Synchronously emitting this event
 595                // ensures that the collab panel will observe this creation before the frame completes
 596                cx.emit(ChannelEvent::ChannelCreated(channel_id));
 597            })?;
 598
 599            Ok(channel_id)
 600        })
 601    }
 602
 603    pub fn move_channel(
 604        &mut self,
 605        channel_id: ChannelId,
 606        to: ChannelId,
 607        cx: &mut Context<Self>,
 608    ) -> Task<Result<()>> {
 609        let client = self.client.clone();
 610        cx.spawn(async move |_, _| {
 611            let _ = client
 612                .request(proto::MoveChannel {
 613                    channel_id: channel_id.0,
 614                    to: to.0,
 615                })
 616                .await?;
 617
 618            Ok(())
 619        })
 620    }
 621
 622    pub fn set_channel_visibility(
 623        &mut self,
 624        channel_id: ChannelId,
 625        visibility: ChannelVisibility,
 626        cx: &mut Context<Self>,
 627    ) -> Task<Result<()>> {
 628        let client = self.client.clone();
 629        cx.spawn(async move |_, _| {
 630            let _ = client
 631                .request(proto::SetChannelVisibility {
 632                    channel_id: channel_id.0,
 633                    visibility: visibility.into(),
 634                })
 635                .await?;
 636
 637            Ok(())
 638        })
 639    }
 640
 641    pub fn invite_member(
 642        &mut self,
 643        channel_id: ChannelId,
 644        user_id: UserId,
 645        role: proto::ChannelRole,
 646        cx: &mut Context<Self>,
 647    ) -> Task<Result<()>> {
 648        if !self.outgoing_invites.insert((channel_id, user_id)) {
 649            return Task::ready(Err(anyhow!("invite request already in progress")));
 650        }
 651
 652        cx.notify();
 653        let client = self.client.clone();
 654        cx.spawn(async move |this, cx| {
 655            let result = client
 656                .request(proto::InviteChannelMember {
 657                    channel_id: channel_id.0,
 658                    user_id,
 659                    role: role.into(),
 660                })
 661                .await;
 662
 663            this.update(cx, |this, cx| {
 664                this.outgoing_invites.remove(&(channel_id, user_id));
 665                cx.notify();
 666            })?;
 667
 668            result?;
 669
 670            Ok(())
 671        })
 672    }
 673
 674    pub fn remove_member(
 675        &mut self,
 676        channel_id: ChannelId,
 677        user_id: u64,
 678        cx: &mut Context<Self>,
 679    ) -> Task<Result<()>> {
 680        if !self.outgoing_invites.insert((channel_id, user_id)) {
 681            return Task::ready(Err(anyhow!("invite request already in progress")));
 682        }
 683
 684        cx.notify();
 685        let client = self.client.clone();
 686        cx.spawn(async move |this, cx| {
 687            let result = client
 688                .request(proto::RemoveChannelMember {
 689                    channel_id: channel_id.0,
 690                    user_id,
 691                })
 692                .await;
 693
 694            this.update(cx, |this, cx| {
 695                this.outgoing_invites.remove(&(channel_id, user_id));
 696                cx.notify();
 697            })?;
 698            result?;
 699            Ok(())
 700        })
 701    }
 702
 703    pub fn set_member_role(
 704        &mut self,
 705        channel_id: ChannelId,
 706        user_id: UserId,
 707        role: proto::ChannelRole,
 708        cx: &mut Context<Self>,
 709    ) -> Task<Result<()>> {
 710        if !self.outgoing_invites.insert((channel_id, user_id)) {
 711            return Task::ready(Err(anyhow!("member request already in progress")));
 712        }
 713
 714        cx.notify();
 715        let client = self.client.clone();
 716        cx.spawn(async move |this, cx| {
 717            let result = client
 718                .request(proto::SetChannelMemberRole {
 719                    channel_id: channel_id.0,
 720                    user_id,
 721                    role: role.into(),
 722                })
 723                .await;
 724
 725            this.update(cx, |this, cx| {
 726                this.outgoing_invites.remove(&(channel_id, user_id));
 727                cx.notify();
 728            })?;
 729
 730            result?;
 731            Ok(())
 732        })
 733    }
 734
 735    pub fn rename(
 736        &mut self,
 737        channel_id: ChannelId,
 738        new_name: &str,
 739        cx: &mut Context<Self>,
 740    ) -> Task<Result<()>> {
 741        let client = self.client.clone();
 742        let name = new_name.to_string();
 743        cx.spawn(async move |this, cx| {
 744            let channel = client
 745                .request(proto::RenameChannel {
 746                    channel_id: channel_id.0,
 747                    name,
 748                })
 749                .await?
 750                .channel
 751                .context("missing channel in response")?;
 752            this.update(cx, |this, cx| {
 753                let task = this.update_channels(
 754                    proto::UpdateChannels {
 755                        channels: vec![channel],
 756                        ..Default::default()
 757                    },
 758                    cx,
 759                );
 760                assert!(task.is_none());
 761
 762                // This event is emitted because the collab panel wants to clear the pending edit state
 763                // before this frame is rendered. But we can't guarantee that the collab panel's future
 764                // will resolve before this flush_effects finishes. Synchronously emitting this event
 765                // ensures that the collab panel will observe this creation before the frame complete
 766                cx.emit(ChannelEvent::ChannelRenamed(channel_id))
 767            })?;
 768            Ok(())
 769        })
 770    }
 771
 772    pub fn respond_to_channel_invite(
 773        &mut self,
 774        channel_id: ChannelId,
 775        accept: bool,
 776        cx: &mut Context<Self>,
 777    ) -> Task<Result<()>> {
 778        let client = self.client.clone();
 779        cx.background_spawn(async move {
 780            client
 781                .request(proto::RespondToChannelInvite {
 782                    channel_id: channel_id.0,
 783                    accept,
 784                })
 785                .await?;
 786            Ok(())
 787        })
 788    }
 789    pub fn fuzzy_search_members(
 790        &self,
 791        channel_id: ChannelId,
 792        query: String,
 793        limit: u16,
 794        cx: &mut Context<Self>,
 795    ) -> Task<Result<Vec<ChannelMembership>>> {
 796        let client = self.client.clone();
 797        let user_store = self.user_store.downgrade();
 798        cx.spawn(async move |_, cx| {
 799            let response = client
 800                .request(proto::GetChannelMembers {
 801                    channel_id: channel_id.0,
 802                    query,
 803                    limit: limit as u64,
 804                })
 805                .await?;
 806            user_store.update(cx, |user_store, _| {
 807                user_store.insert(response.users);
 808                response
 809                    .members
 810                    .into_iter()
 811                    .filter_map(|member| {
 812                        Some(ChannelMembership {
 813                            user: user_store.get_cached_user(member.user_id)?,
 814                            role: member.role(),
 815                            kind: member.kind(),
 816                        })
 817                    })
 818                    .collect()
 819            })
 820        })
 821    }
 822
 823    pub fn remove_channel(
 824        &self,
 825        channel_id: ChannelId,
 826    ) -> impl Future<Output = Result<()>> + use<> {
 827        let client = self.client.clone();
 828        async move {
 829            client
 830                .request(proto::DeleteChannel {
 831                    channel_id: channel_id.0,
 832                })
 833                .await?;
 834            Ok(())
 835        }
 836    }
 837
 838    pub fn has_pending_channel_invite_response(&self, _: &Arc<Channel>) -> bool {
 839        false
 840    }
 841
 842    pub fn has_pending_channel_invite(&self, channel_id: ChannelId, user_id: UserId) -> bool {
 843        self.outgoing_invites.contains(&(channel_id, user_id))
 844    }
 845
 846    async fn handle_update_channels(
 847        this: Entity<Self>,
 848        message: TypedEnvelope<proto::UpdateChannels>,
 849        mut cx: AsyncApp,
 850    ) -> Result<()> {
 851        this.read_with(&mut cx, |this, _| {
 852            this.update_channels_tx
 853                .unbounded_send(message.payload)
 854                .unwrap();
 855        })?;
 856        Ok(())
 857    }
 858
 859    async fn handle_update_user_channels(
 860        this: Entity<Self>,
 861        message: TypedEnvelope<proto::UpdateUserChannels>,
 862        mut cx: AsyncApp,
 863    ) -> Result<()> {
 864        this.update(&mut cx, |this, cx| {
 865            for buffer_version in message.payload.observed_channel_buffer_version {
 866                let version = language::proto::deserialize_version(&buffer_version.version);
 867                this.acknowledge_notes_version(
 868                    ChannelId(buffer_version.channel_id),
 869                    buffer_version.epoch,
 870                    &version,
 871                    cx,
 872                );
 873            }
 874            for message_id in message.payload.observed_channel_message_id {
 875                this.acknowledge_message_id(
 876                    ChannelId(message_id.channel_id),
 877                    message_id.message_id,
 878                    cx,
 879                );
 880            }
 881            for membership in message.payload.channel_memberships {
 882                if let Some(role) = ChannelRole::from_i32(membership.role) {
 883                    this.channel_states
 884                        .entry(ChannelId(membership.channel_id))
 885                        .or_default()
 886                        .set_role(role)
 887                }
 888            }
 889        })
 890    }
 891
 892    fn handle_connect(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
 893        self.channel_index.clear();
 894        self.channel_invitations.clear();
 895        self.channel_participants.clear();
 896        self.channel_index.clear();
 897        self.outgoing_invites.clear();
 898        self.disconnect_channel_buffers_task.take();
 899
 900        for chat in self.opened_chats.values() {
 901            if let OpenEntityHandle::Open(chat) = chat {
 902                if let Some(chat) = chat.upgrade() {
 903                    chat.update(cx, |chat, cx| {
 904                        chat.rejoin(cx);
 905                    });
 906                }
 907            }
 908        }
 909
 910        let mut buffer_versions = Vec::new();
 911        for buffer in self.opened_buffers.values() {
 912            if let OpenEntityHandle::Open(buffer) = buffer {
 913                if let Some(buffer) = buffer.upgrade() {
 914                    let channel_buffer = buffer.read(cx);
 915                    let buffer = channel_buffer.buffer().read(cx);
 916                    buffer_versions.push(proto::ChannelBufferVersion {
 917                        channel_id: channel_buffer.channel_id.0,
 918                        epoch: channel_buffer.epoch(),
 919                        version: language::proto::serialize_version(&buffer.version()),
 920                    });
 921                }
 922            }
 923        }
 924
 925        if buffer_versions.is_empty() {
 926            return Task::ready(Ok(()));
 927        }
 928
 929        let response = self.client.request(proto::RejoinChannelBuffers {
 930            buffers: buffer_versions,
 931        });
 932
 933        cx.spawn(async move |this, cx| {
 934            let mut response = response.await?;
 935
 936            this.update(cx, |this, cx| {
 937                this.opened_buffers.retain(|_, buffer| match buffer {
 938                    OpenEntityHandle::Open(channel_buffer) => {
 939                        let Some(channel_buffer) = channel_buffer.upgrade() else {
 940                            return false;
 941                        };
 942
 943                        channel_buffer.update(cx, |channel_buffer, cx| {
 944                            let channel_id = channel_buffer.channel_id;
 945                            if let Some(remote_buffer) = response
 946                                .buffers
 947                                .iter_mut()
 948                                .find(|buffer| buffer.channel_id == channel_id.0)
 949                            {
 950                                let channel_id = channel_buffer.channel_id;
 951                                let remote_version =
 952                                    language::proto::deserialize_version(&remote_buffer.version);
 953
 954                                channel_buffer.replace_collaborators(
 955                                    mem::take(&mut remote_buffer.collaborators),
 956                                    cx,
 957                                );
 958
 959                                let operations = channel_buffer
 960                                    .buffer()
 961                                    .update(cx, |buffer, cx| {
 962                                        let outgoing_operations =
 963                                            buffer.serialize_ops(Some(remote_version), cx);
 964                                        let incoming_operations =
 965                                            mem::take(&mut remote_buffer.operations)
 966                                                .into_iter()
 967                                                .map(language::proto::deserialize_operation)
 968                                                .collect::<Result<Vec<_>>>()?;
 969                                        buffer.apply_ops(incoming_operations, cx);
 970                                        anyhow::Ok(outgoing_operations)
 971                                    })
 972                                    .log_err();
 973
 974                                if let Some(operations) = operations {
 975                                    let client = this.client.clone();
 976                                    cx.background_spawn(async move {
 977                                        let operations = operations.await;
 978                                        for chunk in language::proto::split_operations(operations) {
 979                                            client
 980                                                .send(proto::UpdateChannelBuffer {
 981                                                    channel_id: channel_id.0,
 982                                                    operations: chunk,
 983                                                })
 984                                                .ok();
 985                                        }
 986                                    })
 987                                    .detach();
 988                                    return true;
 989                                }
 990                            }
 991
 992                            channel_buffer.disconnect(cx);
 993                            false
 994                        })
 995                    }
 996                    OpenEntityHandle::Loading(_) => true,
 997                });
 998            })
 999            .ok();
1000            anyhow::Ok(())
1001        })
1002    }
1003
1004    fn handle_disconnect(&mut self, wait_for_reconnect: bool, cx: &mut Context<Self>) {
1005        cx.notify();
1006        self.did_subscribe = false;
1007        self.disconnect_channel_buffers_task.get_or_insert_with(|| {
1008            cx.spawn(async move |this, cx| {
1009                if wait_for_reconnect {
1010                    cx.background_executor().timer(RECONNECT_TIMEOUT).await;
1011                }
1012
1013                if let Some(this) = this.upgrade() {
1014                    this.update(cx, |this, cx| {
1015                        for (_, buffer) in this.opened_buffers.drain() {
1016                            if let OpenEntityHandle::Open(buffer) = buffer {
1017                                if let Some(buffer) = buffer.upgrade() {
1018                                    buffer.update(cx, |buffer, cx| buffer.disconnect(cx));
1019                                }
1020                            }
1021                        }
1022                    })
1023                    .ok();
1024                }
1025            })
1026        });
1027    }
1028
1029    pub(crate) fn update_channels(
1030        &mut self,
1031        payload: proto::UpdateChannels,
1032        cx: &mut Context<ChannelStore>,
1033    ) -> Option<Task<Result<()>>> {
1034        if !payload.remove_channel_invitations.is_empty() {
1035            self.channel_invitations
1036                .retain(|channel| !payload.remove_channel_invitations.contains(&channel.id.0));
1037        }
1038        for channel in payload.channel_invitations {
1039            match self
1040                .channel_invitations
1041                .binary_search_by_key(&channel.id, |c| c.id.0)
1042            {
1043                Ok(ix) => {
1044                    Arc::make_mut(&mut self.channel_invitations[ix]).name = channel.name.into()
1045                }
1046                Err(ix) => self.channel_invitations.insert(
1047                    ix,
1048                    Arc::new(Channel {
1049                        id: ChannelId(channel.id),
1050                        visibility: channel.visibility(),
1051                        name: channel.name.into(),
1052                        parent_path: channel.parent_path.into_iter().map(ChannelId).collect(),
1053                    }),
1054                ),
1055            }
1056        }
1057
1058        let channels_changed = !payload.channels.is_empty()
1059            || !payload.delete_channels.is_empty()
1060            || !payload.latest_channel_message_ids.is_empty()
1061            || !payload.latest_channel_buffer_versions.is_empty();
1062
1063        if channels_changed {
1064            if !payload.delete_channels.is_empty() {
1065                let delete_channels: Vec<ChannelId> =
1066                    payload.delete_channels.into_iter().map(ChannelId).collect();
1067                self.channel_index.delete_channels(&delete_channels);
1068                self.channel_participants
1069                    .retain(|channel_id, _| !delete_channels.contains(channel_id));
1070
1071                for channel_id in &delete_channels {
1072                    let channel_id = *channel_id;
1073                    if payload
1074                        .channels
1075                        .iter()
1076                        .any(|channel| channel.id == channel_id.0)
1077                    {
1078                        continue;
1079                    }
1080                    if let Some(OpenEntityHandle::Open(buffer)) =
1081                        self.opened_buffers.remove(&channel_id)
1082                    {
1083                        if let Some(buffer) = buffer.upgrade() {
1084                            buffer.update(cx, ChannelBuffer::disconnect);
1085                        }
1086                    }
1087                }
1088            }
1089
1090            let mut index = self.channel_index.bulk_insert();
1091            for channel in payload.channels {
1092                let id = ChannelId(channel.id);
1093                let channel_changed = index.insert(channel);
1094
1095                if channel_changed {
1096                    if let Some(OpenEntityHandle::Open(buffer)) = self.opened_buffers.get(&id) {
1097                        if let Some(buffer) = buffer.upgrade() {
1098                            buffer.update(cx, ChannelBuffer::channel_changed);
1099                        }
1100                    }
1101                }
1102            }
1103
1104            for latest_buffer_version in payload.latest_channel_buffer_versions {
1105                let version = language::proto::deserialize_version(&latest_buffer_version.version);
1106                self.channel_states
1107                    .entry(ChannelId(latest_buffer_version.channel_id))
1108                    .or_default()
1109                    .update_latest_notes_version(latest_buffer_version.epoch, &version)
1110            }
1111
1112            for latest_channel_message in payload.latest_channel_message_ids {
1113                self.channel_states
1114                    .entry(ChannelId(latest_channel_message.channel_id))
1115                    .or_default()
1116                    .update_latest_message_id(latest_channel_message.message_id);
1117            }
1118        }
1119
1120        cx.notify();
1121        if payload.channel_participants.is_empty() {
1122            return None;
1123        }
1124
1125        let mut all_user_ids = Vec::new();
1126        let channel_participants = payload.channel_participants;
1127        for entry in &channel_participants {
1128            for user_id in entry.participant_user_ids.iter() {
1129                if let Err(ix) = all_user_ids.binary_search(user_id) {
1130                    all_user_ids.insert(ix, *user_id);
1131                }
1132            }
1133        }
1134
1135        let users = self
1136            .user_store
1137            .update(cx, |user_store, cx| user_store.get_users(all_user_ids, cx));
1138        Some(cx.spawn(async move |this, cx| {
1139            let users = users.await?;
1140
1141            this.update(cx, |this, cx| {
1142                for entry in &channel_participants {
1143                    let mut participants: Vec<_> = entry
1144                        .participant_user_ids
1145                        .iter()
1146                        .filter_map(|user_id| {
1147                            users
1148                                .binary_search_by_key(&user_id, |user| &user.id)
1149                                .ok()
1150                                .map(|ix| users[ix].clone())
1151                        })
1152                        .collect();
1153
1154                    participants.sort_by_key(|u| u.id);
1155
1156                    this.channel_participants
1157                        .insert(ChannelId(entry.channel_id), participants);
1158                }
1159
1160                cx.notify();
1161            })
1162        }))
1163    }
1164}
1165
1166impl ChannelState {
1167    fn set_role(&mut self, role: ChannelRole) {
1168        self.role = Some(role);
1169    }
1170
1171    fn has_channel_buffer_changed(&self) -> bool {
1172        self.latest_notes_version.epoch > self.observed_notes_version.epoch
1173            || (self.latest_notes_version.epoch == self.observed_notes_version.epoch
1174                && self
1175                    .latest_notes_version
1176                    .version
1177                    .changed_since(&self.observed_notes_version.version))
1178    }
1179
1180    fn has_new_messages(&self) -> bool {
1181        let latest_message_id = self.latest_chat_message;
1182        let observed_message_id = self.observed_chat_message;
1183
1184        latest_message_id.is_some_and(|latest_message_id| {
1185            latest_message_id > observed_message_id.unwrap_or_default()
1186        })
1187    }
1188
1189    fn last_acknowledged_message_id(&self) -> Option<u64> {
1190        self.observed_chat_message
1191    }
1192
1193    fn acknowledge_message_id(&mut self, message_id: u64) {
1194        let observed = self.observed_chat_message.get_or_insert(message_id);
1195        *observed = (*observed).max(message_id);
1196    }
1197
1198    fn update_latest_message_id(&mut self, message_id: u64) {
1199        self.latest_chat_message =
1200            Some(message_id.max(self.latest_chat_message.unwrap_or_default()));
1201    }
1202
1203    fn acknowledge_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1204        if self.observed_notes_version.epoch == epoch {
1205            self.observed_notes_version.version.join(version);
1206        } else {
1207            self.observed_notes_version = NotesVersion {
1208                epoch,
1209                version: version.clone(),
1210            };
1211        }
1212    }
1213
1214    fn update_latest_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1215        if self.latest_notes_version.epoch == epoch {
1216            self.latest_notes_version.version.join(version);
1217        } else {
1218            self.latest_notes_version = NotesVersion {
1219                epoch,
1220                version: version.clone(),
1221            };
1222        }
1223    }
1224}