channel_store.rs

   1mod channel_index;
   2
   3use crate::{channel_buffer::ChannelBuffer, channel_chat::ChannelChat, ChannelMessage};
   4use anyhow::{anyhow, Result};
   5use channel_index::ChannelIndex;
   6use client::{Client, Subscription, User, UserId, UserStore};
   7use collections::{hash_map, HashMap, HashSet};
   8use db::RELEASE_CHANNEL;
   9use futures::{channel::mpsc, future::Shared, Future, FutureExt, StreamExt};
  10use gpui::{
  11    AppContext, AsyncAppContext, Context, EventEmitter, Model, ModelContext, SharedString, Task,
  12    WeakModel,
  13};
  14use language::Capability;
  15use rpc::{
  16    proto::{self, ChannelRole, ChannelVisibility},
  17    TypedEnvelope,
  18};
  19use std::{mem, sync::Arc, time::Duration};
  20use util::{async_maybe, maybe, ResultExt};
  21
  22pub fn init(client: &Arc<Client>, user_store: Model<UserStore>, cx: &mut AppContext) {
  23    let channel_store =
  24        cx.new_model(|cx| ChannelStore::new(client.clone(), user_store.clone(), cx));
  25    cx.set_global(channel_store);
  26}
  27
/// A 30 second timeout related to reconnecting.
///
/// NOTE(review): the use site is not in this part of the file — presumably the
/// grace period granted after a lost connection before channel buffers are
/// disconnected; confirm against the disconnect handling.
pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
  29
/// Identifier of a channel; used as the key of the per-channel maps in [`ChannelStore`].
pub type ChannelId = u64;
  31
/// An `(epoch, version)` pair describing a state of a channel's notes.
///
/// [`ChannelState`] stores one of these for the latest known notes and one for
/// the notes that have been observed.
#[derive(Debug, Clone, Default)]
struct NotesVersion {
    /// Epoch the `version` belongs to.
    epoch: u64,
    /// Version vector of the notes within `epoch`.
    version: clock::Global,
}
  37
/// Client-side store of channels and their related state (invitations,
/// participants, per-channel read state, opened buffers and chats).
pub struct ChannelStore {
    /// Index of the known channels, queried through `by_id()` and `ordered_channels()`.
    pub channel_index: ChannelIndex,
    /// Channels exposed by [`ChannelStore::channel_invitations`].
    channel_invitations: Vec<Arc<Channel>>,
    /// Users listed per channel, exposed by [`ChannelStore::channel_participants`].
    channel_participants: HashMap<ChannelId, Vec<Arc<User>>>,
    /// Per-channel state: latest/observed chat message and notes versions, and role.
    channel_states: HashMap<ChannelId, ChannelState>,

    /// `(channel, user)` pairs that currently have a membership request in flight
    /// (invite, removal or role change); used to reject duplicate requests.
    outgoing_invites: HashSet<(ChannelId, UserId)>,
    /// Sending half of the queue drained by the `_update_channels` task.
    update_channels_tx: mpsc::UnboundedSender<proto::UpdateChannels>,
    /// Channel buffers that are open or still loading, keyed by channel.
    opened_buffers: HashMap<ChannelId, OpenedModelHandle<ChannelBuffer>>,
    /// Channel chats that are open or still loading, keyed by channel.
    opened_chats: HashMap<ChannelId, OpenedModelHandle<ChannelChat>>,
    /// RPC client used for every request issued by the store.
    client: Arc<Client>,
    /// Store used to resolve users referenced by channel data.
    user_store: Model<UserStore>,
    /// Subscriptions for the `UpdateChannels` and `UpdateUserChannels` message
    /// handlers registered in [`ChannelStore::new`].
    _rpc_subscriptions: [Subscription; 2],
    /// Task that reacts to client connection status changes.
    _watch_connection_status: Task<Option<()>>,
    /// Cleared (`take`n) when the client connects.
    /// NOTE(review): where it is set is not visible here — presumably on disconnect; confirm.
    disconnect_channel_buffers_task: Option<Task<()>>,
    /// Task that applies queued `UpdateChannels` messages one at a time.
    _update_channels: Task<()>,
}
  55
/// A single channel as known to the client.
#[derive(Clone, Debug)]
pub struct Channel {
    /// Identifier of the channel.
    pub id: ChannelId,
    /// Name of the channel; also the basis of [`Channel::slug`].
    pub name: SharedString,
    /// Visibility of the channel.
    pub visibility: proto::ChannelVisibility,
    /// Channel ids leading to this channel: empty for a root channel, otherwise
    /// the first entry is the id of the root channel.
    pub parent_path: Vec<u64>,
}
  63
/// Per-channel state tracked by [`ChannelStore`]; every field starts out as `None`.
#[derive(Default)]
pub struct ChannelState {
    /// Id of the latest known chat message, if any.
    latest_chat_message: Option<u64>,
    /// Latest known version of the channel notes, if any.
    latest_notes_versions: Option<NotesVersion>,
    /// Id of the chat message that has been acknowledged as observed, if any.
    observed_chat_message: Option<u64>,
    /// Version of the channel notes that has been acknowledged as observed, if any.
    observed_notes_versions: Option<NotesVersion>,
    /// Role recorded for the channel from `UpdateUserChannels` memberships.
    role: Option<ChannelRole>,
}
  72
  73impl Channel {
  74    pub fn link(&self) -> String {
  75        RELEASE_CHANNEL.link_prefix().to_owned()
  76            + "channel/"
  77            + &self.slug()
  78            + "-"
  79            + &self.id.to_string()
  80    }
  81
  82    pub fn is_root_channel(&self) -> bool {
  83        self.parent_path.is_empty()
  84    }
  85
  86    pub fn root_id(&self) -> ChannelId {
  87        self.parent_path
  88            .first()
  89            .map(|id| *id as ChannelId)
  90            .unwrap_or(self.id)
  91    }
  92
  93    pub fn slug(&self) -> String {
  94        let slug: String = self
  95            .name
  96            .chars()
  97            .map(|c| if c.is_alphanumeric() { c } else { '-' })
  98            .collect();
  99
 100        slug.trim_matches(|c| c == '-').to_string()
 101    }
 102}
 103
/// One entry of a channel's member list, as built by
/// `ChannelStore::get_channel_member_details`.
pub struct ChannelMembership {
    /// The user this entry refers to.
    pub user: Arc<User>,
    /// Whether the user is a member or an invitee.
    pub kind: proto::channel_member::Kind,
    /// Role of the user in the channel.
    pub role: proto::ChannelRole,
}
 109impl ChannelMembership {
 110    pub fn sort_key(&self) -> MembershipSortKey {
 111        MembershipSortKey {
 112            role_order: match self.role {
 113                proto::ChannelRole::Admin => 0,
 114                proto::ChannelRole::Member => 1,
 115                proto::ChannelRole::Banned => 2,
 116                proto::ChannelRole::Guest => 3,
 117            },
 118            kind_order: match self.kind {
 119                proto::channel_member::Kind::Member => 0,
 120                proto::channel_member::Kind::Invitee => 1,
 121            },
 122            username_order: self.user.github_login.as_str(),
 123        }
 124    }
 125}
 126
/// Sort key produced by `ChannelMembership::sort_key`.
///
/// The derived ordering compares the fields in declaration order, so the field
/// order below is significant: role first, then kind, then username.
#[derive(PartialOrd, Ord, PartialEq, Eq)]
pub struct MembershipSortKey<'a> {
    /// Rank of the member's role (lower sorts first).
    role_order: u8,
    /// Rank of the membership kind (lower sorts first).
    kind_order: u8,
    /// GitHub login of the user, borrowed from the membership.
    username_order: &'a str,
}
 133
/// Events emitted by [`ChannelStore`].
pub enum ChannelEvent {
    /// Emitted after a channel created through `ChannelStore::create_channel`
    /// has been applied to the store.
    ChannelCreated(ChannelId),
    /// Emitted after a rename requested through `ChannelStore::rename` has been
    /// applied to the store.
    ChannelRenamed(ChannelId),
}

// Allows `ChannelStore` to emit `ChannelEvent`s.
impl EventEmitter<ChannelEvent> for ChannelStore {}
 140
/// State of a per-channel resource (buffer or chat) tracked by [`ChannelStore`].
enum OpenedModelHandle<E> {
    /// The resource finished loading; only a weak handle is kept, so the entry
    /// may refer to a model that has since been dropped.
    Open(WeakModel<E>),
    /// The resource is still loading; the shared task can be awaited by every
    /// caller that asks for the same resource.
    Loading(Shared<Task<Result<Model<E>, Arc<anyhow::Error>>>>),
}
 145
 146impl ChannelStore {
 147    pub fn global(cx: &AppContext) -> Model<Self> {
 148        cx.global::<Model<Self>>().clone()
 149    }
 150
 151    pub fn new(
 152        client: Arc<Client>,
 153        user_store: Model<UserStore>,
 154        cx: &mut ModelContext<Self>,
 155    ) -> Self {
 156        let rpc_subscriptions = [
 157            client.add_message_handler(cx.weak_model(), Self::handle_update_channels),
 158            client.add_message_handler(cx.weak_model(), Self::handle_update_user_channels),
 159        ];
 160
 161        let mut connection_status = client.status();
 162        let (update_channels_tx, mut update_channels_rx) = mpsc::unbounded();
 163        let watch_connection_status = cx.spawn(|this, mut cx| async move {
 164            while let Some(status) = connection_status.next().await {
 165                let this = this.upgrade()?;
 166                match status {
 167                    client::Status::Connected { .. } => {
 168                        this.update(&mut cx, |this, cx| this.handle_connect(cx))
 169                            .ok()?
 170                            .await
 171                            .log_err()?;
 172                    }
 173                    client::Status::SignedOut | client::Status::UpgradeRequired => {
 174                        this.update(&mut cx, |this, cx| this.handle_disconnect(false, cx))
 175                            .ok();
 176                    }
 177                    _ => {
 178                        this.update(&mut cx, |this, cx| this.handle_disconnect(true, cx))
 179                            .ok();
 180                    }
 181                }
 182            }
 183            Some(())
 184        });
 185
 186        Self {
 187            channel_invitations: Vec::default(),
 188            channel_index: ChannelIndex::default(),
 189            channel_participants: Default::default(),
 190            outgoing_invites: Default::default(),
 191            opened_buffers: Default::default(),
 192            opened_chats: Default::default(),
 193            update_channels_tx,
 194            client,
 195            user_store,
 196            _rpc_subscriptions: rpc_subscriptions,
 197            _watch_connection_status: watch_connection_status,
 198            disconnect_channel_buffers_task: None,
 199            _update_channels: cx.spawn(|this, mut cx| async move {
 200                async_maybe!({
 201                    while let Some(update_channels) = update_channels_rx.next().await {
 202                        if let Some(this) = this.upgrade() {
 203                            let update_task = this.update(&mut cx, |this, cx| {
 204                                this.update_channels(update_channels, cx)
 205                            })?;
 206                            if let Some(update_task) = update_task {
 207                                update_task.await.log_err();
 208                            }
 209                        }
 210                    }
 211                    anyhow::Ok(())
 212                })
 213                .await
 214                .log_err();
 215            }),
 216            channel_states: Default::default(),
 217        }
 218    }
 219
 220    pub fn client(&self) -> Arc<Client> {
 221        self.client.clone()
 222    }
 223
 224    /// Returns the number of unique channels in the store
 225    pub fn channel_count(&self) -> usize {
 226        self.channel_index.by_id().len()
 227    }
 228
 229    /// Returns the index of a channel ID in the list of unique channels
 230    pub fn index_of_channel(&self, channel_id: ChannelId) -> Option<usize> {
 231        self.channel_index
 232            .by_id()
 233            .keys()
 234            .position(|id| *id == channel_id)
 235    }
 236
 237    /// Returns an iterator over all unique channels
 238    pub fn channels(&self) -> impl '_ + Iterator<Item = &Arc<Channel>> {
 239        self.channel_index.by_id().values()
 240    }
 241
 242    /// Iterate over all entries in the channel DAG
 243    pub fn ordered_channels(&self) -> impl '_ + Iterator<Item = (usize, &Arc<Channel>)> {
 244        self.channel_index
 245            .ordered_channels()
 246            .iter()
 247            .filter_map(move |id| {
 248                let channel = self.channel_index.by_id().get(id)?;
 249                Some((channel.parent_path.len(), channel))
 250            })
 251    }
 252
 253    pub fn channel_at_index(&self, ix: usize) -> Option<&Arc<Channel>> {
 254        let channel_id = self.channel_index.ordered_channels().get(ix)?;
 255        self.channel_index.by_id().get(channel_id)
 256    }
 257
 258    pub fn channel_at(&self, ix: usize) -> Option<&Arc<Channel>> {
 259        self.channel_index.by_id().values().nth(ix)
 260    }
 261
 262    pub fn has_channel_invitation(&self, channel_id: ChannelId) -> bool {
 263        self.channel_invitations
 264            .iter()
 265            .any(|channel| channel.id == channel_id)
 266    }
 267
 268    pub fn channel_invitations(&self) -> &[Arc<Channel>] {
 269        &self.channel_invitations
 270    }
 271
 272    pub fn channel_for_id(&self, channel_id: ChannelId) -> Option<&Arc<Channel>> {
 273        self.channel_index.by_id().get(&channel_id)
 274    }
 275
 276    pub fn has_open_channel_buffer(&self, channel_id: ChannelId, _cx: &AppContext) -> bool {
 277        if let Some(buffer) = self.opened_buffers.get(&channel_id) {
 278            if let OpenedModelHandle::Open(buffer) = buffer {
 279                return buffer.upgrade().is_some();
 280            }
 281        }
 282        false
 283    }
 284
 285    pub fn open_channel_buffer(
 286        &mut self,
 287        channel_id: ChannelId,
 288        cx: &mut ModelContext<Self>,
 289    ) -> Task<Result<Model<ChannelBuffer>>> {
 290        let client = self.client.clone();
 291        let user_store = self.user_store.clone();
 292        let channel_store = cx.handle();
 293        self.open_channel_resource(
 294            channel_id,
 295            |this| &mut this.opened_buffers,
 296            |channel, cx| ChannelBuffer::new(channel, client, user_store, channel_store, cx),
 297            cx,
 298        )
 299    }
 300
 301    pub fn fetch_channel_messages(
 302        &self,
 303        message_ids: Vec<u64>,
 304        cx: &mut ModelContext<Self>,
 305    ) -> Task<Result<Vec<ChannelMessage>>> {
 306        let request = if message_ids.is_empty() {
 307            None
 308        } else {
 309            Some(
 310                self.client
 311                    .request(proto::GetChannelMessagesById { message_ids }),
 312            )
 313        };
 314        cx.spawn(|this, mut cx| async move {
 315            if let Some(request) = request {
 316                let response = request.await?;
 317                let this = this
 318                    .upgrade()
 319                    .ok_or_else(|| anyhow!("channel store dropped"))?;
 320                let user_store = this.update(&mut cx, |this, _| this.user_store.clone())?;
 321                ChannelMessage::from_proto_vec(response.messages, &user_store, &mut cx).await
 322            } else {
 323                Ok(Vec::new())
 324            }
 325        })
 326    }
 327
 328    pub fn has_channel_buffer_changed(&self, channel_id: ChannelId) -> bool {
 329        self.channel_states
 330            .get(&channel_id)
 331            .is_some_and(|state| state.has_channel_buffer_changed())
 332    }
 333
 334    pub fn has_new_messages(&self, channel_id: ChannelId) -> bool {
 335        self.channel_states
 336            .get(&channel_id)
 337            .is_some_and(|state| state.has_new_messages())
 338    }
 339
 340    pub fn acknowledge_message_id(
 341        &mut self,
 342        channel_id: ChannelId,
 343        message_id: u64,
 344        cx: &mut ModelContext<Self>,
 345    ) {
 346        self.channel_states
 347            .entry(channel_id)
 348            .or_insert_with(|| Default::default())
 349            .acknowledge_message_id(message_id);
 350        cx.notify();
 351    }
 352
 353    pub fn update_latest_message_id(
 354        &mut self,
 355        channel_id: ChannelId,
 356        message_id: u64,
 357        cx: &mut ModelContext<Self>,
 358    ) {
 359        self.channel_states
 360            .entry(channel_id)
 361            .or_insert_with(|| Default::default())
 362            .update_latest_message_id(message_id);
 363        cx.notify();
 364    }
 365
 366    pub fn acknowledge_notes_version(
 367        &mut self,
 368        channel_id: ChannelId,
 369        epoch: u64,
 370        version: &clock::Global,
 371        cx: &mut ModelContext<Self>,
 372    ) {
 373        self.channel_states
 374            .entry(channel_id)
 375            .or_insert_with(|| Default::default())
 376            .acknowledge_notes_version(epoch, version);
 377        cx.notify()
 378    }
 379
 380    pub fn update_latest_notes_version(
 381        &mut self,
 382        channel_id: ChannelId,
 383        epoch: u64,
 384        version: &clock::Global,
 385        cx: &mut ModelContext<Self>,
 386    ) {
 387        self.channel_states
 388            .entry(channel_id)
 389            .or_insert_with(|| Default::default())
 390            .update_latest_notes_version(epoch, version);
 391        cx.notify()
 392    }
 393
 394    pub fn open_channel_chat(
 395        &mut self,
 396        channel_id: ChannelId,
 397        cx: &mut ModelContext<Self>,
 398    ) -> Task<Result<Model<ChannelChat>>> {
 399        let client = self.client.clone();
 400        let user_store = self.user_store.clone();
 401        let this = cx.handle();
 402        self.open_channel_resource(
 403            channel_id,
 404            |this| &mut this.opened_chats,
 405            |channel, cx| ChannelChat::new(channel, this, user_store, client, cx),
 406            cx,
 407        )
 408    }
 409
    /// Asynchronously open a given resource associated with a channel.
    ///
    /// Make sure that the resource is only opened once, even if this method
    /// is called multiple times with the same channel id while the first task
    /// is still running.
    ///
    /// * `get_map` selects which map of this store tracks the resource kind.
    /// * `load` performs the actual opening; it only runs when no live or
    ///   loading entry exists for `channel_id`.
    ///
    /// The returned task fails if the channel is unknown to the store or if
    /// `load` fails; the error is re-created from the shared error's message.
    fn open_channel_resource<T, F, Fut>(
        &mut self,
        channel_id: ChannelId,
        get_map: fn(&mut Self) -> &mut HashMap<ChannelId, OpenedModelHandle<T>>,
        load: F,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<Model<T>>>
    where
        F: 'static + FnOnce(Arc<Channel>, AsyncAppContext) -> Fut,
        Fut: Future<Output = Result<Model<T>>>,
        T: 'static,
    {
        // Loops at most twice: a stale `Open` entry is removed and the lookup
        // is retried, which then takes the `Vacant` branch.
        let task = loop {
            match get_map(self).entry(channel_id) {
                hash_map::Entry::Occupied(e) => match e.get() {
                    OpenedModelHandle::Open(model) => {
                        if let Some(model) = model.upgrade() {
                            // Already open and still alive: resolve immediately.
                            break Task::ready(Ok(model)).shared();
                        } else {
                            // The model was dropped; forget the stale entry and retry.
                            get_map(self).remove(&channel_id);
                            continue;
                        }
                    }
                    OpenedModelHandle::Loading(task) => {
                        // Someone else is already opening it: share their task.
                        break task.clone();
                    }
                },
                hash_map::Entry::Vacant(e) => {
                    let task = cx
                        .spawn(move |this, mut cx| async move {
                            let channel = this.update(&mut cx, |this, _| {
                                this.channel_for_id(channel_id).cloned().ok_or_else(|| {
                                    Arc::new(anyhow!("no channel for id: {}", channel_id))
                                })
                            })??;

                            load(channel, cx).await.map_err(Arc::new)
                        })
                        .shared();

                    e.insert(OpenedModelHandle::Loading(task.clone()));
                    // Once loading finishes, replace the `Loading` entry with a
                    // weak `Open` handle, or drop the entry if loading failed.
                    cx.spawn({
                        let task = task.clone();
                        move |this, mut cx| async move {
                            let result = task.await;
                            this.update(&mut cx, |this, _| match result {
                                Ok(model) => {
                                    get_map(this).insert(
                                        channel_id,
                                        OpenedModelHandle::Open(model.downgrade()),
                                    );
                                }
                                Err(_) => {
                                    get_map(this).remove(&channel_id);
                                }
                            })
                            .ok();
                        }
                    })
                    .detach();
                    break task;
                }
            }
        };
        // The shared task yields `Arc<anyhow::Error>`; convert it back into a
        // plain `anyhow::Error` for the caller.
        cx.background_executor()
            .spawn(async move { task.await.map_err(|error| anyhow!("{}", error)) })
    }
 482
 483    pub fn is_channel_admin(&self, channel_id: ChannelId) -> bool {
 484        self.channel_role(channel_id) == proto::ChannelRole::Admin
 485    }
 486
 487    pub fn is_root_channel(&self, channel_id: ChannelId) -> bool {
 488        self.channel_index
 489            .by_id()
 490            .get(&channel_id)
 491            .map_or(false, |channel| channel.is_root_channel())
 492    }
 493
 494    pub fn is_public_channel(&self, channel_id: ChannelId) -> bool {
 495        self.channel_index
 496            .by_id()
 497            .get(&channel_id)
 498            .map_or(false, |channel| {
 499                channel.visibility == ChannelVisibility::Public
 500            })
 501    }
 502
 503    pub fn channel_capability(&self, channel_id: ChannelId) -> Capability {
 504        match self.channel_role(channel_id) {
 505            ChannelRole::Admin | ChannelRole::Member => Capability::ReadWrite,
 506            _ => Capability::ReadOnly,
 507        }
 508    }
 509
 510    pub fn channel_role(&self, channel_id: ChannelId) -> proto::ChannelRole {
 511        maybe!({
 512            let mut channel = self.channel_for_id(channel_id)?;
 513            if !channel.is_root_channel() {
 514                channel = self.channel_for_id(channel.root_id())?;
 515            }
 516            let root_channel_state = self.channel_states.get(&channel.id);
 517            root_channel_state?.role
 518        })
 519        .unwrap_or(proto::ChannelRole::Guest)
 520    }
 521
 522    pub fn channel_participants(&self, channel_id: ChannelId) -> &[Arc<User>] {
 523        self.channel_participants
 524            .get(&channel_id)
 525            .map_or(&[], |v| v.as_slice())
 526    }
 527
 528    pub fn create_channel(
 529        &self,
 530        name: &str,
 531        parent_id: Option<ChannelId>,
 532        cx: &mut ModelContext<Self>,
 533    ) -> Task<Result<ChannelId>> {
 534        let client = self.client.clone();
 535        let name = name.trim_start_matches("#").to_owned();
 536        cx.spawn(move |this, mut cx| async move {
 537            let response = client
 538                .request(proto::CreateChannel { name, parent_id })
 539                .await?;
 540
 541            let channel = response
 542                .channel
 543                .ok_or_else(|| anyhow!("missing channel in response"))?;
 544            let channel_id = channel.id;
 545
 546            this.update(&mut cx, |this, cx| {
 547                let task = this.update_channels(
 548                    proto::UpdateChannels {
 549                        channels: vec![channel],
 550                        ..Default::default()
 551                    },
 552                    cx,
 553                );
 554                assert!(task.is_none());
 555
 556                // This event is emitted because the collab panel wants to clear the pending edit state
 557                // before this frame is rendered. But we can't guarantee that the collab panel's future
 558                // will resolve before this flush_effects finishes. Synchronously emitting this event
 559                // ensures that the collab panel will observe this creation before the frame completes
 560                cx.emit(ChannelEvent::ChannelCreated(channel_id));
 561            })?;
 562
 563            Ok(channel_id)
 564        })
 565    }
 566
 567    pub fn move_channel(
 568        &mut self,
 569        channel_id: ChannelId,
 570        to: ChannelId,
 571        cx: &mut ModelContext<Self>,
 572    ) -> Task<Result<()>> {
 573        let client = self.client.clone();
 574        cx.spawn(move |_, _| async move {
 575            let _ = client
 576                .request(proto::MoveChannel { channel_id, to })
 577                .await?;
 578
 579            Ok(())
 580        })
 581    }
 582
 583    pub fn set_channel_visibility(
 584        &mut self,
 585        channel_id: ChannelId,
 586        visibility: ChannelVisibility,
 587        cx: &mut ModelContext<Self>,
 588    ) -> Task<Result<()>> {
 589        let client = self.client.clone();
 590        cx.spawn(move |_, _| async move {
 591            let _ = client
 592                .request(proto::SetChannelVisibility {
 593                    channel_id,
 594                    visibility: visibility.into(),
 595                })
 596                .await?;
 597
 598            Ok(())
 599        })
 600    }
 601
 602    pub fn invite_member(
 603        &mut self,
 604        channel_id: ChannelId,
 605        user_id: UserId,
 606        role: proto::ChannelRole,
 607        cx: &mut ModelContext<Self>,
 608    ) -> Task<Result<()>> {
 609        if !self.outgoing_invites.insert((channel_id, user_id)) {
 610            return Task::ready(Err(anyhow!("invite request already in progress")));
 611        }
 612
 613        cx.notify();
 614        let client = self.client.clone();
 615        cx.spawn(move |this, mut cx| async move {
 616            let result = client
 617                .request(proto::InviteChannelMember {
 618                    channel_id,
 619                    user_id,
 620                    role: role.into(),
 621                })
 622                .await;
 623
 624            this.update(&mut cx, |this, cx| {
 625                this.outgoing_invites.remove(&(channel_id, user_id));
 626                cx.notify();
 627            })?;
 628
 629            result?;
 630
 631            Ok(())
 632        })
 633    }
 634
 635    pub fn remove_member(
 636        &mut self,
 637        channel_id: ChannelId,
 638        user_id: u64,
 639        cx: &mut ModelContext<Self>,
 640    ) -> Task<Result<()>> {
 641        if !self.outgoing_invites.insert((channel_id, user_id)) {
 642            return Task::ready(Err(anyhow!("invite request already in progress")));
 643        }
 644
 645        cx.notify();
 646        let client = self.client.clone();
 647        cx.spawn(move |this, mut cx| async move {
 648            let result = client
 649                .request(proto::RemoveChannelMember {
 650                    channel_id,
 651                    user_id,
 652                })
 653                .await;
 654
 655            this.update(&mut cx, |this, cx| {
 656                this.outgoing_invites.remove(&(channel_id, user_id));
 657                cx.notify();
 658            })?;
 659            result?;
 660            Ok(())
 661        })
 662    }
 663
 664    pub fn set_member_role(
 665        &mut self,
 666        channel_id: ChannelId,
 667        user_id: UserId,
 668        role: proto::ChannelRole,
 669        cx: &mut ModelContext<Self>,
 670    ) -> Task<Result<()>> {
 671        if !self.outgoing_invites.insert((channel_id, user_id)) {
 672            return Task::ready(Err(anyhow!("member request already in progress")));
 673        }
 674
 675        cx.notify();
 676        let client = self.client.clone();
 677        cx.spawn(move |this, mut cx| async move {
 678            let result = client
 679                .request(proto::SetChannelMemberRole {
 680                    channel_id,
 681                    user_id,
 682                    role: role.into(),
 683                })
 684                .await;
 685
 686            this.update(&mut cx, |this, cx| {
 687                this.outgoing_invites.remove(&(channel_id, user_id));
 688                cx.notify();
 689            })?;
 690
 691            result?;
 692            Ok(())
 693        })
 694    }
 695
 696    pub fn rename(
 697        &mut self,
 698        channel_id: ChannelId,
 699        new_name: &str,
 700        cx: &mut ModelContext<Self>,
 701    ) -> Task<Result<()>> {
 702        let client = self.client.clone();
 703        let name = new_name.to_string();
 704        cx.spawn(move |this, mut cx| async move {
 705            let channel = client
 706                .request(proto::RenameChannel { channel_id, name })
 707                .await?
 708                .channel
 709                .ok_or_else(|| anyhow!("missing channel in response"))?;
 710            this.update(&mut cx, |this, cx| {
 711                let task = this.update_channels(
 712                    proto::UpdateChannels {
 713                        channels: vec![channel],
 714                        ..Default::default()
 715                    },
 716                    cx,
 717                );
 718                assert!(task.is_none());
 719
 720                // This event is emitted because the collab panel wants to clear the pending edit state
 721                // before this frame is rendered. But we can't guarantee that the collab panel's future
 722                // will resolve before this flush_effects finishes. Synchronously emitting this event
 723                // ensures that the collab panel will observe this creation before the frame complete
 724                cx.emit(ChannelEvent::ChannelRenamed(channel_id))
 725            })?;
 726            Ok(())
 727        })
 728    }
 729
 730    pub fn respond_to_channel_invite(
 731        &mut self,
 732        channel_id: ChannelId,
 733        accept: bool,
 734        cx: &mut ModelContext<Self>,
 735    ) -> Task<Result<()>> {
 736        let client = self.client.clone();
 737        cx.background_executor().spawn(async move {
 738            client
 739                .request(proto::RespondToChannelInvite { channel_id, accept })
 740                .await?;
 741            Ok(())
 742        })
 743    }
 744
 745    pub fn get_channel_member_details(
 746        &self,
 747        channel_id: ChannelId,
 748        cx: &mut ModelContext<Self>,
 749    ) -> Task<Result<Vec<ChannelMembership>>> {
 750        let client = self.client.clone();
 751        let user_store = self.user_store.downgrade();
 752        cx.spawn(move |_, mut cx| async move {
 753            let response = client
 754                .request(proto::GetChannelMembers { channel_id })
 755                .await?;
 756
 757            let user_ids = response.members.iter().map(|m| m.user_id).collect();
 758            let user_store = user_store
 759                .upgrade()
 760                .ok_or_else(|| anyhow!("user store dropped"))?;
 761            let users = user_store
 762                .update(&mut cx, |user_store, cx| user_store.get_users(user_ids, cx))?
 763                .await?;
 764
 765            Ok(users
 766                .into_iter()
 767                .zip(response.members)
 768                .filter_map(|(user, member)| {
 769                    Some(ChannelMembership {
 770                        user,
 771                        role: member.role(),
 772                        kind: member.kind(),
 773                    })
 774                })
 775                .collect())
 776        })
 777    }
 778
 779    pub fn remove_channel(&self, channel_id: ChannelId) -> impl Future<Output = Result<()>> {
 780        let client = self.client.clone();
 781        async move {
 782            client.request(proto::DeleteChannel { channel_id }).await?;
 783            Ok(())
 784        }
 785    }
 786
    /// Always returns `false`: the channel argument is ignored, so responses to
    /// channel invitations are never reported as pending.
    pub fn has_pending_channel_invite_response(&self, _: &Arc<Channel>) -> bool {
        false
    }
 790
 791    pub fn has_pending_channel_invite(&self, channel_id: ChannelId, user_id: UserId) -> bool {
 792        self.outgoing_invites.contains(&(channel_id, user_id))
 793    }
 794
 795    async fn handle_update_channels(
 796        this: Model<Self>,
 797        message: TypedEnvelope<proto::UpdateChannels>,
 798        _: Arc<Client>,
 799        mut cx: AsyncAppContext,
 800    ) -> Result<()> {
 801        this.update(&mut cx, |this, _| {
 802            this.update_channels_tx
 803                .unbounded_send(message.payload)
 804                .unwrap();
 805        })?;
 806        Ok(())
 807    }
 808
 809    async fn handle_update_user_channels(
 810        this: Model<Self>,
 811        message: TypedEnvelope<proto::UpdateUserChannels>,
 812        _: Arc<Client>,
 813        mut cx: AsyncAppContext,
 814    ) -> Result<()> {
 815        this.update(&mut cx, |this, cx| {
 816            for buffer_version in message.payload.observed_channel_buffer_version {
 817                let version = language::proto::deserialize_version(&buffer_version.version);
 818                this.acknowledge_notes_version(
 819                    buffer_version.channel_id,
 820                    buffer_version.epoch,
 821                    &version,
 822                    cx,
 823                );
 824            }
 825            for message_id in message.payload.observed_channel_message_id {
 826                this.acknowledge_message_id(message_id.channel_id, message_id.message_id, cx);
 827            }
 828            for membership in message.payload.channel_memberships {
 829                if let Some(role) = ChannelRole::from_i32(membership.role) {
 830                    this.channel_states
 831                        .entry(membership.channel_id)
 832                        .or_insert_with(|| ChannelState::default())
 833                        .set_role(role)
 834                }
 835            }
 836        })
 837    }
 838
 839    fn handle_connect(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
 840        self.channel_index.clear();
 841        self.channel_invitations.clear();
 842        self.channel_participants.clear();
 843        self.channel_index.clear();
 844        self.outgoing_invites.clear();
 845        self.disconnect_channel_buffers_task.take();
 846
 847        for chat in self.opened_chats.values() {
 848            if let OpenedModelHandle::Open(chat) = chat {
 849                if let Some(chat) = chat.upgrade() {
 850                    chat.update(cx, |chat, cx| {
 851                        chat.rejoin(cx);
 852                    });
 853                }
 854            }
 855        }
 856
 857        let mut buffer_versions = Vec::new();
 858        for buffer in self.opened_buffers.values() {
 859            if let OpenedModelHandle::Open(buffer) = buffer {
 860                if let Some(buffer) = buffer.upgrade() {
 861                    let channel_buffer = buffer.read(cx);
 862                    let buffer = channel_buffer.buffer().read(cx);
 863                    buffer_versions.push(proto::ChannelBufferVersion {
 864                        channel_id: channel_buffer.channel_id,
 865                        epoch: channel_buffer.epoch(),
 866                        version: language::proto::serialize_version(&buffer.version()),
 867                    });
 868                }
 869            }
 870        }
 871
 872        if buffer_versions.is_empty() {
 873            return Task::ready(Ok(()));
 874        }
 875
 876        let response = self.client.request(proto::RejoinChannelBuffers {
 877            buffers: buffer_versions,
 878        });
 879
 880        cx.spawn(|this, mut cx| async move {
 881            let mut response = response.await?;
 882
 883            this.update(&mut cx, |this, cx| {
 884                this.opened_buffers.retain(|_, buffer| match buffer {
 885                    OpenedModelHandle::Open(channel_buffer) => {
 886                        let Some(channel_buffer) = channel_buffer.upgrade() else {
 887                            return false;
 888                        };
 889
 890                        channel_buffer.update(cx, |channel_buffer, cx| {
 891                            let channel_id = channel_buffer.channel_id;
 892                            if let Some(remote_buffer) = response
 893                                .buffers
 894                                .iter_mut()
 895                                .find(|buffer| buffer.channel_id == channel_id)
 896                            {
 897                                let channel_id = channel_buffer.channel_id;
 898                                let remote_version =
 899                                    language::proto::deserialize_version(&remote_buffer.version);
 900
 901                                channel_buffer.replace_collaborators(
 902                                    mem::take(&mut remote_buffer.collaborators),
 903                                    cx,
 904                                );
 905
 906                                let operations = channel_buffer
 907                                    .buffer()
 908                                    .update(cx, |buffer, cx| {
 909                                        let outgoing_operations =
 910                                            buffer.serialize_ops(Some(remote_version), cx);
 911                                        let incoming_operations =
 912                                            mem::take(&mut remote_buffer.operations)
 913                                                .into_iter()
 914                                                .map(language::proto::deserialize_operation)
 915                                                .collect::<Result<Vec<_>>>()?;
 916                                        buffer.apply_ops(incoming_operations, cx)?;
 917                                        anyhow::Ok(outgoing_operations)
 918                                    })
 919                                    .log_err();
 920
 921                                if let Some(operations) = operations {
 922                                    let client = this.client.clone();
 923                                    cx.background_executor()
 924                                        .spawn(async move {
 925                                            let operations = operations.await;
 926                                            for chunk in
 927                                                language::proto::split_operations(operations)
 928                                            {
 929                                                client
 930                                                    .send(proto::UpdateChannelBuffer {
 931                                                        channel_id,
 932                                                        operations: chunk,
 933                                                    })
 934                                                    .ok();
 935                                            }
 936                                        })
 937                                        .detach();
 938                                    return true;
 939                                }
 940                            }
 941
 942                            channel_buffer.disconnect(cx);
 943                            false
 944                        })
 945                    }
 946                    OpenedModelHandle::Loading(_) => true,
 947                });
 948            })
 949            .ok();
 950            anyhow::Ok(())
 951        })
 952    }
 953
 954    fn handle_disconnect(&mut self, wait_for_reconnect: bool, cx: &mut ModelContext<Self>) {
 955        cx.notify();
 956
 957        self.disconnect_channel_buffers_task.get_or_insert_with(|| {
 958            cx.spawn(move |this, mut cx| async move {
 959                if wait_for_reconnect {
 960                    cx.background_executor().timer(RECONNECT_TIMEOUT).await;
 961                }
 962
 963                if let Some(this) = this.upgrade() {
 964                    this.update(&mut cx, |this, cx| {
 965                        for (_, buffer) in this.opened_buffers.drain() {
 966                            if let OpenedModelHandle::Open(buffer) = buffer {
 967                                if let Some(buffer) = buffer.upgrade() {
 968                                    buffer.update(cx, |buffer, cx| buffer.disconnect(cx));
 969                                }
 970                            }
 971                        }
 972                    })
 973                    .ok();
 974                }
 975            })
 976        });
 977    }
 978
 979    pub(crate) fn update_channels(
 980        &mut self,
 981        payload: proto::UpdateChannels,
 982        cx: &mut ModelContext<ChannelStore>,
 983    ) -> Option<Task<Result<()>>> {
 984        if !payload.remove_channel_invitations.is_empty() {
 985            self.channel_invitations
 986                .retain(|channel| !payload.remove_channel_invitations.contains(&channel.id));
 987        }
 988        for channel in payload.channel_invitations {
 989            match self
 990                .channel_invitations
 991                .binary_search_by_key(&channel.id, |c| c.id)
 992            {
 993                Ok(ix) => {
 994                    Arc::make_mut(&mut self.channel_invitations[ix]).name = channel.name.into()
 995                }
 996                Err(ix) => self.channel_invitations.insert(
 997                    ix,
 998                    Arc::new(Channel {
 999                        id: channel.id,
1000                        visibility: channel.visibility(),
1001                        name: channel.name.into(),
1002                        parent_path: channel.parent_path,
1003                    }),
1004                ),
1005            }
1006        }
1007
1008        let channels_changed = !payload.channels.is_empty()
1009            || !payload.delete_channels.is_empty()
1010            || !payload.latest_channel_message_ids.is_empty()
1011            || !payload.latest_channel_buffer_versions.is_empty();
1012
1013        if channels_changed {
1014            if !payload.delete_channels.is_empty() {
1015                self.channel_index.delete_channels(&payload.delete_channels);
1016                self.channel_participants
1017                    .retain(|channel_id, _| !&payload.delete_channels.contains(channel_id));
1018
1019                for channel_id in &payload.delete_channels {
1020                    let channel_id = *channel_id;
1021                    if payload
1022                        .channels
1023                        .iter()
1024                        .any(|channel| channel.id == channel_id)
1025                    {
1026                        continue;
1027                    }
1028                    if let Some(OpenedModelHandle::Open(buffer)) =
1029                        self.opened_buffers.remove(&channel_id)
1030                    {
1031                        if let Some(buffer) = buffer.upgrade() {
1032                            buffer.update(cx, ChannelBuffer::disconnect);
1033                        }
1034                    }
1035                }
1036            }
1037
1038            let mut index = self.channel_index.bulk_insert();
1039            for channel in payload.channels {
1040                let id = channel.id;
1041                let channel_changed = index.insert(channel);
1042
1043                if channel_changed {
1044                    if let Some(OpenedModelHandle::Open(buffer)) = self.opened_buffers.get(&id) {
1045                        if let Some(buffer) = buffer.upgrade() {
1046                            buffer.update(cx, ChannelBuffer::channel_changed);
1047                        }
1048                    }
1049                }
1050            }
1051
1052            for latest_buffer_version in payload.latest_channel_buffer_versions {
1053                let version = language::proto::deserialize_version(&latest_buffer_version.version);
1054                self.channel_states
1055                    .entry(latest_buffer_version.channel_id)
1056                    .or_default()
1057                    .update_latest_notes_version(latest_buffer_version.epoch, &version)
1058            }
1059
1060            for latest_channel_message in payload.latest_channel_message_ids {
1061                self.channel_states
1062                    .entry(latest_channel_message.channel_id)
1063                    .or_default()
1064                    .update_latest_message_id(latest_channel_message.message_id);
1065            }
1066        }
1067
1068        cx.notify();
1069        if payload.channel_participants.is_empty() {
1070            return None;
1071        }
1072
1073        let mut all_user_ids = Vec::new();
1074        let channel_participants = payload.channel_participants;
1075        for entry in &channel_participants {
1076            for user_id in entry.participant_user_ids.iter() {
1077                if let Err(ix) = all_user_ids.binary_search(user_id) {
1078                    all_user_ids.insert(ix, *user_id);
1079                }
1080            }
1081        }
1082
1083        let users = self
1084            .user_store
1085            .update(cx, |user_store, cx| user_store.get_users(all_user_ids, cx));
1086        Some(cx.spawn(|this, mut cx| async move {
1087            let users = users.await?;
1088
1089            this.update(&mut cx, |this, cx| {
1090                for entry in &channel_participants {
1091                    let mut participants: Vec<_> = entry
1092                        .participant_user_ids
1093                        .iter()
1094                        .filter_map(|user_id| {
1095                            users
1096                                .binary_search_by_key(&user_id, |user| &user.id)
1097                                .ok()
1098                                .map(|ix| users[ix].clone())
1099                        })
1100                        .collect();
1101
1102                    participants.sort_by_key(|u| u.id);
1103
1104                    this.channel_participants
1105                        .insert(entry.channel_id, participants);
1106                }
1107
1108                cx.notify();
1109            })
1110        }))
1111    }
1112}
1113
1114impl ChannelState {
1115    fn set_role(&mut self, role: ChannelRole) {
1116        self.role = Some(role);
1117    }
1118
1119    fn has_channel_buffer_changed(&self) -> bool {
1120        if let Some(latest_version) = &self.latest_notes_versions {
1121            if let Some(observed_version) = &self.observed_notes_versions {
1122                latest_version.epoch > observed_version.epoch
1123                    || latest_version
1124                        .version
1125                        .changed_since(&observed_version.version)
1126            } else {
1127                true
1128            }
1129        } else {
1130            false
1131        }
1132    }
1133
1134    fn has_new_messages(&self) -> bool {
1135        let latest_message_id = self.latest_chat_message;
1136        let observed_message_id = self.observed_chat_message;
1137
1138        latest_message_id.is_some_and(|latest_message_id| {
1139            latest_message_id > observed_message_id.unwrap_or_default()
1140        })
1141    }
1142
1143    fn acknowledge_message_id(&mut self, message_id: u64) {
1144        let observed = self.observed_chat_message.get_or_insert(message_id);
1145        *observed = (*observed).max(message_id);
1146    }
1147
1148    fn update_latest_message_id(&mut self, message_id: u64) {
1149        self.latest_chat_message =
1150            Some(message_id.max(self.latest_chat_message.unwrap_or_default()));
1151    }
1152
1153    fn acknowledge_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1154        if let Some(existing) = &mut self.observed_notes_versions {
1155            if existing.epoch == epoch {
1156                existing.version.join(version);
1157                return;
1158            }
1159        }
1160        self.observed_notes_versions = Some(NotesVersion {
1161            epoch,
1162            version: version.clone(),
1163        });
1164    }
1165
1166    fn update_latest_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1167        if let Some(existing) = &mut self.latest_notes_versions {
1168            if existing.epoch == epoch {
1169                existing.version.join(version);
1170                return;
1171            }
1172        }
1173        self.latest_notes_versions = Some(NotesVersion {
1174            epoch,
1175            version: version.clone(),
1176        });
1177    }
1178}