channel_store.rs

   1mod channel_index;
   2
   3use crate::{channel_buffer::ChannelBuffer, channel_chat::ChannelChat, ChannelMessage};
   4use anyhow::{anyhow, Result};
   5use channel_index::ChannelIndex;
   6use client::{Client, Subscription, User, UserId, UserStore};
   7use collections::{hash_map, HashMap, HashSet};
   8use db::RELEASE_CHANNEL;
   9use futures::{channel::mpsc, future::Shared, Future, FutureExt, StreamExt};
  10use gpui::{AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Task, WeakModelHandle};
  11use rpc::{
  12    proto::{self, ChannelVisibility},
  13    TypedEnvelope,
  14};
  15use std::{mem, sync::Arc, time::Duration};
  16use util::ResultExt;
  17
  18pub fn init(client: &Arc<Client>, user_store: ModelHandle<UserStore>, cx: &mut AppContext) {
  19    let channel_store =
  20        cx.add_model(|cx| ChannelStore::new(client.clone(), user_store.clone(), cx));
  21    cx.set_global(channel_store);
  22}
  23
  24pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
  25
  26pub type ChannelId = u64;
  27
  28pub struct ChannelStore {
  29    pub channel_index: ChannelIndex,
  30    channel_invitations: Vec<Arc<Channel>>,
  31    channel_participants: HashMap<ChannelId, Vec<Arc<User>>>,
  32    outgoing_invites: HashSet<(ChannelId, UserId)>,
  33    update_channels_tx: mpsc::UnboundedSender<proto::UpdateChannels>,
  34    opened_buffers: HashMap<ChannelId, OpenedModelHandle<ChannelBuffer>>,
  35    opened_chats: HashMap<ChannelId, OpenedModelHandle<ChannelChat>>,
  36    client: Arc<Client>,
  37    user_store: ModelHandle<UserStore>,
  38    _rpc_subscription: Subscription,
  39    _watch_connection_status: Task<Option<()>>,
  40    disconnect_channel_buffers_task: Option<Task<()>>,
  41    _update_channels: Task<()>,
  42}
  43
  44#[derive(Clone, Debug, PartialEq)]
  45pub struct Channel {
  46    pub id: ChannelId,
  47    pub name: String,
  48    pub visibility: proto::ChannelVisibility,
  49    pub role: proto::ChannelRole,
  50    pub unseen_note_version: Option<(u64, clock::Global)>,
  51    pub unseen_message_id: Option<u64>,
  52    pub parent_path: Vec<u64>,
  53}
  54
  55impl Channel {
  56    pub fn link(&self) -> String {
  57        RELEASE_CHANNEL.link_prefix().to_owned()
  58            + "channel/"
  59            + &self.slug()
  60            + "-"
  61            + &self.id.to_string()
  62    }
  63
  64    pub fn slug(&self) -> String {
  65        let slug: String = self
  66            .name
  67            .chars()
  68            .map(|c| if c.is_alphanumeric() { c } else { '-' })
  69            .collect();
  70
  71        slug.trim_matches(|c| c == '-').to_string()
  72    }
  73
  74    pub fn can_edit_notes(&self) -> bool {
  75        self.role == proto::ChannelRole::Member || self.role == proto::ChannelRole::Admin
  76    }
  77}
  78
  79pub struct ChannelMembership {
  80    pub user: Arc<User>,
  81    pub kind: proto::channel_member::Kind,
  82    pub role: proto::ChannelRole,
  83}
  84impl ChannelMembership {
  85    pub fn sort_key(&self) -> MembershipSortKey {
  86        MembershipSortKey {
  87            role_order: match self.role {
  88                proto::ChannelRole::Admin => 0,
  89                proto::ChannelRole::Member => 1,
  90                proto::ChannelRole::Banned => 2,
  91                proto::ChannelRole::Guest => 3,
  92            },
  93            kind_order: match self.kind {
  94                proto::channel_member::Kind::Member => 0,
  95                proto::channel_member::Kind::AncestorMember => 1,
  96                proto::channel_member::Kind::Invitee => 2,
  97            },
  98            username_order: self.user.github_login.as_str(),
  99        }
 100    }
 101}
 102
 103#[derive(PartialOrd, Ord, PartialEq, Eq)]
 104pub struct MembershipSortKey<'a> {
 105    role_order: u8,
 106    kind_order: u8,
 107    username_order: &'a str,
 108}
 109
 110pub enum ChannelEvent {
 111    ChannelCreated(ChannelId),
 112    ChannelRenamed(ChannelId),
 113}
 114
 115impl Entity for ChannelStore {
 116    type Event = ChannelEvent;
 117}
 118
 119enum OpenedModelHandle<E: Entity> {
 120    Open(WeakModelHandle<E>),
 121    Loading(Shared<Task<Result<ModelHandle<E>, Arc<anyhow::Error>>>>),
 122}
 123
 124impl ChannelStore {
 125    pub fn global(cx: &AppContext) -> ModelHandle<Self> {
 126        cx.global::<ModelHandle<Self>>().clone()
 127    }
 128
 129    pub fn new(
 130        client: Arc<Client>,
 131        user_store: ModelHandle<UserStore>,
 132        cx: &mut ModelContext<Self>,
 133    ) -> Self {
 134        let rpc_subscription =
 135            client.add_message_handler(cx.handle(), Self::handle_update_channels);
 136
 137        let mut connection_status = client.status();
 138        let (update_channels_tx, mut update_channels_rx) = mpsc::unbounded();
 139        let watch_connection_status = cx.spawn_weak(|this, mut cx| async move {
 140            while let Some(status) = connection_status.next().await {
 141                let this = this.upgrade(&cx)?;
 142                match status {
 143                    client::Status::Connected { .. } => {
 144                        this.update(&mut cx, |this, cx| this.handle_connect(cx))
 145                            .await
 146                            .log_err()?;
 147                    }
 148                    client::Status::SignedOut | client::Status::UpgradeRequired => {
 149                        this.update(&mut cx, |this, cx| this.handle_disconnect(false, cx));
 150                    }
 151                    _ => {
 152                        this.update(&mut cx, |this, cx| this.handle_disconnect(true, cx));
 153                    }
 154                }
 155            }
 156            Some(())
 157        });
 158
 159        Self {
 160            channel_invitations: Vec::default(),
 161            channel_index: ChannelIndex::default(),
 162            channel_participants: Default::default(),
 163            outgoing_invites: Default::default(),
 164            opened_buffers: Default::default(),
 165            opened_chats: Default::default(),
 166            update_channels_tx,
 167            client,
 168            user_store,
 169            _rpc_subscription: rpc_subscription,
 170            _watch_connection_status: watch_connection_status,
 171            disconnect_channel_buffers_task: None,
 172            _update_channels: cx.spawn_weak(|this, mut cx| async move {
 173                while let Some(update_channels) = update_channels_rx.next().await {
 174                    if let Some(this) = this.upgrade(&cx) {
 175                        let update_task = this.update(&mut cx, |this, cx| {
 176                            this.update_channels(update_channels, cx)
 177                        });
 178                        if let Some(update_task) = update_task {
 179                            update_task.await.log_err();
 180                        }
 181                    }
 182                }
 183            }),
 184        }
 185    }
 186
 187    pub fn client(&self) -> Arc<Client> {
 188        self.client.clone()
 189    }
 190
 191    /// Returns the number of unique channels in the store
 192    pub fn channel_count(&self) -> usize {
 193        self.channel_index.by_id().len()
 194    }
 195
 196    /// Returns the index of a channel ID in the list of unique channels
 197    pub fn index_of_channel(&self, channel_id: ChannelId) -> Option<usize> {
 198        self.channel_index
 199            .by_id()
 200            .keys()
 201            .position(|id| *id == channel_id)
 202    }
 203
 204    /// Returns an iterator over all unique channels
 205    pub fn channels(&self) -> impl '_ + Iterator<Item = &Arc<Channel>> {
 206        self.channel_index.by_id().values()
 207    }
 208
 209    /// Iterate over all entries in the channel DAG
 210    pub fn ordered_channels(&self) -> impl '_ + Iterator<Item = (usize, &Arc<Channel>)> {
 211        self.channel_index
 212            .ordered_channels()
 213            .iter()
 214            .filter_map(move |id| {
 215                let channel = self.channel_index.by_id().get(id)?;
 216                Some((channel.parent_path.len(), channel))
 217            })
 218    }
 219
 220    pub fn channel_at_index(&self, ix: usize) -> Option<&Arc<Channel>> {
 221        let channel_id = self.channel_index.ordered_channels().get(ix)?;
 222        self.channel_index.by_id().get(channel_id)
 223    }
 224
 225    pub fn channel_at(&self, ix: usize) -> Option<&Arc<Channel>> {
 226        self.channel_index.by_id().values().nth(ix)
 227    }
 228
 229    pub fn has_channel_invitation(&self, channel_id: ChannelId) -> bool {
 230        self.channel_invitations
 231            .iter()
 232            .any(|channel| channel.id == channel_id)
 233    }
 234
 235    pub fn channel_invitations(&self) -> &[Arc<Channel>] {
 236        &self.channel_invitations
 237    }
 238
 239    pub fn channel_for_id(&self, channel_id: ChannelId) -> Option<&Arc<Channel>> {
 240        self.channel_index.by_id().get(&channel_id)
 241    }
 242
 243    pub fn has_open_channel_buffer(&self, channel_id: ChannelId, cx: &AppContext) -> bool {
 244        if let Some(buffer) = self.opened_buffers.get(&channel_id) {
 245            if let OpenedModelHandle::Open(buffer) = buffer {
 246                return buffer.upgrade(cx).is_some();
 247            }
 248        }
 249        false
 250    }
 251
 252    pub fn open_channel_buffer(
 253        &mut self,
 254        channel_id: ChannelId,
 255        cx: &mut ModelContext<Self>,
 256    ) -> Task<Result<ModelHandle<ChannelBuffer>>> {
 257        let client = self.client.clone();
 258        let user_store = self.user_store.clone();
 259        let channel_store = cx.handle();
 260        self.open_channel_resource(
 261            channel_id,
 262            |this| &mut this.opened_buffers,
 263            |channel, cx| ChannelBuffer::new(channel, client, user_store, channel_store, cx),
 264            cx,
 265        )
 266    }
 267
 268    pub fn fetch_channel_messages(
 269        &self,
 270        message_ids: Vec<u64>,
 271        cx: &mut ModelContext<Self>,
 272    ) -> Task<Result<Vec<ChannelMessage>>> {
 273        let request = if message_ids.is_empty() {
 274            None
 275        } else {
 276            Some(
 277                self.client
 278                    .request(proto::GetChannelMessagesById { message_ids }),
 279            )
 280        };
 281        cx.spawn_weak(|this, mut cx| async move {
 282            if let Some(request) = request {
 283                let response = request.await?;
 284                let this = this
 285                    .upgrade(&cx)
 286                    .ok_or_else(|| anyhow!("channel store dropped"))?;
 287                let user_store = this.read_with(&cx, |this, _| this.user_store.clone());
 288                ChannelMessage::from_proto_vec(response.messages, &user_store, &mut cx).await
 289            } else {
 290                Ok(Vec::new())
 291            }
 292        })
 293    }
 294
 295    pub fn has_channel_buffer_changed(&self, channel_id: ChannelId) -> Option<bool> {
 296        self.channel_index
 297            .by_id()
 298            .get(&channel_id)
 299            .map(|channel| channel.unseen_note_version.is_some())
 300    }
 301
 302    pub fn has_new_messages(&self, channel_id: ChannelId) -> Option<bool> {
 303        self.channel_index
 304            .by_id()
 305            .get(&channel_id)
 306            .map(|channel| channel.unseen_message_id.is_some())
 307    }
 308
 309    pub fn notes_changed(
 310        &mut self,
 311        channel_id: ChannelId,
 312        epoch: u64,
 313        version: &clock::Global,
 314        cx: &mut ModelContext<Self>,
 315    ) {
 316        self.channel_index.note_changed(channel_id, epoch, version);
 317        cx.notify();
 318    }
 319
 320    pub fn new_message(
 321        &mut self,
 322        channel_id: ChannelId,
 323        message_id: u64,
 324        cx: &mut ModelContext<Self>,
 325    ) {
 326        self.channel_index.new_message(channel_id, message_id);
 327        cx.notify();
 328    }
 329
 330    pub fn acknowledge_message_id(
 331        &mut self,
 332        channel_id: ChannelId,
 333        message_id: u64,
 334        cx: &mut ModelContext<Self>,
 335    ) {
 336        self.channel_index
 337            .acknowledge_message_id(channel_id, message_id);
 338        cx.notify();
 339    }
 340
 341    pub fn acknowledge_notes_version(
 342        &mut self,
 343        channel_id: ChannelId,
 344        epoch: u64,
 345        version: &clock::Global,
 346        cx: &mut ModelContext<Self>,
 347    ) {
 348        self.channel_index
 349            .acknowledge_note_version(channel_id, epoch, version);
 350        cx.notify();
 351    }
 352
 353    pub fn open_channel_chat(
 354        &mut self,
 355        channel_id: ChannelId,
 356        cx: &mut ModelContext<Self>,
 357    ) -> Task<Result<ModelHandle<ChannelChat>>> {
 358        let client = self.client.clone();
 359        let user_store = self.user_store.clone();
 360        let this = cx.handle();
 361        self.open_channel_resource(
 362            channel_id,
 363            |this| &mut this.opened_chats,
 364            |channel, cx| ChannelChat::new(channel, this, user_store, client, cx),
 365            cx,
 366        )
 367    }
 368
 369    /// Asynchronously open a given resource associated with a channel.
 370    ///
 371    /// Make sure that the resource is only opened once, even if this method
 372    /// is called multiple times with the same channel id while the first task
 373    /// is still running.
 374    fn open_channel_resource<T: Entity, F, Fut>(
 375        &mut self,
 376        channel_id: ChannelId,
 377        get_map: fn(&mut Self) -> &mut HashMap<ChannelId, OpenedModelHandle<T>>,
 378        load: F,
 379        cx: &mut ModelContext<Self>,
 380    ) -> Task<Result<ModelHandle<T>>>
 381    where
 382        F: 'static + FnOnce(Arc<Channel>, AsyncAppContext) -> Fut,
 383        Fut: Future<Output = Result<ModelHandle<T>>>,
 384    {
 385        let task = loop {
 386            match get_map(self).entry(channel_id) {
 387                hash_map::Entry::Occupied(e) => match e.get() {
 388                    OpenedModelHandle::Open(model) => {
 389                        if let Some(model) = model.upgrade(cx) {
 390                            break Task::ready(Ok(model)).shared();
 391                        } else {
 392                            get_map(self).remove(&channel_id);
 393                            continue;
 394                        }
 395                    }
 396                    OpenedModelHandle::Loading(task) => {
 397                        break task.clone();
 398                    }
 399                },
 400                hash_map::Entry::Vacant(e) => {
 401                    let task = cx
 402                        .spawn(|this, cx| async move {
 403                            let channel = this.read_with(&cx, |this, _| {
 404                                this.channel_for_id(channel_id).cloned().ok_or_else(|| {
 405                                    Arc::new(anyhow!("no channel for id: {}", channel_id))
 406                                })
 407                            })?;
 408
 409                            load(channel, cx).await.map_err(Arc::new)
 410                        })
 411                        .shared();
 412
 413                    e.insert(OpenedModelHandle::Loading(task.clone()));
 414                    cx.spawn({
 415                        let task = task.clone();
 416                        |this, mut cx| async move {
 417                            let result = task.await;
 418                            this.update(&mut cx, |this, _| match result {
 419                                Ok(model) => {
 420                                    get_map(this).insert(
 421                                        channel_id,
 422                                        OpenedModelHandle::Open(model.downgrade()),
 423                                    );
 424                                }
 425                                Err(_) => {
 426                                    get_map(this).remove(&channel_id);
 427                                }
 428                            });
 429                        }
 430                    })
 431                    .detach();
 432                    break task;
 433                }
 434            }
 435        };
 436        cx.foreground()
 437            .spawn(async move { task.await.map_err(|error| anyhow!("{}", error)) })
 438    }
 439
 440    pub fn is_channel_admin(&self, channel_id: ChannelId) -> bool {
 441        let Some(channel) = self.channel_for_id(channel_id) else {
 442            return false;
 443        };
 444        channel.role == proto::ChannelRole::Admin
 445    }
 446
 447    pub fn channel_participants(&self, channel_id: ChannelId) -> &[Arc<User>] {
 448        self.channel_participants
 449            .get(&channel_id)
 450            .map_or(&[], |v| v.as_slice())
 451    }
 452
 453    pub fn create_channel(
 454        &self,
 455        name: &str,
 456        parent_id: Option<ChannelId>,
 457        cx: &mut ModelContext<Self>,
 458    ) -> Task<Result<ChannelId>> {
 459        let client = self.client.clone();
 460        let name = name.trim_start_matches("#").to_owned();
 461        cx.spawn(|this, mut cx| async move {
 462            let response = client
 463                .request(proto::CreateChannel { name, parent_id })
 464                .await?;
 465
 466            let channel = response
 467                .channel
 468                .ok_or_else(|| anyhow!("missing channel in response"))?;
 469            let channel_id = channel.id;
 470
 471            // let parent_edge = if let Some(parent_id) = parent_id {
 472            //     vec![ChannelEdge {
 473            //         channel_id: channel.id,
 474            //         parent_id,
 475            //     }]
 476            // } else {
 477            //     vec![]
 478            // };
 479
 480            this.update(&mut cx, |this, cx| {
 481                let task = this.update_channels(
 482                    proto::UpdateChannels {
 483                        channels: vec![channel],
 484                        ..Default::default()
 485                    },
 486                    cx,
 487                );
 488                assert!(task.is_none());
 489
 490                // This event is emitted because the collab panel wants to clear the pending edit state
 491                // before this frame is rendered. But we can't guarantee that the collab panel's future
 492                // will resolve before this flush_effects finishes. Synchronously emitting this event
 493                // ensures that the collab panel will observe this creation before the frame completes
 494                cx.emit(ChannelEvent::ChannelCreated(channel_id));
 495            });
 496
 497            Ok(channel_id)
 498        })
 499    }
 500
 501    pub fn move_channel(
 502        &mut self,
 503        channel_id: ChannelId,
 504        to: Option<ChannelId>,
 505        cx: &mut ModelContext<Self>,
 506    ) -> Task<Result<()>> {
 507        let client = self.client.clone();
 508        cx.spawn(|_, _| async move {
 509            let _ = client
 510                .request(proto::MoveChannel { channel_id, to })
 511                .await?;
 512
 513            Ok(())
 514        })
 515    }
 516
 517    pub fn set_channel_visibility(
 518        &mut self,
 519        channel_id: ChannelId,
 520        visibility: ChannelVisibility,
 521        cx: &mut ModelContext<Self>,
 522    ) -> Task<Result<()>> {
 523        let client = self.client.clone();
 524        cx.spawn(|_, _| async move {
 525            let _ = client
 526                .request(proto::SetChannelVisibility {
 527                    channel_id,
 528                    visibility: visibility.into(),
 529                })
 530                .await?;
 531
 532            Ok(())
 533        })
 534    }
 535
 536    pub fn invite_member(
 537        &mut self,
 538        channel_id: ChannelId,
 539        user_id: UserId,
 540        role: proto::ChannelRole,
 541        cx: &mut ModelContext<Self>,
 542    ) -> Task<Result<()>> {
 543        if !self.outgoing_invites.insert((channel_id, user_id)) {
 544            return Task::ready(Err(anyhow!("invite request already in progress")));
 545        }
 546
 547        cx.notify();
 548        let client = self.client.clone();
 549        cx.spawn(|this, mut cx| async move {
 550            let result = client
 551                .request(proto::InviteChannelMember {
 552                    channel_id,
 553                    user_id,
 554                    role: role.into(),
 555                })
 556                .await;
 557
 558            this.update(&mut cx, |this, cx| {
 559                this.outgoing_invites.remove(&(channel_id, user_id));
 560                cx.notify();
 561            });
 562
 563            result?;
 564
 565            Ok(())
 566        })
 567    }
 568
 569    pub fn remove_member(
 570        &mut self,
 571        channel_id: ChannelId,
 572        user_id: u64,
 573        cx: &mut ModelContext<Self>,
 574    ) -> Task<Result<()>> {
 575        if !self.outgoing_invites.insert((channel_id, user_id)) {
 576            return Task::ready(Err(anyhow!("invite request already in progress")));
 577        }
 578
 579        cx.notify();
 580        let client = self.client.clone();
 581        cx.spawn(|this, mut cx| async move {
 582            let result = client
 583                .request(proto::RemoveChannelMember {
 584                    channel_id,
 585                    user_id,
 586                })
 587                .await;
 588
 589            this.update(&mut cx, |this, cx| {
 590                this.outgoing_invites.remove(&(channel_id, user_id));
 591                cx.notify();
 592            });
 593            result?;
 594            Ok(())
 595        })
 596    }
 597
 598    pub fn set_member_role(
 599        &mut self,
 600        channel_id: ChannelId,
 601        user_id: UserId,
 602        role: proto::ChannelRole,
 603        cx: &mut ModelContext<Self>,
 604    ) -> Task<Result<()>> {
 605        if !self.outgoing_invites.insert((channel_id, user_id)) {
 606            return Task::ready(Err(anyhow!("member request already in progress")));
 607        }
 608
 609        cx.notify();
 610        let client = self.client.clone();
 611        cx.spawn(|this, mut cx| async move {
 612            let result = client
 613                .request(proto::SetChannelMemberRole {
 614                    channel_id,
 615                    user_id,
 616                    role: role.into(),
 617                })
 618                .await;
 619
 620            this.update(&mut cx, |this, cx| {
 621                this.outgoing_invites.remove(&(channel_id, user_id));
 622                cx.notify();
 623            });
 624
 625            result?;
 626            Ok(())
 627        })
 628    }
 629
 630    pub fn rename(
 631        &mut self,
 632        channel_id: ChannelId,
 633        new_name: &str,
 634        cx: &mut ModelContext<Self>,
 635    ) -> Task<Result<()>> {
 636        let client = self.client.clone();
 637        let name = new_name.to_string();
 638        cx.spawn(|this, mut cx| async move {
 639            let channel = client
 640                .request(proto::RenameChannel { channel_id, name })
 641                .await?
 642                .channel
 643                .ok_or_else(|| anyhow!("missing channel in response"))?;
 644            this.update(&mut cx, |this, cx| {
 645                let task = this.update_channels(
 646                    proto::UpdateChannels {
 647                        channels: vec![channel],
 648                        ..Default::default()
 649                    },
 650                    cx,
 651                );
 652                assert!(task.is_none());
 653
 654                // This event is emitted because the collab panel wants to clear the pending edit state
 655                // before this frame is rendered. But we can't guarantee that the collab panel's future
 656                // will resolve before this flush_effects finishes. Synchronously emitting this event
 657                // ensures that the collab panel will observe this creation before the frame complete
 658                cx.emit(ChannelEvent::ChannelRenamed(channel_id))
 659            });
 660            Ok(())
 661        })
 662    }
 663
 664    pub fn respond_to_channel_invite(
 665        &mut self,
 666        channel_id: ChannelId,
 667        accept: bool,
 668        cx: &mut ModelContext<Self>,
 669    ) -> Task<Result<()>> {
 670        let client = self.client.clone();
 671        cx.background().spawn(async move {
 672            client
 673                .request(proto::RespondToChannelInvite { channel_id, accept })
 674                .await?;
 675            Ok(())
 676        })
 677    }
 678
 679    pub fn get_channel_member_details(
 680        &self,
 681        channel_id: ChannelId,
 682        cx: &mut ModelContext<Self>,
 683    ) -> Task<Result<Vec<ChannelMembership>>> {
 684        let client = self.client.clone();
 685        let user_store = self.user_store.downgrade();
 686        cx.spawn(|_, mut cx| async move {
 687            let response = client
 688                .request(proto::GetChannelMembers { channel_id })
 689                .await?;
 690
 691            let user_ids = response.members.iter().map(|m| m.user_id).collect();
 692            let user_store = user_store
 693                .upgrade(&cx)
 694                .ok_or_else(|| anyhow!("user store dropped"))?;
 695            let users = user_store
 696                .update(&mut cx, |user_store, cx| user_store.get_users(user_ids, cx))
 697                .await?;
 698
 699            Ok(users
 700                .into_iter()
 701                .zip(response.members)
 702                .filter_map(|(user, member)| {
 703                    Some(ChannelMembership {
 704                        user,
 705                        role: member.role(),
 706                        kind: member.kind(),
 707                    })
 708                })
 709                .collect())
 710        })
 711    }
 712
 713    pub fn remove_channel(&self, channel_id: ChannelId) -> impl Future<Output = Result<()>> {
 714        let client = self.client.clone();
 715        async move {
 716            client.request(proto::DeleteChannel { channel_id }).await?;
 717            Ok(())
 718        }
 719    }
 720
 721    pub fn has_pending_channel_invite_response(&self, _: &Arc<Channel>) -> bool {
 722        false
 723    }
 724
 725    pub fn has_pending_channel_invite(&self, channel_id: ChannelId, user_id: UserId) -> bool {
 726        self.outgoing_invites.contains(&(channel_id, user_id))
 727    }
 728
 729    async fn handle_update_channels(
 730        this: ModelHandle<Self>,
 731        message: TypedEnvelope<proto::UpdateChannels>,
 732        _: Arc<Client>,
 733        mut cx: AsyncAppContext,
 734    ) -> Result<()> {
 735        this.update(&mut cx, |this, _| {
 736            this.update_channels_tx
 737                .unbounded_send(message.payload)
 738                .unwrap();
 739        });
 740        Ok(())
 741    }
 742
 743    fn handle_connect(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
 744        self.channel_index.clear();
 745        self.channel_invitations.clear();
 746        self.channel_participants.clear();
 747        self.channel_index.clear();
 748        self.outgoing_invites.clear();
 749        self.disconnect_channel_buffers_task.take();
 750
 751        for chat in self.opened_chats.values() {
 752            if let OpenedModelHandle::Open(chat) = chat {
 753                if let Some(chat) = chat.upgrade(cx) {
 754                    chat.update(cx, |chat, cx| {
 755                        chat.rejoin(cx);
 756                    });
 757                }
 758            }
 759        }
 760
 761        let mut buffer_versions = Vec::new();
 762        for buffer in self.opened_buffers.values() {
 763            if let OpenedModelHandle::Open(buffer) = buffer {
 764                if let Some(buffer) = buffer.upgrade(cx) {
 765                    let channel_buffer = buffer.read(cx);
 766                    let buffer = channel_buffer.buffer().read(cx);
 767                    buffer_versions.push(proto::ChannelBufferVersion {
 768                        channel_id: channel_buffer.channel_id,
 769                        epoch: channel_buffer.epoch(),
 770                        version: language::proto::serialize_version(&buffer.version()),
 771                    });
 772                }
 773            }
 774        }
 775
 776        if buffer_versions.is_empty() {
 777            return Task::ready(Ok(()));
 778        }
 779
 780        let response = self.client.request(proto::RejoinChannelBuffers {
 781            buffers: buffer_versions,
 782        });
 783
 784        cx.spawn(|this, mut cx| async move {
 785            let mut response = response.await?;
 786
 787            this.update(&mut cx, |this, cx| {
 788                this.opened_buffers.retain(|_, buffer| match buffer {
 789                    OpenedModelHandle::Open(channel_buffer) => {
 790                        let Some(channel_buffer) = channel_buffer.upgrade(cx) else {
 791                            return false;
 792                        };
 793
 794                        channel_buffer.update(cx, |channel_buffer, cx| {
 795                            let channel_id = channel_buffer.channel_id;
 796                            if let Some(remote_buffer) = response
 797                                .buffers
 798                                .iter_mut()
 799                                .find(|buffer| buffer.channel_id == channel_id)
 800                            {
 801                                let channel_id = channel_buffer.channel_id;
 802                                let remote_version =
 803                                    language::proto::deserialize_version(&remote_buffer.version);
 804
 805                                channel_buffer.replace_collaborators(
 806                                    mem::take(&mut remote_buffer.collaborators),
 807                                    cx,
 808                                );
 809
 810                                let operations = channel_buffer
 811                                    .buffer()
 812                                    .update(cx, |buffer, cx| {
 813                                        let outgoing_operations =
 814                                            buffer.serialize_ops(Some(remote_version), cx);
 815                                        let incoming_operations =
 816                                            mem::take(&mut remote_buffer.operations)
 817                                                .into_iter()
 818                                                .map(language::proto::deserialize_operation)
 819                                                .collect::<Result<Vec<_>>>()?;
 820                                        buffer.apply_ops(incoming_operations, cx)?;
 821                                        anyhow::Ok(outgoing_operations)
 822                                    })
 823                                    .log_err();
 824
 825                                if let Some(operations) = operations {
 826                                    let client = this.client.clone();
 827                                    cx.background()
 828                                        .spawn(async move {
 829                                            let operations = operations.await;
 830                                            for chunk in
 831                                                language::proto::split_operations(operations)
 832                                            {
 833                                                client
 834                                                    .send(proto::UpdateChannelBuffer {
 835                                                        channel_id,
 836                                                        operations: chunk,
 837                                                    })
 838                                                    .ok();
 839                                            }
 840                                        })
 841                                        .detach();
 842                                    return true;
 843                                }
 844                            }
 845
 846                            channel_buffer.disconnect(cx);
 847                            false
 848                        })
 849                    }
 850                    OpenedModelHandle::Loading(_) => true,
 851                });
 852            });
 853            anyhow::Ok(())
 854        })
 855    }
 856
 857    fn handle_disconnect(&mut self, wait_for_reconnect: bool, cx: &mut ModelContext<Self>) {
 858        cx.notify();
 859
 860        self.disconnect_channel_buffers_task.get_or_insert_with(|| {
 861            cx.spawn_weak(|this, mut cx| async move {
 862                if wait_for_reconnect {
 863                    cx.background().timer(RECONNECT_TIMEOUT).await;
 864                }
 865
 866                if let Some(this) = this.upgrade(&cx) {
 867                    this.update(&mut cx, |this, cx| {
 868                        for (_, buffer) in this.opened_buffers.drain() {
 869                            if let OpenedModelHandle::Open(buffer) = buffer {
 870                                if let Some(buffer) = buffer.upgrade(cx) {
 871                                    buffer.update(cx, |buffer, cx| buffer.disconnect(cx));
 872                                }
 873                            }
 874                        }
 875                    });
 876                }
 877            })
 878        });
 879    }
 880
 881    pub(crate) fn update_channels(
 882        &mut self,
 883        payload: proto::UpdateChannels,
 884        cx: &mut ModelContext<ChannelStore>,
 885    ) -> Option<Task<Result<()>>> {
 886        if !payload.remove_channel_invitations.is_empty() {
 887            self.channel_invitations
 888                .retain(|channel| !payload.remove_channel_invitations.contains(&channel.id));
 889        }
 890        for channel in payload.channel_invitations {
 891            match self
 892                .channel_invitations
 893                .binary_search_by_key(&channel.id, |c| c.id)
 894            {
 895                Ok(ix) => Arc::make_mut(&mut self.channel_invitations[ix]).name = channel.name,
 896                Err(ix) => self.channel_invitations.insert(
 897                    ix,
 898                    Arc::new(Channel {
 899                        id: channel.id,
 900                        visibility: channel.visibility(),
 901                        role: channel.role(),
 902                        name: channel.name,
 903                        unseen_note_version: None,
 904                        unseen_message_id: None,
 905                        parent_path: channel.parent_path,
 906                    }),
 907                ),
 908            }
 909        }
 910
 911        let channels_changed = !payload.channels.is_empty()
 912            || !payload.delete_channels.is_empty()
 913            || !payload.unseen_channel_messages.is_empty()
 914            || !payload.unseen_channel_buffer_changes.is_empty();
 915
 916        if channels_changed {
 917            if !payload.delete_channels.is_empty() {
 918                self.channel_index.delete_channels(&payload.delete_channels);
 919                self.channel_participants
 920                    .retain(|channel_id, _| !&payload.delete_channels.contains(channel_id));
 921
 922                for channel_id in &payload.delete_channels {
 923                    let channel_id = *channel_id;
 924                    if payload
 925                        .channels
 926                        .iter()
 927                        .any(|channel| channel.id == channel_id)
 928                    {
 929                        continue;
 930                    }
 931                    if let Some(OpenedModelHandle::Open(buffer)) =
 932                        self.opened_buffers.remove(&channel_id)
 933                    {
 934                        if let Some(buffer) = buffer.upgrade(cx) {
 935                            buffer.update(cx, ChannelBuffer::disconnect);
 936                        }
 937                    }
 938                }
 939            }
 940
 941            let mut index = self.channel_index.bulk_insert();
 942            for channel in payload.channels {
 943                let id = channel.id;
 944                let channel_changed = index.insert(channel);
 945
 946                if channel_changed {
 947                    if let Some(OpenedModelHandle::Open(buffer)) = self.opened_buffers.get(&id) {
 948                        if let Some(buffer) = buffer.upgrade(cx) {
 949                            buffer.update(cx, ChannelBuffer::channel_changed);
 950                        }
 951                    }
 952                }
 953            }
 954
 955            for unseen_buffer_change in payload.unseen_channel_buffer_changes {
 956                let version = language::proto::deserialize_version(&unseen_buffer_change.version);
 957                index.note_changed(
 958                    unseen_buffer_change.channel_id,
 959                    unseen_buffer_change.epoch,
 960                    &version,
 961                );
 962            }
 963
 964            for unseen_channel_message in payload.unseen_channel_messages {
 965                index.new_messages(
 966                    unseen_channel_message.channel_id,
 967                    unseen_channel_message.message_id,
 968                );
 969            }
 970        }
 971
 972        cx.notify();
 973        if payload.channel_participants.is_empty() {
 974            return None;
 975        }
 976
 977        let mut all_user_ids = Vec::new();
 978        let channel_participants = payload.channel_participants;
 979        for entry in &channel_participants {
 980            for user_id in entry.participant_user_ids.iter() {
 981                if let Err(ix) = all_user_ids.binary_search(user_id) {
 982                    all_user_ids.insert(ix, *user_id);
 983                }
 984            }
 985        }
 986
 987        let users = self
 988            .user_store
 989            .update(cx, |user_store, cx| user_store.get_users(all_user_ids, cx));
 990        Some(cx.spawn(|this, mut cx| async move {
 991            let users = users.await?;
 992
 993            this.update(&mut cx, |this, cx| {
 994                for entry in &channel_participants {
 995                    let mut participants: Vec<_> = entry
 996                        .participant_user_ids
 997                        .iter()
 998                        .filter_map(|user_id| {
 999                            users
1000                                .binary_search_by_key(&user_id, |user| &user.id)
1001                                .ok()
1002                                .map(|ix| users[ix].clone())
1003                        })
1004                        .collect();
1005
1006                    participants.sort_by_key(|u| u.id);
1007
1008                    this.channel_participants
1009                        .insert(entry.channel_id, participants);
1010                }
1011
1012                cx.notify();
1013            });
1014            anyhow::Ok(())
1015        }))
1016    }
1017}