channel_store.rs

   1mod channel_index;
   2
   3use crate::{channel_buffer::ChannelBuffer, channel_chat::ChannelChat, ChannelMessage};
   4use anyhow::{anyhow, Result};
   5use channel_index::ChannelIndex;
   6use client::{Client, Subscription, User, UserId, UserStore};
   7use collections::{hash_map, HashMap, HashSet};
   8use db::RELEASE_CHANNEL;
   9use futures::{channel::mpsc, future::Shared, Future, FutureExt, StreamExt};
  10use gpui::{
  11    AppContext, AsyncAppContext, Context, EventEmitter, Model, ModelContext, SharedString, Task,
  12    WeakModel,
  13};
  14use rpc::{
  15    proto::{self, ChannelVisibility},
  16    TypedEnvelope,
  17};
  18use std::{mem, sync::Arc, time::Duration};
  19use util::{async_maybe, ResultExt};
  20
  21pub fn init(client: &Arc<Client>, user_store: Model<UserStore>, cx: &mut AppContext) {
  22    let channel_store =
  23        cx.build_model(|cx| ChannelStore::new(client.clone(), user_store.clone(), cx));
  24    cx.set_global(channel_store);
  25}
  26
  27pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
  28
  29pub type ChannelId = u64;
  30
  31pub struct ChannelStore {
  32    pub channel_index: ChannelIndex,
  33    channel_invitations: Vec<Arc<Channel>>,
  34    channel_participants: HashMap<ChannelId, Vec<Arc<User>>>,
  35    outgoing_invites: HashSet<(ChannelId, UserId)>,
  36    update_channels_tx: mpsc::UnboundedSender<proto::UpdateChannels>,
  37    opened_buffers: HashMap<ChannelId, OpenedModelHandle<ChannelBuffer>>,
  38    opened_chats: HashMap<ChannelId, OpenedModelHandle<ChannelChat>>,
  39    client: Arc<Client>,
  40    user_store: Model<UserStore>,
  41    _rpc_subscription: Subscription,
  42    _watch_connection_status: Task<Option<()>>,
  43    disconnect_channel_buffers_task: Option<Task<()>>,
  44    _update_channels: Task<()>,
  45}
  46
  47#[derive(Clone, Debug, PartialEq)]
  48pub struct Channel {
  49    pub id: ChannelId,
  50    pub name: SharedString,
  51    pub visibility: proto::ChannelVisibility,
  52    pub role: proto::ChannelRole,
  53    pub unseen_note_version: Option<(u64, clock::Global)>,
  54    pub unseen_message_id: Option<u64>,
  55    pub parent_path: Vec<u64>,
  56}
  57
  58impl Channel {
  59    pub fn link(&self) -> String {
  60        RELEASE_CHANNEL.link_prefix().to_owned()
  61            + "channel/"
  62            + &self.slug()
  63            + "-"
  64            + &self.id.to_string()
  65    }
  66
  67    pub fn slug(&self) -> String {
  68        let slug: String = self
  69            .name
  70            .chars()
  71            .map(|c| if c.is_alphanumeric() { c } else { '-' })
  72            .collect();
  73
  74        slug.trim_matches(|c| c == '-').to_string()
  75    }
  76
  77    pub fn can_edit_notes(&self) -> bool {
  78        self.role == proto::ChannelRole::Member || self.role == proto::ChannelRole::Admin
  79    }
  80}
  81
  82pub struct ChannelMembership {
  83    pub user: Arc<User>,
  84    pub kind: proto::channel_member::Kind,
  85    pub role: proto::ChannelRole,
  86}
  87impl ChannelMembership {
  88    pub fn sort_key(&self) -> MembershipSortKey {
  89        MembershipSortKey {
  90            role_order: match self.role {
  91                proto::ChannelRole::Admin => 0,
  92                proto::ChannelRole::Member => 1,
  93                proto::ChannelRole::Banned => 2,
  94                proto::ChannelRole::Guest => 3,
  95            },
  96            kind_order: match self.kind {
  97                proto::channel_member::Kind::Member => 0,
  98                proto::channel_member::Kind::AncestorMember => 1,
  99                proto::channel_member::Kind::Invitee => 2,
 100            },
 101            username_order: self.user.github_login.as_str(),
 102        }
 103    }
 104}
 105
 106#[derive(PartialOrd, Ord, PartialEq, Eq)]
 107pub struct MembershipSortKey<'a> {
 108    role_order: u8,
 109    kind_order: u8,
 110    username_order: &'a str,
 111}
 112
 113pub enum ChannelEvent {
 114    ChannelCreated(ChannelId),
 115    ChannelRenamed(ChannelId),
 116}
 117
 118impl EventEmitter<ChannelEvent> for ChannelStore {}
 119
 120enum OpenedModelHandle<E> {
 121    Open(WeakModel<E>),
 122    Loading(Shared<Task<Result<Model<E>, Arc<anyhow::Error>>>>),
 123}
 124
 125impl ChannelStore {
 126    pub fn global(cx: &AppContext) -> Model<Self> {
 127        cx.global::<Model<Self>>().clone()
 128    }
 129
 130    pub fn new(
 131        client: Arc<Client>,
 132        user_store: Model<UserStore>,
 133        cx: &mut ModelContext<Self>,
 134    ) -> Self {
 135        let rpc_subscription =
 136            client.add_message_handler(cx.weak_model(), Self::handle_update_channels);
 137
 138        let mut connection_status = client.status();
 139        let (update_channels_tx, mut update_channels_rx) = mpsc::unbounded();
 140        let watch_connection_status = cx.spawn(|this, mut cx| async move {
 141            while let Some(status) = connection_status.next().await {
 142                let this = this.upgrade()?;
 143                match status {
 144                    client::Status::Connected { .. } => {
 145                        this.update(&mut cx, |this, cx| this.handle_connect(cx))
 146                            .ok()?
 147                            .await
 148                            .log_err()?;
 149                    }
 150                    client::Status::SignedOut | client::Status::UpgradeRequired => {
 151                        this.update(&mut cx, |this, cx| this.handle_disconnect(false, cx))
 152                            .ok();
 153                    }
 154                    _ => {
 155                        this.update(&mut cx, |this, cx| this.handle_disconnect(true, cx))
 156                            .ok();
 157                    }
 158                }
 159            }
 160            Some(())
 161        });
 162
 163        Self {
 164            channel_invitations: Vec::default(),
 165            channel_index: ChannelIndex::default(),
 166            channel_participants: Default::default(),
 167            outgoing_invites: Default::default(),
 168            opened_buffers: Default::default(),
 169            opened_chats: Default::default(),
 170            update_channels_tx,
 171            client,
 172            user_store,
 173            _rpc_subscription: rpc_subscription,
 174            _watch_connection_status: watch_connection_status,
 175            disconnect_channel_buffers_task: None,
 176            _update_channels: cx.spawn(|this, mut cx| async move {
 177                async_maybe!({
 178                    while let Some(update_channels) = update_channels_rx.next().await {
 179                        if let Some(this) = this.upgrade() {
 180                            let update_task = this.update(&mut cx, |this, cx| {
 181                                this.update_channels(update_channels, cx)
 182                            })?;
 183                            if let Some(update_task) = update_task {
 184                                update_task.await.log_err();
 185                            }
 186                        }
 187                    }
 188                    anyhow::Ok(())
 189                })
 190                .await
 191                .log_err();
 192            }),
 193        }
 194    }
 195
 196    pub fn client(&self) -> Arc<Client> {
 197        self.client.clone()
 198    }
 199
 200    /// Returns the number of unique channels in the store
 201    pub fn channel_count(&self) -> usize {
 202        self.channel_index.by_id().len()
 203    }
 204
 205    /// Returns the index of a channel ID in the list of unique channels
 206    pub fn index_of_channel(&self, channel_id: ChannelId) -> Option<usize> {
 207        self.channel_index
 208            .by_id()
 209            .keys()
 210            .position(|id| *id == channel_id)
 211    }
 212
 213    /// Returns an iterator over all unique channels
 214    pub fn channels(&self) -> impl '_ + Iterator<Item = &Arc<Channel>> {
 215        self.channel_index.by_id().values()
 216    }
 217
 218    /// Iterate over all entries in the channel DAG
 219    pub fn ordered_channels(&self) -> impl '_ + Iterator<Item = (usize, &Arc<Channel>)> {
 220        self.channel_index
 221            .ordered_channels()
 222            .iter()
 223            .filter_map(move |id| {
 224                let channel = self.channel_index.by_id().get(id)?;
 225                Some((channel.parent_path.len(), channel))
 226            })
 227    }
 228
 229    pub fn channel_at_index(&self, ix: usize) -> Option<&Arc<Channel>> {
 230        let channel_id = self.channel_index.ordered_channels().get(ix)?;
 231        self.channel_index.by_id().get(channel_id)
 232    }
 233
 234    pub fn channel_at(&self, ix: usize) -> Option<&Arc<Channel>> {
 235        self.channel_index.by_id().values().nth(ix)
 236    }
 237
 238    pub fn has_channel_invitation(&self, channel_id: ChannelId) -> bool {
 239        self.channel_invitations
 240            .iter()
 241            .any(|channel| channel.id == channel_id)
 242    }
 243
 244    pub fn channel_invitations(&self) -> &[Arc<Channel>] {
 245        &self.channel_invitations
 246    }
 247
 248    pub fn channel_for_id(&self, channel_id: ChannelId) -> Option<&Arc<Channel>> {
 249        self.channel_index.by_id().get(&channel_id)
 250    }
 251
 252    pub fn has_open_channel_buffer(&self, channel_id: ChannelId, _cx: &AppContext) -> bool {
 253        if let Some(buffer) = self.opened_buffers.get(&channel_id) {
 254            if let OpenedModelHandle::Open(buffer) = buffer {
 255                return buffer.upgrade().is_some();
 256            }
 257        }
 258        false
 259    }
 260
 261    pub fn open_channel_buffer(
 262        &mut self,
 263        channel_id: ChannelId,
 264        cx: &mut ModelContext<Self>,
 265    ) -> Task<Result<Model<ChannelBuffer>>> {
 266        let client = self.client.clone();
 267        let user_store = self.user_store.clone();
 268        let channel_store = cx.handle();
 269        self.open_channel_resource(
 270            channel_id,
 271            |this| &mut this.opened_buffers,
 272            |channel, cx| ChannelBuffer::new(channel, client, user_store, channel_store, cx),
 273            cx,
 274        )
 275    }
 276
 277    pub fn fetch_channel_messages(
 278        &self,
 279        message_ids: Vec<u64>,
 280        cx: &mut ModelContext<Self>,
 281    ) -> Task<Result<Vec<ChannelMessage>>> {
 282        let request = if message_ids.is_empty() {
 283            None
 284        } else {
 285            Some(
 286                self.client
 287                    .request(proto::GetChannelMessagesById { message_ids }),
 288            )
 289        };
 290        cx.spawn(|this, mut cx| async move {
 291            if let Some(request) = request {
 292                let response = request.await?;
 293                let this = this
 294                    .upgrade()
 295                    .ok_or_else(|| anyhow!("channel store dropped"))?;
 296                let user_store = this.update(&mut cx, |this, _| this.user_store.clone())?;
 297                ChannelMessage::from_proto_vec(response.messages, &user_store, &mut cx).await
 298            } else {
 299                Ok(Vec::new())
 300            }
 301        })
 302    }
 303
 304    pub fn has_channel_buffer_changed(&self, channel_id: ChannelId) -> Option<bool> {
 305        self.channel_index
 306            .by_id()
 307            .get(&channel_id)
 308            .map(|channel| channel.unseen_note_version.is_some())
 309    }
 310
 311    pub fn has_new_messages(&self, channel_id: ChannelId) -> Option<bool> {
 312        self.channel_index
 313            .by_id()
 314            .get(&channel_id)
 315            .map(|channel| channel.unseen_message_id.is_some())
 316    }
 317
 318    pub fn notes_changed(
 319        &mut self,
 320        channel_id: ChannelId,
 321        epoch: u64,
 322        version: &clock::Global,
 323        cx: &mut ModelContext<Self>,
 324    ) {
 325        self.channel_index.note_changed(channel_id, epoch, version);
 326        cx.notify();
 327    }
 328
 329    pub fn new_message(
 330        &mut self,
 331        channel_id: ChannelId,
 332        message_id: u64,
 333        cx: &mut ModelContext<Self>,
 334    ) {
 335        self.channel_index.new_message(channel_id, message_id);
 336        cx.notify();
 337    }
 338
 339    pub fn acknowledge_message_id(
 340        &mut self,
 341        channel_id: ChannelId,
 342        message_id: u64,
 343        cx: &mut ModelContext<Self>,
 344    ) {
 345        self.channel_index
 346            .acknowledge_message_id(channel_id, message_id);
 347        cx.notify();
 348    }
 349
 350    pub fn acknowledge_notes_version(
 351        &mut self,
 352        channel_id: ChannelId,
 353        epoch: u64,
 354        version: &clock::Global,
 355        cx: &mut ModelContext<Self>,
 356    ) {
 357        self.channel_index
 358            .acknowledge_note_version(channel_id, epoch, version);
 359        cx.notify();
 360    }
 361
 362    pub fn open_channel_chat(
 363        &mut self,
 364        channel_id: ChannelId,
 365        cx: &mut ModelContext<Self>,
 366    ) -> Task<Result<Model<ChannelChat>>> {
 367        let client = self.client.clone();
 368        let user_store = self.user_store.clone();
 369        let this = cx.handle();
 370        self.open_channel_resource(
 371            channel_id,
 372            |this| &mut this.opened_chats,
 373            |channel, cx| ChannelChat::new(channel, this, user_store, client, cx),
 374            cx,
 375        )
 376    }
 377
 378    /// Asynchronously open a given resource associated with a channel.
 379    ///
 380    /// Make sure that the resource is only opened once, even if this method
 381    /// is called multiple times with the same channel id while the first task
 382    /// is still running.
 383    fn open_channel_resource<T, F, Fut>(
 384        &mut self,
 385        channel_id: ChannelId,
 386        get_map: fn(&mut Self) -> &mut HashMap<ChannelId, OpenedModelHandle<T>>,
 387        load: F,
 388        cx: &mut ModelContext<Self>,
 389    ) -> Task<Result<Model<T>>>
 390    where
 391        F: 'static + FnOnce(Arc<Channel>, AsyncAppContext) -> Fut,
 392        Fut: Future<Output = Result<Model<T>>>,
 393        T: 'static,
 394    {
 395        let task = loop {
 396            match get_map(self).entry(channel_id) {
 397                hash_map::Entry::Occupied(e) => match e.get() {
 398                    OpenedModelHandle::Open(model) => {
 399                        if let Some(model) = model.upgrade() {
 400                            break Task::ready(Ok(model)).shared();
 401                        } else {
 402                            get_map(self).remove(&channel_id);
 403                            continue;
 404                        }
 405                    }
 406                    OpenedModelHandle::Loading(task) => {
 407                        break task.clone();
 408                    }
 409                },
 410                hash_map::Entry::Vacant(e) => {
 411                    let task = cx
 412                        .spawn(move |this, mut cx| async move {
 413                            let channel = this.update(&mut cx, |this, _| {
 414                                this.channel_for_id(channel_id).cloned().ok_or_else(|| {
 415                                    Arc::new(anyhow!("no channel for id: {}", channel_id))
 416                                })
 417                            })??;
 418
 419                            load(channel, cx).await.map_err(Arc::new)
 420                        })
 421                        .shared();
 422
 423                    e.insert(OpenedModelHandle::Loading(task.clone()));
 424                    cx.spawn({
 425                        let task = task.clone();
 426                        move |this, mut cx| async move {
 427                            let result = task.await;
 428                            this.update(&mut cx, |this, _| match result {
 429                                Ok(model) => {
 430                                    get_map(this).insert(
 431                                        channel_id,
 432                                        OpenedModelHandle::Open(model.downgrade()),
 433                                    );
 434                                }
 435                                Err(_) => {
 436                                    get_map(this).remove(&channel_id);
 437                                }
 438                            })
 439                            .ok();
 440                        }
 441                    })
 442                    .detach();
 443                    break task;
 444                }
 445            }
 446        };
 447        cx.background_executor()
 448            .spawn(async move { task.await.map_err(|error| anyhow!("{}", error)) })
 449    }
 450
 451    pub fn is_channel_admin(&self, channel_id: ChannelId) -> bool {
 452        let Some(channel) = self.channel_for_id(channel_id) else {
 453            return false;
 454        };
 455        channel.role == proto::ChannelRole::Admin
 456    }
 457
 458    pub fn channel_participants(&self, channel_id: ChannelId) -> &[Arc<User>] {
 459        self.channel_participants
 460            .get(&channel_id)
 461            .map_or(&[], |v| v.as_slice())
 462    }
 463
 464    pub fn create_channel(
 465        &self,
 466        name: &str,
 467        parent_id: Option<ChannelId>,
 468        cx: &mut ModelContext<Self>,
 469    ) -> Task<Result<ChannelId>> {
 470        let client = self.client.clone();
 471        let name = name.trim_start_matches("#").to_owned();
 472        cx.spawn(move |this, mut cx| async move {
 473            let response = client
 474                .request(proto::CreateChannel { name, parent_id })
 475                .await?;
 476
 477            let channel = response
 478                .channel
 479                .ok_or_else(|| anyhow!("missing channel in response"))?;
 480            let channel_id = channel.id;
 481
 482            this.update(&mut cx, |this, cx| {
 483                let task = this.update_channels(
 484                    proto::UpdateChannels {
 485                        channels: vec![channel],
 486                        ..Default::default()
 487                    },
 488                    cx,
 489                );
 490                assert!(task.is_none());
 491
 492                // This event is emitted because the collab panel wants to clear the pending edit state
 493                // before this frame is rendered. But we can't guarantee that the collab panel's future
 494                // will resolve before this flush_effects finishes. Synchronously emitting this event
 495                // ensures that the collab panel will observe this creation before the frame completes
 496                cx.emit(ChannelEvent::ChannelCreated(channel_id));
 497            })?;
 498
 499            Ok(channel_id)
 500        })
 501    }
 502
 503    pub fn move_channel(
 504        &mut self,
 505        channel_id: ChannelId,
 506        to: Option<ChannelId>,
 507        cx: &mut ModelContext<Self>,
 508    ) -> Task<Result<()>> {
 509        let client = self.client.clone();
 510        cx.spawn(move |_, _| async move {
 511            let _ = client
 512                .request(proto::MoveChannel { channel_id, to })
 513                .await?;
 514
 515            Ok(())
 516        })
 517    }
 518
 519    pub fn set_channel_visibility(
 520        &mut self,
 521        channel_id: ChannelId,
 522        visibility: ChannelVisibility,
 523        cx: &mut ModelContext<Self>,
 524    ) -> Task<Result<()>> {
 525        let client = self.client.clone();
 526        cx.spawn(move |_, _| async move {
 527            let _ = client
 528                .request(proto::SetChannelVisibility {
 529                    channel_id,
 530                    visibility: visibility.into(),
 531                })
 532                .await?;
 533
 534            Ok(())
 535        })
 536    }
 537
 538    pub fn invite_member(
 539        &mut self,
 540        channel_id: ChannelId,
 541        user_id: UserId,
 542        role: proto::ChannelRole,
 543        cx: &mut ModelContext<Self>,
 544    ) -> Task<Result<()>> {
 545        if !self.outgoing_invites.insert((channel_id, user_id)) {
 546            return Task::ready(Err(anyhow!("invite request already in progress")));
 547        }
 548
 549        cx.notify();
 550        let client = self.client.clone();
 551        cx.spawn(move |this, mut cx| async move {
 552            let result = client
 553                .request(proto::InviteChannelMember {
 554                    channel_id,
 555                    user_id,
 556                    role: role.into(),
 557                })
 558                .await;
 559
 560            this.update(&mut cx, |this, cx| {
 561                this.outgoing_invites.remove(&(channel_id, user_id));
 562                cx.notify();
 563            })?;
 564
 565            result?;
 566
 567            Ok(())
 568        })
 569    }
 570
 571    pub fn remove_member(
 572        &mut self,
 573        channel_id: ChannelId,
 574        user_id: u64,
 575        cx: &mut ModelContext<Self>,
 576    ) -> Task<Result<()>> {
 577        if !self.outgoing_invites.insert((channel_id, user_id)) {
 578            return Task::ready(Err(anyhow!("invite request already in progress")));
 579        }
 580
 581        cx.notify();
 582        let client = self.client.clone();
 583        cx.spawn(move |this, mut cx| async move {
 584            let result = client
 585                .request(proto::RemoveChannelMember {
 586                    channel_id,
 587                    user_id,
 588                })
 589                .await;
 590
 591            this.update(&mut cx, |this, cx| {
 592                this.outgoing_invites.remove(&(channel_id, user_id));
 593                cx.notify();
 594            })?;
 595            result?;
 596            Ok(())
 597        })
 598    }
 599
 600    pub fn set_member_role(
 601        &mut self,
 602        channel_id: ChannelId,
 603        user_id: UserId,
 604        role: proto::ChannelRole,
 605        cx: &mut ModelContext<Self>,
 606    ) -> Task<Result<()>> {
 607        if !self.outgoing_invites.insert((channel_id, user_id)) {
 608            return Task::ready(Err(anyhow!("member request already in progress")));
 609        }
 610
 611        cx.notify();
 612        let client = self.client.clone();
 613        cx.spawn(move |this, mut cx| async move {
 614            let result = client
 615                .request(proto::SetChannelMemberRole {
 616                    channel_id,
 617                    user_id,
 618                    role: role.into(),
 619                })
 620                .await;
 621
 622            this.update(&mut cx, |this, cx| {
 623                this.outgoing_invites.remove(&(channel_id, user_id));
 624                cx.notify();
 625            })?;
 626
 627            result?;
 628            Ok(())
 629        })
 630    }
 631
 632    pub fn rename(
 633        &mut self,
 634        channel_id: ChannelId,
 635        new_name: &str,
 636        cx: &mut ModelContext<Self>,
 637    ) -> Task<Result<()>> {
 638        let client = self.client.clone();
 639        let name = new_name.to_string();
 640        cx.spawn(move |this, mut cx| async move {
 641            let channel = client
 642                .request(proto::RenameChannel { channel_id, name })
 643                .await?
 644                .channel
 645                .ok_or_else(|| anyhow!("missing channel in response"))?;
 646            this.update(&mut cx, |this, cx| {
 647                let task = this.update_channels(
 648                    proto::UpdateChannels {
 649                        channels: vec![channel],
 650                        ..Default::default()
 651                    },
 652                    cx,
 653                );
 654                assert!(task.is_none());
 655
 656                // This event is emitted because the collab panel wants to clear the pending edit state
 657                // before this frame is rendered. But we can't guarantee that the collab panel's future
 658                // will resolve before this flush_effects finishes. Synchronously emitting this event
 659                // ensures that the collab panel will observe this creation before the frame complete
 660                cx.emit(ChannelEvent::ChannelRenamed(channel_id))
 661            })?;
 662            Ok(())
 663        })
 664    }
 665
 666    pub fn respond_to_channel_invite(
 667        &mut self,
 668        channel_id: ChannelId,
 669        accept: bool,
 670        cx: &mut ModelContext<Self>,
 671    ) -> Task<Result<()>> {
 672        let client = self.client.clone();
 673        cx.background_executor().spawn(async move {
 674            client
 675                .request(proto::RespondToChannelInvite { channel_id, accept })
 676                .await?;
 677            Ok(())
 678        })
 679    }
 680
 681    pub fn get_channel_member_details(
 682        &self,
 683        channel_id: ChannelId,
 684        cx: &mut ModelContext<Self>,
 685    ) -> Task<Result<Vec<ChannelMembership>>> {
 686        let client = self.client.clone();
 687        let user_store = self.user_store.downgrade();
 688        cx.spawn(move |_, mut cx| async move {
 689            let response = client
 690                .request(proto::GetChannelMembers { channel_id })
 691                .await?;
 692
 693            let user_ids = response.members.iter().map(|m| m.user_id).collect();
 694            let user_store = user_store
 695                .upgrade()
 696                .ok_or_else(|| anyhow!("user store dropped"))?;
 697            let users = user_store
 698                .update(&mut cx, |user_store, cx| user_store.get_users(user_ids, cx))?
 699                .await?;
 700
 701            Ok(users
 702                .into_iter()
 703                .zip(response.members)
 704                .filter_map(|(user, member)| {
 705                    Some(ChannelMembership {
 706                        user,
 707                        role: member.role(),
 708                        kind: member.kind(),
 709                    })
 710                })
 711                .collect())
 712        })
 713    }
 714
 715    pub fn remove_channel(&self, channel_id: ChannelId) -> impl Future<Output = Result<()>> {
 716        let client = self.client.clone();
 717        async move {
 718            client.request(proto::DeleteChannel { channel_id }).await?;
 719            Ok(())
 720        }
 721    }
 722
 723    pub fn has_pending_channel_invite_response(&self, _: &Arc<Channel>) -> bool {
 724        false
 725    }
 726
 727    pub fn has_pending_channel_invite(&self, channel_id: ChannelId, user_id: UserId) -> bool {
 728        self.outgoing_invites.contains(&(channel_id, user_id))
 729    }
 730
 731    async fn handle_update_channels(
 732        this: Model<Self>,
 733        message: TypedEnvelope<proto::UpdateChannels>,
 734        _: Arc<Client>,
 735        mut cx: AsyncAppContext,
 736    ) -> Result<()> {
 737        this.update(&mut cx, |this, _| {
 738            this.update_channels_tx
 739                .unbounded_send(message.payload)
 740                .unwrap();
 741        })?;
 742        Ok(())
 743    }
 744
 745    fn handle_connect(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
 746        self.channel_index.clear();
 747        self.channel_invitations.clear();
 748        self.channel_participants.clear();
 749        self.channel_index.clear();
 750        self.outgoing_invites.clear();
 751        self.disconnect_channel_buffers_task.take();
 752
 753        for chat in self.opened_chats.values() {
 754            if let OpenedModelHandle::Open(chat) = chat {
 755                if let Some(chat) = chat.upgrade() {
 756                    chat.update(cx, |chat, cx| {
 757                        chat.rejoin(cx);
 758                    });
 759                }
 760            }
 761        }
 762
 763        let mut buffer_versions = Vec::new();
 764        for buffer in self.opened_buffers.values() {
 765            if let OpenedModelHandle::Open(buffer) = buffer {
 766                if let Some(buffer) = buffer.upgrade() {
 767                    let channel_buffer = buffer.read(cx);
 768                    let buffer = channel_buffer.buffer().read(cx);
 769                    buffer_versions.push(proto::ChannelBufferVersion {
 770                        channel_id: channel_buffer.channel_id,
 771                        epoch: channel_buffer.epoch(),
 772                        version: language::proto::serialize_version(&buffer.version()),
 773                    });
 774                }
 775            }
 776        }
 777
 778        if buffer_versions.is_empty() {
 779            return Task::ready(Ok(()));
 780        }
 781
 782        let response = self.client.request(proto::RejoinChannelBuffers {
 783            buffers: buffer_versions,
 784        });
 785
 786        cx.spawn(|this, mut cx| async move {
 787            let mut response = response.await?;
 788
 789            this.update(&mut cx, |this, cx| {
 790                this.opened_buffers.retain(|_, buffer| match buffer {
 791                    OpenedModelHandle::Open(channel_buffer) => {
 792                        let Some(channel_buffer) = channel_buffer.upgrade() else {
 793                            return false;
 794                        };
 795
 796                        channel_buffer.update(cx, |channel_buffer, cx| {
 797                            let channel_id = channel_buffer.channel_id;
 798                            if let Some(remote_buffer) = response
 799                                .buffers
 800                                .iter_mut()
 801                                .find(|buffer| buffer.channel_id == channel_id)
 802                            {
 803                                let channel_id = channel_buffer.channel_id;
 804                                let remote_version =
 805                                    language::proto::deserialize_version(&remote_buffer.version);
 806
 807                                channel_buffer.replace_collaborators(
 808                                    mem::take(&mut remote_buffer.collaborators),
 809                                    cx,
 810                                );
 811
 812                                let operations = channel_buffer
 813                                    .buffer()
 814                                    .update(cx, |buffer, cx| {
 815                                        let outgoing_operations =
 816                                            buffer.serialize_ops(Some(remote_version), cx);
 817                                        let incoming_operations =
 818                                            mem::take(&mut remote_buffer.operations)
 819                                                .into_iter()
 820                                                .map(language::proto::deserialize_operation)
 821                                                .collect::<Result<Vec<_>>>()?;
 822                                        buffer.apply_ops(incoming_operations, cx)?;
 823                                        anyhow::Ok(outgoing_operations)
 824                                    })
 825                                    .log_err();
 826
 827                                if let Some(operations) = operations {
 828                                    let client = this.client.clone();
 829                                    cx.background_executor()
 830                                        .spawn(async move {
 831                                            let operations = operations.await;
 832                                            for chunk in
 833                                                language::proto::split_operations(operations)
 834                                            {
 835                                                client
 836                                                    .send(proto::UpdateChannelBuffer {
 837                                                        channel_id,
 838                                                        operations: chunk,
 839                                                    })
 840                                                    .ok();
 841                                            }
 842                                        })
 843                                        .detach();
 844                                    return true;
 845                                }
 846                            }
 847
 848                            channel_buffer.disconnect(cx);
 849                            false
 850                        })
 851                    }
 852                    OpenedModelHandle::Loading(_) => true,
 853                });
 854            })
 855            .ok();
 856            anyhow::Ok(())
 857        })
 858    }
 859
 860    fn handle_disconnect(&mut self, wait_for_reconnect: bool, cx: &mut ModelContext<Self>) {
 861        cx.notify();
 862
 863        self.disconnect_channel_buffers_task.get_or_insert_with(|| {
 864            cx.spawn(move |this, mut cx| async move {
 865                if wait_for_reconnect {
 866                    cx.background_executor().timer(RECONNECT_TIMEOUT).await;
 867                }
 868
 869                if let Some(this) = this.upgrade() {
 870                    this.update(&mut cx, |this, cx| {
 871                        for (_, buffer) in this.opened_buffers.drain() {
 872                            if let OpenedModelHandle::Open(buffer) = buffer {
 873                                if let Some(buffer) = buffer.upgrade() {
 874                                    buffer.update(cx, |buffer, cx| buffer.disconnect(cx));
 875                                }
 876                            }
 877                        }
 878                    })
 879                    .ok();
 880                }
 881            })
 882        });
 883    }
 884
 885    pub(crate) fn update_channels(
 886        &mut self,
 887        payload: proto::UpdateChannels,
 888        cx: &mut ModelContext<ChannelStore>,
 889    ) -> Option<Task<Result<()>>> {
 890        if !payload.remove_channel_invitations.is_empty() {
 891            self.channel_invitations
 892                .retain(|channel| !payload.remove_channel_invitations.contains(&channel.id));
 893        }
 894        for channel in payload.channel_invitations {
 895            match self
 896                .channel_invitations
 897                .binary_search_by_key(&channel.id, |c| c.id)
 898            {
 899                Ok(ix) => {
 900                    Arc::make_mut(&mut self.channel_invitations[ix]).name = channel.name.into()
 901                }
 902                Err(ix) => self.channel_invitations.insert(
 903                    ix,
 904                    Arc::new(Channel {
 905                        id: channel.id,
 906                        visibility: channel.visibility(),
 907                        role: channel.role(),
 908                        name: channel.name.into(),
 909                        unseen_note_version: None,
 910                        unseen_message_id: None,
 911                        parent_path: channel.parent_path,
 912                    }),
 913                ),
 914            }
 915        }
 916
 917        let channels_changed = !payload.channels.is_empty()
 918            || !payload.delete_channels.is_empty()
 919            || !payload.unseen_channel_messages.is_empty()
 920            || !payload.unseen_channel_buffer_changes.is_empty();
 921
 922        if channels_changed {
 923            if !payload.delete_channels.is_empty() {
 924                self.channel_index.delete_channels(&payload.delete_channels);
 925                self.channel_participants
 926                    .retain(|channel_id, _| !&payload.delete_channels.contains(channel_id));
 927
 928                for channel_id in &payload.delete_channels {
 929                    let channel_id = *channel_id;
 930                    if payload
 931                        .channels
 932                        .iter()
 933                        .any(|channel| channel.id == channel_id)
 934                    {
 935                        continue;
 936                    }
 937                    if let Some(OpenedModelHandle::Open(buffer)) =
 938                        self.opened_buffers.remove(&channel_id)
 939                    {
 940                        if let Some(buffer) = buffer.upgrade() {
 941                            buffer.update(cx, ChannelBuffer::disconnect);
 942                        }
 943                    }
 944                }
 945            }
 946
 947            let mut index = self.channel_index.bulk_insert();
 948            for channel in payload.channels {
 949                let id = channel.id;
 950                let channel_changed = index.insert(channel);
 951
 952                if channel_changed {
 953                    if let Some(OpenedModelHandle::Open(buffer)) = self.opened_buffers.get(&id) {
 954                        if let Some(buffer) = buffer.upgrade() {
 955                            buffer.update(cx, ChannelBuffer::channel_changed);
 956                        }
 957                    }
 958                }
 959            }
 960
 961            for unseen_buffer_change in payload.unseen_channel_buffer_changes {
 962                let version = language::proto::deserialize_version(&unseen_buffer_change.version);
 963                index.note_changed(
 964                    unseen_buffer_change.channel_id,
 965                    unseen_buffer_change.epoch,
 966                    &version,
 967                );
 968            }
 969
 970            for unseen_channel_message in payload.unseen_channel_messages {
 971                index.new_messages(
 972                    unseen_channel_message.channel_id,
 973                    unseen_channel_message.message_id,
 974                );
 975            }
 976        }
 977
 978        cx.notify();
 979        if payload.channel_participants.is_empty() {
 980            return None;
 981        }
 982
 983        let mut all_user_ids = Vec::new();
 984        let channel_participants = payload.channel_participants;
 985        for entry in &channel_participants {
 986            for user_id in entry.participant_user_ids.iter() {
 987                if let Err(ix) = all_user_ids.binary_search(user_id) {
 988                    all_user_ids.insert(ix, *user_id);
 989                }
 990            }
 991        }
 992
 993        let users = self
 994            .user_store
 995            .update(cx, |user_store, cx| user_store.get_users(all_user_ids, cx));
 996        Some(cx.spawn(|this, mut cx| async move {
 997            let users = users.await?;
 998
 999            this.update(&mut cx, |this, cx| {
1000                for entry in &channel_participants {
1001                    let mut participants: Vec<_> = entry
1002                        .participant_user_ids
1003                        .iter()
1004                        .filter_map(|user_id| {
1005                            users
1006                                .binary_search_by_key(&user_id, |user| &user.id)
1007                                .ok()
1008                                .map(|ix| users[ix].clone())
1009                        })
1010                        .collect();
1011
1012                    participants.sort_by_key(|u| u.id);
1013
1014                    this.channel_participants
1015                        .insert(entry.channel_id, participants);
1016                }
1017
1018                cx.notify();
1019            })
1020        }))
1021    }
1022}