channel_store.rs

   1mod channel_index;
   2
   3use crate::{channel_buffer::ChannelBuffer, channel_chat::ChannelChat, ChannelMessage};
   4use anyhow::{anyhow, Result};
   5use channel_index::ChannelIndex;
   6use client::{Client, Subscription, User, UserId, UserStore};
   7use collections::{hash_map, HashMap, HashSet};
   8use db::RELEASE_CHANNEL;
   9use futures::{channel::mpsc, future::Shared, Future, FutureExt, StreamExt};
  10use gpui::{
  11    AppContext, AsyncAppContext, Context, EventEmitter, Model, ModelContext, SharedString, Task,
  12    WeakModel,
  13};
  14use language::Capability;
  15use rpc::{
  16    proto::{self, ChannelVisibility},
  17    TypedEnvelope,
  18};
  19use std::{mem, sync::Arc, time::Duration};
  20use util::{async_maybe, ResultExt};
  21
  22pub fn init(client: &Arc<Client>, user_store: Model<UserStore>, cx: &mut AppContext) {
  23    let channel_store =
  24        cx.new_model(|cx| ChannelStore::new(client.clone(), user_store.clone(), cx));
  25    cx.set_global(channel_store);
  26}
  27
/// A thirty-second timeout related to reconnection.
// NOTE(review): presumably the grace period `handle_disconnect` waits for a
// reconnect before disconnecting channel buffers — confirm at the use site.
pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);

/// Identifier of a channel; a plain `u64`.
pub type ChannelId = u64;
  31
/// Client-side model holding the channels known to the current user, the
/// invitations they have received, and the channel buffers/chats they have opened.
pub struct ChannelStore {
    /// Index of known channels, queried through `by_id()` and `ordered_channels()`.
    pub channel_index: ChannelIndex,
    /// Channels exposed through `channel_invitations()`.
    channel_invitations: Vec<Arc<Channel>>,
    /// Users listed per channel, exposed through `channel_participants()`.
    channel_participants: HashMap<ChannelId, Vec<Arc<User>>>,
    /// `(channel, user)` pairs with a membership request (invite, removal or
    /// role change) currently in flight.
    outgoing_invites: HashSet<(ChannelId, UserId)>,
    /// Sending half of the queue drained by the `_update_channels` task.
    update_channels_tx: mpsc::UnboundedSender<proto::UpdateChannels>,
    /// Channel buffers that are open or still loading, keyed by channel.
    opened_buffers: HashMap<ChannelId, OpenedModelHandle<ChannelBuffer>>,
    /// Channel chats that are open or still loading, keyed by channel.
    opened_chats: HashMap<ChannelId, OpenedModelHandle<ChannelChat>>,
    client: Arc<Client>,
    user_store: Model<UserStore>,
    /// Subscription returned when registering `handle_update_channels` on the client.
    _rpc_subscription: Subscription,
    /// Task that follows the client's connection status and calls
    /// `handle_connect` / `handle_disconnect`.
    _watch_connection_status: Task<Option<()>>,
    /// Task created by `handle_disconnect`; taken (and dropped) by `handle_connect`.
    disconnect_channel_buffers_task: Option<Task<()>>,
    /// Task that applies queued `UpdateChannels` messages one at a time.
    _update_channels: Task<()>,
}
  47
/// A single channel as known to the client.
#[derive(Clone, Debug, PartialEq)]
pub struct Channel {
    pub id: ChannelId,
    pub name: SharedString,
    pub visibility: proto::ChannelVisibility,
    /// Role used for permission checks such as `channel_buffer_capability`.
    // NOTE(review): presumably the current user's role in this channel — confirm.
    pub role: proto::ChannelRole,
    /// `Some` while the channel's notes have changes that are not yet acknowledged.
    // NOTE(review): the tuple looks like `(epoch, version)` — confirm in `ChannelIndex`.
    pub unseen_note_version: Option<(u64, clock::Global)>,
    /// `Some` while the channel has a chat message that is not yet acknowledged.
    pub unseen_message_id: Option<u64>,
    /// Ids along the path to this channel; its length is used as the nesting depth.
    pub parent_path: Vec<u64>,
}
  58
  59impl Channel {
  60    pub fn link(&self) -> String {
  61        RELEASE_CHANNEL.link_prefix().to_owned()
  62            + "channel/"
  63            + &self.slug()
  64            + "-"
  65            + &self.id.to_string()
  66    }
  67
  68    pub fn slug(&self) -> String {
  69        let slug: String = self
  70            .name
  71            .chars()
  72            .map(|c| if c.is_alphanumeric() { c } else { '-' })
  73            .collect();
  74
  75        slug.trim_matches(|c| c == '-').to_string()
  76    }
  77
  78    pub fn channel_buffer_capability(&self) -> Capability {
  79        if self.role == proto::ChannelRole::Member || self.role == proto::ChannelRole::Admin {
  80            Capability::ReadWrite
  81        } else {
  82            Capability::ReadOnly
  83        }
  84    }
  85}
  86
/// One entry of a channel's member list, as produced by
/// `ChannelStore::get_channel_member_details`.
pub struct ChannelMembership {
    /// The user this membership belongs to.
    pub user: Arc<User>,
    /// How the user relates to the channel (member, ancestor member or invitee).
    pub kind: proto::channel_member::Kind,
    /// The role reported for the user.
    pub role: proto::ChannelRole,
}
  92impl ChannelMembership {
  93    pub fn sort_key(&self) -> MembershipSortKey {
  94        MembershipSortKey {
  95            role_order: match self.role {
  96                proto::ChannelRole::Admin => 0,
  97                proto::ChannelRole::Member => 1,
  98                proto::ChannelRole::Banned => 2,
  99                proto::ChannelRole::Guest => 3,
 100            },
 101            kind_order: match self.kind {
 102                proto::channel_member::Kind::Member => 0,
 103                proto::channel_member::Kind::AncestorMember => 1,
 104                proto::channel_member::Kind::Invitee => 2,
 105            },
 106            username_order: self.user.github_login.as_str(),
 107        }
 108    }
 109}
 110
 111#[derive(PartialOrd, Ord, PartialEq, Eq)]
 112pub struct MembershipSortKey<'a> {
 113    role_order: u8,
 114    kind_order: u8,
 115    username_order: &'a str,
 116}
 117
/// Events emitted by [`ChannelStore`].
pub enum ChannelEvent {
    /// Emitted by `create_channel` once the new channel has been applied to the store.
    ChannelCreated(ChannelId),
    /// Emitted by `rename` once the renamed channel has been applied to the store.
    ChannelRenamed(ChannelId),
}

// Allows `ChannelStore` to emit `ChannelEvent`s through its model context.
impl EventEmitter<ChannelEvent> for ChannelStore {}
 124
/// State of a per-channel resource (buffer or chat) tracked by the store.
enum OpenedModelHandle<E> {
    /// The resource finished loading; only a weak handle is kept, so it must be
    /// upgraded before use and may turn out to have been dropped.
    Open(WeakModel<E>),
    /// The resource is still loading; the shared task lets several callers
    /// await the same load.
    Loading(Shared<Task<Result<Model<E>, Arc<anyhow::Error>>>>),
}
 129
 130impl ChannelStore {
 131    pub fn global(cx: &AppContext) -> Model<Self> {
 132        cx.global::<Model<Self>>().clone()
 133    }
 134
 135    pub fn new(
 136        client: Arc<Client>,
 137        user_store: Model<UserStore>,
 138        cx: &mut ModelContext<Self>,
 139    ) -> Self {
 140        let rpc_subscription =
 141            client.add_message_handler(cx.weak_model(), Self::handle_update_channels);
 142
 143        let mut connection_status = client.status();
 144        let (update_channels_tx, mut update_channels_rx) = mpsc::unbounded();
 145        let watch_connection_status = cx.spawn(|this, mut cx| async move {
 146            while let Some(status) = connection_status.next().await {
 147                let this = this.upgrade()?;
 148                match status {
 149                    client::Status::Connected { .. } => {
 150                        this.update(&mut cx, |this, cx| this.handle_connect(cx))
 151                            .ok()?
 152                            .await
 153                            .log_err()?;
 154                    }
 155                    client::Status::SignedOut | client::Status::UpgradeRequired => {
 156                        this.update(&mut cx, |this, cx| this.handle_disconnect(false, cx))
 157                            .ok();
 158                    }
 159                    _ => {
 160                        this.update(&mut cx, |this, cx| this.handle_disconnect(true, cx))
 161                            .ok();
 162                    }
 163                }
 164            }
 165            Some(())
 166        });
 167
 168        Self {
 169            channel_invitations: Vec::default(),
 170            channel_index: ChannelIndex::default(),
 171            channel_participants: Default::default(),
 172            outgoing_invites: Default::default(),
 173            opened_buffers: Default::default(),
 174            opened_chats: Default::default(),
 175            update_channels_tx,
 176            client,
 177            user_store,
 178            _rpc_subscription: rpc_subscription,
 179            _watch_connection_status: watch_connection_status,
 180            disconnect_channel_buffers_task: None,
 181            _update_channels: cx.spawn(|this, mut cx| async move {
 182                async_maybe!({
 183                    while let Some(update_channels) = update_channels_rx.next().await {
 184                        if let Some(this) = this.upgrade() {
 185                            let update_task = this.update(&mut cx, |this, cx| {
 186                                this.update_channels(update_channels, cx)
 187                            })?;
 188                            if let Some(update_task) = update_task {
 189                                update_task.await.log_err();
 190                            }
 191                        }
 192                    }
 193                    anyhow::Ok(())
 194                })
 195                .await
 196                .log_err();
 197            }),
 198        }
 199    }
 200
 201    pub fn client(&self) -> Arc<Client> {
 202        self.client.clone()
 203    }
 204
 205    /// Returns the number of unique channels in the store
 206    pub fn channel_count(&self) -> usize {
 207        self.channel_index.by_id().len()
 208    }
 209
 210    /// Returns the index of a channel ID in the list of unique channels
 211    pub fn index_of_channel(&self, channel_id: ChannelId) -> Option<usize> {
 212        self.channel_index
 213            .by_id()
 214            .keys()
 215            .position(|id| *id == channel_id)
 216    }
 217
 218    /// Returns an iterator over all unique channels
 219    pub fn channels(&self) -> impl '_ + Iterator<Item = &Arc<Channel>> {
 220        self.channel_index.by_id().values()
 221    }
 222
 223    /// Iterate over all entries in the channel DAG
 224    pub fn ordered_channels(&self) -> impl '_ + Iterator<Item = (usize, &Arc<Channel>)> {
 225        self.channel_index
 226            .ordered_channels()
 227            .iter()
 228            .filter_map(move |id| {
 229                let channel = self.channel_index.by_id().get(id)?;
 230                Some((channel.parent_path.len(), channel))
 231            })
 232    }
 233
 234    pub fn channel_at_index(&self, ix: usize) -> Option<&Arc<Channel>> {
 235        let channel_id = self.channel_index.ordered_channels().get(ix)?;
 236        self.channel_index.by_id().get(channel_id)
 237    }
 238
 239    pub fn channel_at(&self, ix: usize) -> Option<&Arc<Channel>> {
 240        self.channel_index.by_id().values().nth(ix)
 241    }
 242
 243    pub fn has_channel_invitation(&self, channel_id: ChannelId) -> bool {
 244        self.channel_invitations
 245            .iter()
 246            .any(|channel| channel.id == channel_id)
 247    }
 248
 249    pub fn channel_invitations(&self) -> &[Arc<Channel>] {
 250        &self.channel_invitations
 251    }
 252
 253    pub fn channel_for_id(&self, channel_id: ChannelId) -> Option<&Arc<Channel>> {
 254        self.channel_index.by_id().get(&channel_id)
 255    }
 256
 257    pub fn has_open_channel_buffer(&self, channel_id: ChannelId, _cx: &AppContext) -> bool {
 258        if let Some(buffer) = self.opened_buffers.get(&channel_id) {
 259            if let OpenedModelHandle::Open(buffer) = buffer {
 260                return buffer.upgrade().is_some();
 261            }
 262        }
 263        false
 264    }
 265
 266    pub fn open_channel_buffer(
 267        &mut self,
 268        channel_id: ChannelId,
 269        cx: &mut ModelContext<Self>,
 270    ) -> Task<Result<Model<ChannelBuffer>>> {
 271        let client = self.client.clone();
 272        let user_store = self.user_store.clone();
 273        let channel_store = cx.handle();
 274        self.open_channel_resource(
 275            channel_id,
 276            |this| &mut this.opened_buffers,
 277            |channel, cx| ChannelBuffer::new(channel, client, user_store, channel_store, cx),
 278            cx,
 279        )
 280    }
 281
 282    pub fn fetch_channel_messages(
 283        &self,
 284        message_ids: Vec<u64>,
 285        cx: &mut ModelContext<Self>,
 286    ) -> Task<Result<Vec<ChannelMessage>>> {
 287        let request = if message_ids.is_empty() {
 288            None
 289        } else {
 290            Some(
 291                self.client
 292                    .request(proto::GetChannelMessagesById { message_ids }),
 293            )
 294        };
 295        cx.spawn(|this, mut cx| async move {
 296            if let Some(request) = request {
 297                let response = request.await?;
 298                let this = this
 299                    .upgrade()
 300                    .ok_or_else(|| anyhow!("channel store dropped"))?;
 301                let user_store = this.update(&mut cx, |this, _| this.user_store.clone())?;
 302                ChannelMessage::from_proto_vec(response.messages, &user_store, &mut cx).await
 303            } else {
 304                Ok(Vec::new())
 305            }
 306        })
 307    }
 308
 309    pub fn has_channel_buffer_changed(&self, channel_id: ChannelId) -> Option<bool> {
 310        self.channel_index
 311            .by_id()
 312            .get(&channel_id)
 313            .map(|channel| channel.unseen_note_version.is_some())
 314    }
 315
 316    pub fn has_new_messages(&self, channel_id: ChannelId) -> Option<bool> {
 317        self.channel_index
 318            .by_id()
 319            .get(&channel_id)
 320            .map(|channel| channel.unseen_message_id.is_some())
 321    }
 322
 323    pub fn notes_changed(
 324        &mut self,
 325        channel_id: ChannelId,
 326        epoch: u64,
 327        version: &clock::Global,
 328        cx: &mut ModelContext<Self>,
 329    ) {
 330        self.channel_index.note_changed(channel_id, epoch, version);
 331        cx.notify();
 332    }
 333
 334    pub fn new_message(
 335        &mut self,
 336        channel_id: ChannelId,
 337        message_id: u64,
 338        cx: &mut ModelContext<Self>,
 339    ) {
 340        self.channel_index.new_message(channel_id, message_id);
 341        cx.notify();
 342    }
 343
 344    pub fn acknowledge_message_id(
 345        &mut self,
 346        channel_id: ChannelId,
 347        message_id: u64,
 348        cx: &mut ModelContext<Self>,
 349    ) {
 350        self.channel_index
 351            .acknowledge_message_id(channel_id, message_id);
 352        cx.notify();
 353    }
 354
 355    pub fn acknowledge_notes_version(
 356        &mut self,
 357        channel_id: ChannelId,
 358        epoch: u64,
 359        version: &clock::Global,
 360        cx: &mut ModelContext<Self>,
 361    ) {
 362        self.channel_index
 363            .acknowledge_note_version(channel_id, epoch, version);
 364        cx.notify();
 365    }
 366
 367    pub fn open_channel_chat(
 368        &mut self,
 369        channel_id: ChannelId,
 370        cx: &mut ModelContext<Self>,
 371    ) -> Task<Result<Model<ChannelChat>>> {
 372        let client = self.client.clone();
 373        let user_store = self.user_store.clone();
 374        let this = cx.handle();
 375        self.open_channel_resource(
 376            channel_id,
 377            |this| &mut this.opened_chats,
 378            |channel, cx| ChannelChat::new(channel, this, user_store, client, cx),
 379            cx,
 380        )
 381    }
 382
    /// Asynchronously open a given resource associated with a channel.
    ///
    /// Make sure that the resource is only opened once, even if this method
    /// is called multiple times with the same channel id while the first task
    /// is still running.
    ///
    /// - `get_map` selects which per-channel map of `self` tracks the resource.
    ///   It is a plain `fn` pointer so that it can also be called from the
    ///   spawned follow-up task.
    /// - `load` builds the resource for the resolved channel; it only runs when
    ///   no live or loading entry exists for `channel_id`.
    ///
    /// The returned task fails if the channel id is unknown or if `load` fails;
    /// in the failure case the map entry is removed again so a later call can retry.
    fn open_channel_resource<T, F, Fut>(
        &mut self,
        channel_id: ChannelId,
        get_map: fn(&mut Self) -> &mut HashMap<ChannelId, OpenedModelHandle<T>>,
        load: F,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<Model<T>>>
    where
        F: 'static + FnOnce(Arc<Channel>, AsyncAppContext) -> Fut,
        Fut: Future<Output = Result<Model<T>>>,
        T: 'static,
    {
        // The loop runs at most twice: a stale `Open` entry is removed and the
        // second pass then takes the `Vacant` branch.
        let task = loop {
            match get_map(self).entry(channel_id) {
                hash_map::Entry::Occupied(e) => match e.get() {
                    OpenedModelHandle::Open(model) => {
                        if let Some(model) = model.upgrade() {
                            // Already open and still alive: hand it back immediately.
                            break Task::ready(Ok(model)).shared();
                        } else {
                            // The model was dropped; forget the entry and load again.
                            get_map(self).remove(&channel_id);
                            continue;
                        }
                    }
                    OpenedModelHandle::Loading(task) => {
                        // A load is in flight; share its result.
                        break task.clone();
                    }
                },
                hash_map::Entry::Vacant(e) => {
                    let task = cx
                        .spawn(move |this, mut cx| async move {
                            let channel = this.update(&mut cx, |this, _| {
                                this.channel_for_id(channel_id).cloned().ok_or_else(|| {
                                    Arc::new(anyhow!("no channel for id: {}", channel_id))
                                })
                            })??;

                            load(channel, cx).await.map_err(Arc::new)
                        })
                        .shared();

                    e.insert(OpenedModelHandle::Loading(task.clone()));
                    // Follow-up task: once loading settles, replace the `Loading`
                    // entry with a weak `Open` handle, or remove it on failure.
                    cx.spawn({
                        let task = task.clone();
                        move |this, mut cx| async move {
                            let result = task.await;
                            this.update(&mut cx, |this, _| match result {
                                Ok(model) => {
                                    get_map(this).insert(
                                        channel_id,
                                        OpenedModelHandle::Open(model.downgrade()),
                                    );
                                }
                                Err(_) => {
                                    get_map(this).remove(&channel_id);
                                }
                            })
                            .ok();
                        }
                    })
                    .detach();
                    break task;
                }
            }
        };
        // The shared task yields `Arc<anyhow::Error>`; rebuild a plain
        // `anyhow::Error` from its display text for the caller.
        cx.background_executor()
            .spawn(async move { task.await.map_err(|error| anyhow!("{}", error)) })
    }
 455
 456    pub fn is_channel_admin(&self, channel_id: ChannelId) -> bool {
 457        let Some(channel) = self.channel_for_id(channel_id) else {
 458            return false;
 459        };
 460        channel.role == proto::ChannelRole::Admin
 461    }
 462
 463    pub fn channel_participants(&self, channel_id: ChannelId) -> &[Arc<User>] {
 464        self.channel_participants
 465            .get(&channel_id)
 466            .map_or(&[], |v| v.as_slice())
 467    }
 468
 469    pub fn create_channel(
 470        &self,
 471        name: &str,
 472        parent_id: Option<ChannelId>,
 473        cx: &mut ModelContext<Self>,
 474    ) -> Task<Result<ChannelId>> {
 475        let client = self.client.clone();
 476        let name = name.trim_start_matches("#").to_owned();
 477        cx.spawn(move |this, mut cx| async move {
 478            let response = client
 479                .request(proto::CreateChannel { name, parent_id })
 480                .await?;
 481
 482            let channel = response
 483                .channel
 484                .ok_or_else(|| anyhow!("missing channel in response"))?;
 485            let channel_id = channel.id;
 486
 487            this.update(&mut cx, |this, cx| {
 488                let task = this.update_channels(
 489                    proto::UpdateChannels {
 490                        channels: vec![channel],
 491                        ..Default::default()
 492                    },
 493                    cx,
 494                );
 495                assert!(task.is_none());
 496
 497                // This event is emitted because the collab panel wants to clear the pending edit state
 498                // before this frame is rendered. But we can't guarantee that the collab panel's future
 499                // will resolve before this flush_effects finishes. Synchronously emitting this event
 500                // ensures that the collab panel will observe this creation before the frame completes
 501                cx.emit(ChannelEvent::ChannelCreated(channel_id));
 502            })?;
 503
 504            Ok(channel_id)
 505        })
 506    }
 507
 508    pub fn move_channel(
 509        &mut self,
 510        channel_id: ChannelId,
 511        to: Option<ChannelId>,
 512        cx: &mut ModelContext<Self>,
 513    ) -> Task<Result<()>> {
 514        let client = self.client.clone();
 515        cx.spawn(move |_, _| async move {
 516            let _ = client
 517                .request(proto::MoveChannel { channel_id, to })
 518                .await?;
 519
 520            Ok(())
 521        })
 522    }
 523
 524    pub fn set_channel_visibility(
 525        &mut self,
 526        channel_id: ChannelId,
 527        visibility: ChannelVisibility,
 528        cx: &mut ModelContext<Self>,
 529    ) -> Task<Result<()>> {
 530        let client = self.client.clone();
 531        cx.spawn(move |_, _| async move {
 532            let _ = client
 533                .request(proto::SetChannelVisibility {
 534                    channel_id,
 535                    visibility: visibility.into(),
 536                })
 537                .await?;
 538
 539            Ok(())
 540        })
 541    }
 542
 543    pub fn invite_member(
 544        &mut self,
 545        channel_id: ChannelId,
 546        user_id: UserId,
 547        role: proto::ChannelRole,
 548        cx: &mut ModelContext<Self>,
 549    ) -> Task<Result<()>> {
 550        if !self.outgoing_invites.insert((channel_id, user_id)) {
 551            return Task::ready(Err(anyhow!("invite request already in progress")));
 552        }
 553
 554        cx.notify();
 555        let client = self.client.clone();
 556        cx.spawn(move |this, mut cx| async move {
 557            let result = client
 558                .request(proto::InviteChannelMember {
 559                    channel_id,
 560                    user_id,
 561                    role: role.into(),
 562                })
 563                .await;
 564
 565            this.update(&mut cx, |this, cx| {
 566                this.outgoing_invites.remove(&(channel_id, user_id));
 567                cx.notify();
 568            })?;
 569
 570            result?;
 571
 572            Ok(())
 573        })
 574    }
 575
 576    pub fn remove_member(
 577        &mut self,
 578        channel_id: ChannelId,
 579        user_id: u64,
 580        cx: &mut ModelContext<Self>,
 581    ) -> Task<Result<()>> {
 582        if !self.outgoing_invites.insert((channel_id, user_id)) {
 583            return Task::ready(Err(anyhow!("invite request already in progress")));
 584        }
 585
 586        cx.notify();
 587        let client = self.client.clone();
 588        cx.spawn(move |this, mut cx| async move {
 589            let result = client
 590                .request(proto::RemoveChannelMember {
 591                    channel_id,
 592                    user_id,
 593                })
 594                .await;
 595
 596            this.update(&mut cx, |this, cx| {
 597                this.outgoing_invites.remove(&(channel_id, user_id));
 598                cx.notify();
 599            })?;
 600            result?;
 601            Ok(())
 602        })
 603    }
 604
 605    pub fn set_member_role(
 606        &mut self,
 607        channel_id: ChannelId,
 608        user_id: UserId,
 609        role: proto::ChannelRole,
 610        cx: &mut ModelContext<Self>,
 611    ) -> Task<Result<()>> {
 612        if !self.outgoing_invites.insert((channel_id, user_id)) {
 613            return Task::ready(Err(anyhow!("member request already in progress")));
 614        }
 615
 616        cx.notify();
 617        let client = self.client.clone();
 618        cx.spawn(move |this, mut cx| async move {
 619            let result = client
 620                .request(proto::SetChannelMemberRole {
 621                    channel_id,
 622                    user_id,
 623                    role: role.into(),
 624                })
 625                .await;
 626
 627            this.update(&mut cx, |this, cx| {
 628                this.outgoing_invites.remove(&(channel_id, user_id));
 629                cx.notify();
 630            })?;
 631
 632            result?;
 633            Ok(())
 634        })
 635    }
 636
 637    pub fn rename(
 638        &mut self,
 639        channel_id: ChannelId,
 640        new_name: &str,
 641        cx: &mut ModelContext<Self>,
 642    ) -> Task<Result<()>> {
 643        let client = self.client.clone();
 644        let name = new_name.to_string();
 645        cx.spawn(move |this, mut cx| async move {
 646            let channel = client
 647                .request(proto::RenameChannel { channel_id, name })
 648                .await?
 649                .channel
 650                .ok_or_else(|| anyhow!("missing channel in response"))?;
 651            this.update(&mut cx, |this, cx| {
 652                let task = this.update_channels(
 653                    proto::UpdateChannels {
 654                        channels: vec![channel],
 655                        ..Default::default()
 656                    },
 657                    cx,
 658                );
 659                assert!(task.is_none());
 660
 661                // This event is emitted because the collab panel wants to clear the pending edit state
 662                // before this frame is rendered. But we can't guarantee that the collab panel's future
 663                // will resolve before this flush_effects finishes. Synchronously emitting this event
 664                // ensures that the collab panel will observe this creation before the frame complete
 665                cx.emit(ChannelEvent::ChannelRenamed(channel_id))
 666            })?;
 667            Ok(())
 668        })
 669    }
 670
 671    pub fn respond_to_channel_invite(
 672        &mut self,
 673        channel_id: ChannelId,
 674        accept: bool,
 675        cx: &mut ModelContext<Self>,
 676    ) -> Task<Result<()>> {
 677        let client = self.client.clone();
 678        cx.background_executor().spawn(async move {
 679            client
 680                .request(proto::RespondToChannelInvite { channel_id, accept })
 681                .await?;
 682            Ok(())
 683        })
 684    }
 685
 686    pub fn get_channel_member_details(
 687        &self,
 688        channel_id: ChannelId,
 689        cx: &mut ModelContext<Self>,
 690    ) -> Task<Result<Vec<ChannelMembership>>> {
 691        let client = self.client.clone();
 692        let user_store = self.user_store.downgrade();
 693        cx.spawn(move |_, mut cx| async move {
 694            let response = client
 695                .request(proto::GetChannelMembers { channel_id })
 696                .await?;
 697
 698            let user_ids = response.members.iter().map(|m| m.user_id).collect();
 699            let user_store = user_store
 700                .upgrade()
 701                .ok_or_else(|| anyhow!("user store dropped"))?;
 702            let users = user_store
 703                .update(&mut cx, |user_store, cx| user_store.get_users(user_ids, cx))?
 704                .await?;
 705
 706            Ok(users
 707                .into_iter()
 708                .zip(response.members)
 709                .filter_map(|(user, member)| {
 710                    Some(ChannelMembership {
 711                        user,
 712                        role: member.role(),
 713                        kind: member.kind(),
 714                    })
 715                })
 716                .collect())
 717        })
 718    }
 719
 720    pub fn remove_channel(&self, channel_id: ChannelId) -> impl Future<Output = Result<()>> {
 721        let client = self.client.clone();
 722        async move {
 723            client.request(proto::DeleteChannel { channel_id }).await?;
 724            Ok(())
 725        }
 726    }
 727
 728    pub fn has_pending_channel_invite_response(&self, _: &Arc<Channel>) -> bool {
 729        false
 730    }
 731
 732    pub fn has_pending_channel_invite(&self, channel_id: ChannelId, user_id: UserId) -> bool {
 733        self.outgoing_invites.contains(&(channel_id, user_id))
 734    }
 735
 736    async fn handle_update_channels(
 737        this: Model<Self>,
 738        message: TypedEnvelope<proto::UpdateChannels>,
 739        _: Arc<Client>,
 740        mut cx: AsyncAppContext,
 741    ) -> Result<()> {
 742        this.update(&mut cx, |this, _| {
 743            this.update_channels_tx
 744                .unbounded_send(message.payload)
 745                .unwrap();
 746        })?;
 747        Ok(())
 748    }
 749
 750    fn handle_connect(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
 751        self.channel_index.clear();
 752        self.channel_invitations.clear();
 753        self.channel_participants.clear();
 754        self.channel_index.clear();
 755        self.outgoing_invites.clear();
 756        self.disconnect_channel_buffers_task.take();
 757
 758        for chat in self.opened_chats.values() {
 759            if let OpenedModelHandle::Open(chat) = chat {
 760                if let Some(chat) = chat.upgrade() {
 761                    chat.update(cx, |chat, cx| {
 762                        chat.rejoin(cx);
 763                    });
 764                }
 765            }
 766        }
 767
 768        let mut buffer_versions = Vec::new();
 769        for buffer in self.opened_buffers.values() {
 770            if let OpenedModelHandle::Open(buffer) = buffer {
 771                if let Some(buffer) = buffer.upgrade() {
 772                    let channel_buffer = buffer.read(cx);
 773                    let buffer = channel_buffer.buffer().read(cx);
 774                    buffer_versions.push(proto::ChannelBufferVersion {
 775                        channel_id: channel_buffer.channel_id,
 776                        epoch: channel_buffer.epoch(),
 777                        version: language::proto::serialize_version(&buffer.version()),
 778                    });
 779                }
 780            }
 781        }
 782
 783        if buffer_versions.is_empty() {
 784            return Task::ready(Ok(()));
 785        }
 786
 787        let response = self.client.request(proto::RejoinChannelBuffers {
 788            buffers: buffer_versions,
 789        });
 790
 791        cx.spawn(|this, mut cx| async move {
 792            let mut response = response.await?;
 793
 794            this.update(&mut cx, |this, cx| {
 795                this.opened_buffers.retain(|_, buffer| match buffer {
 796                    OpenedModelHandle::Open(channel_buffer) => {
 797                        let Some(channel_buffer) = channel_buffer.upgrade() else {
 798                            return false;
 799                        };
 800
 801                        channel_buffer.update(cx, |channel_buffer, cx| {
 802                            let channel_id = channel_buffer.channel_id;
 803                            if let Some(remote_buffer) = response
 804                                .buffers
 805                                .iter_mut()
 806                                .find(|buffer| buffer.channel_id == channel_id)
 807                            {
 808                                let channel_id = channel_buffer.channel_id;
 809                                let remote_version =
 810                                    language::proto::deserialize_version(&remote_buffer.version);
 811
 812                                channel_buffer.replace_collaborators(
 813                                    mem::take(&mut remote_buffer.collaborators),
 814                                    cx,
 815                                );
 816
 817                                let operations = channel_buffer
 818                                    .buffer()
 819                                    .update(cx, |buffer, cx| {
 820                                        let outgoing_operations =
 821                                            buffer.serialize_ops(Some(remote_version), cx);
 822                                        let incoming_operations =
 823                                            mem::take(&mut remote_buffer.operations)
 824                                                .into_iter()
 825                                                .map(language::proto::deserialize_operation)
 826                                                .collect::<Result<Vec<_>>>()?;
 827                                        buffer.apply_ops(incoming_operations, cx)?;
 828                                        anyhow::Ok(outgoing_operations)
 829                                    })
 830                                    .log_err();
 831
 832                                if let Some(operations) = operations {
 833                                    let client = this.client.clone();
 834                                    cx.background_executor()
 835                                        .spawn(async move {
 836                                            let operations = operations.await;
 837                                            for chunk in
 838                                                language::proto::split_operations(operations)
 839                                            {
 840                                                client
 841                                                    .send(proto::UpdateChannelBuffer {
 842                                                        channel_id,
 843                                                        operations: chunk,
 844                                                    })
 845                                                    .ok();
 846                                            }
 847                                        })
 848                                        .detach();
 849                                    return true;
 850                                }
 851                            }
 852
 853                            channel_buffer.disconnect(cx);
 854                            false
 855                        })
 856                    }
 857                    OpenedModelHandle::Loading(_) => true,
 858                });
 859            })
 860            .ok();
 861            anyhow::Ok(())
 862        })
 863    }
 864
 865    fn handle_disconnect(&mut self, wait_for_reconnect: bool, cx: &mut ModelContext<Self>) {
 866        cx.notify();
 867
 868        self.disconnect_channel_buffers_task.get_or_insert_with(|| {
 869            cx.spawn(move |this, mut cx| async move {
 870                if wait_for_reconnect {
 871                    cx.background_executor().timer(RECONNECT_TIMEOUT).await;
 872                }
 873
 874                if let Some(this) = this.upgrade() {
 875                    this.update(&mut cx, |this, cx| {
 876                        for (_, buffer) in this.opened_buffers.drain() {
 877                            if let OpenedModelHandle::Open(buffer) = buffer {
 878                                if let Some(buffer) = buffer.upgrade() {
 879                                    buffer.update(cx, |buffer, cx| buffer.disconnect(cx));
 880                                }
 881                            }
 882                        }
 883                    })
 884                    .ok();
 885                }
 886            })
 887        });
 888    }
 889
 890    pub(crate) fn update_channels(
 891        &mut self,
 892        payload: proto::UpdateChannels,
 893        cx: &mut ModelContext<ChannelStore>,
 894    ) -> Option<Task<Result<()>>> {
 895        if !payload.remove_channel_invitations.is_empty() {
 896            self.channel_invitations
 897                .retain(|channel| !payload.remove_channel_invitations.contains(&channel.id));
 898        }
 899        for channel in payload.channel_invitations {
 900            match self
 901                .channel_invitations
 902                .binary_search_by_key(&channel.id, |c| c.id)
 903            {
 904                Ok(ix) => {
 905                    Arc::make_mut(&mut self.channel_invitations[ix]).name = channel.name.into()
 906                }
 907                Err(ix) => self.channel_invitations.insert(
 908                    ix,
 909                    Arc::new(Channel {
 910                        id: channel.id,
 911                        visibility: channel.visibility(),
 912                        role: channel.role(),
 913                        name: channel.name.into(),
 914                        unseen_note_version: None,
 915                        unseen_message_id: None,
 916                        parent_path: channel.parent_path,
 917                    }),
 918                ),
 919            }
 920        }
 921
 922        let channels_changed = !payload.channels.is_empty()
 923            || !payload.delete_channels.is_empty()
 924            || !payload.unseen_channel_messages.is_empty()
 925            || !payload.unseen_channel_buffer_changes.is_empty();
 926
 927        if channels_changed {
 928            if !payload.delete_channels.is_empty() {
 929                self.channel_index.delete_channels(&payload.delete_channels);
 930                self.channel_participants
 931                    .retain(|channel_id, _| !&payload.delete_channels.contains(channel_id));
 932
 933                for channel_id in &payload.delete_channels {
 934                    let channel_id = *channel_id;
 935                    if payload
 936                        .channels
 937                        .iter()
 938                        .any(|channel| channel.id == channel_id)
 939                    {
 940                        continue;
 941                    }
 942                    if let Some(OpenedModelHandle::Open(buffer)) =
 943                        self.opened_buffers.remove(&channel_id)
 944                    {
 945                        if let Some(buffer) = buffer.upgrade() {
 946                            buffer.update(cx, ChannelBuffer::disconnect);
 947                        }
 948                    }
 949                }
 950            }
 951
 952            let mut index = self.channel_index.bulk_insert();
 953            for channel in payload.channels {
 954                let id = channel.id;
 955                let channel_changed = index.insert(channel);
 956
 957                if channel_changed {
 958                    if let Some(OpenedModelHandle::Open(buffer)) = self.opened_buffers.get(&id) {
 959                        if let Some(buffer) = buffer.upgrade() {
 960                            buffer.update(cx, ChannelBuffer::channel_changed);
 961                        }
 962                    }
 963                }
 964            }
 965
 966            for unseen_buffer_change in payload.unseen_channel_buffer_changes {
 967                let version = language::proto::deserialize_version(&unseen_buffer_change.version);
 968                index.note_changed(
 969                    unseen_buffer_change.channel_id,
 970                    unseen_buffer_change.epoch,
 971                    &version,
 972                );
 973            }
 974
 975            for unseen_channel_message in payload.unseen_channel_messages {
 976                index.new_messages(
 977                    unseen_channel_message.channel_id,
 978                    unseen_channel_message.message_id,
 979                );
 980            }
 981        }
 982
 983        cx.notify();
 984        if payload.channel_participants.is_empty() {
 985            return None;
 986        }
 987
 988        let mut all_user_ids = Vec::new();
 989        let channel_participants = payload.channel_participants;
 990        for entry in &channel_participants {
 991            for user_id in entry.participant_user_ids.iter() {
 992                if let Err(ix) = all_user_ids.binary_search(user_id) {
 993                    all_user_ids.insert(ix, *user_id);
 994                }
 995            }
 996        }
 997
 998        let users = self
 999            .user_store
1000            .update(cx, |user_store, cx| user_store.get_users(all_user_ids, cx));
1001        Some(cx.spawn(|this, mut cx| async move {
1002            let users = users.await?;
1003
1004            this.update(&mut cx, |this, cx| {
1005                for entry in &channel_participants {
1006                    let mut participants: Vec<_> = entry
1007                        .participant_user_ids
1008                        .iter()
1009                        .filter_map(|user_id| {
1010                            users
1011                                .binary_search_by_key(&user_id, |user| &user.id)
1012                                .ok()
1013                                .map(|ix| users[ix].clone())
1014                        })
1015                        .collect();
1016
1017                    participants.sort_by_key(|u| u.id);
1018
1019                    this.channel_participants
1020                        .insert(entry.channel_id, participants);
1021                }
1022
1023                cx.notify();
1024            })
1025        }))
1026    }
1027}