channel_store.rs

   1mod channel_index;
   2
   3use crate::{channel_buffer::ChannelBuffer, channel_chat::ChannelChat, ChannelMessage};
   4use anyhow::{anyhow, Result};
   5use channel_index::ChannelIndex;
   6use client::{Client, Subscription, User, UserId, UserStore};
   7use collections::{hash_map, HashMap, HashSet};
   8use db::RELEASE_CHANNEL;
   9use futures::{channel::mpsc, future::Shared, Future, FutureExt, StreamExt};
  10use gpui::{
  11    AppContext, AsyncAppContext, Context, EventEmitter, Model, ModelContext, Task, WeakModel,
  12};
  13use rpc::{
  14    proto::{self, ChannelVisibility},
  15    TypedEnvelope,
  16};
  17use std::{mem, sync::Arc, time::Duration};
  18use util::{async_maybe, ResultExt};
  19
  20pub fn init(client: &Arc<Client>, user_store: Model<UserStore>, cx: &mut AppContext) {
  21    let channel_store =
  22        cx.build_model(|cx| ChannelStore::new(client.clone(), user_store.clone(), cx));
  23    cx.set_global(channel_store);
  24}
  25
  26pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
  27
  28pub type ChannelId = u64;
  29
  30pub struct ChannelStore {
  31    pub channel_index: ChannelIndex,
  32    channel_invitations: Vec<Arc<Channel>>,
  33    channel_participants: HashMap<ChannelId, Vec<Arc<User>>>,
  34    outgoing_invites: HashSet<(ChannelId, UserId)>,
  35    update_channels_tx: mpsc::UnboundedSender<proto::UpdateChannels>,
  36    opened_buffers: HashMap<ChannelId, OpenedModelHandle<ChannelBuffer>>,
  37    opened_chats: HashMap<ChannelId, OpenedModelHandle<ChannelChat>>,
  38    client: Arc<Client>,
  39    user_store: Model<UserStore>,
  40    _rpc_subscription: Subscription,
  41    _watch_connection_status: Task<Option<()>>,
  42    disconnect_channel_buffers_task: Option<Task<()>>,
  43    _update_channels: Task<()>,
  44}
  45
  46#[derive(Clone, Debug, PartialEq)]
  47pub struct Channel {
  48    pub id: ChannelId,
  49    pub name: String,
  50    pub visibility: proto::ChannelVisibility,
  51    pub role: proto::ChannelRole,
  52    pub unseen_note_version: Option<(u64, clock::Global)>,
  53    pub unseen_message_id: Option<u64>,
  54    pub parent_path: Vec<u64>,
  55}
  56
  57impl Channel {
  58    pub fn link(&self) -> String {
  59        RELEASE_CHANNEL.link_prefix().to_owned()
  60            + "channel/"
  61            + &self.slug()
  62            + "-"
  63            + &self.id.to_string()
  64    }
  65
  66    pub fn slug(&self) -> String {
  67        let slug: String = self
  68            .name
  69            .chars()
  70            .map(|c| if c.is_alphanumeric() { c } else { '-' })
  71            .collect();
  72
  73        slug.trim_matches(|c| c == '-').to_string()
  74    }
  75
  76    pub fn can_edit_notes(&self) -> bool {
  77        self.role == proto::ChannelRole::Member || self.role == proto::ChannelRole::Admin
  78    }
  79}
  80
  81pub struct ChannelMembership {
  82    pub user: Arc<User>,
  83    pub kind: proto::channel_member::Kind,
  84    pub role: proto::ChannelRole,
  85}
  86impl ChannelMembership {
  87    pub fn sort_key(&self) -> MembershipSortKey {
  88        MembershipSortKey {
  89            role_order: match self.role {
  90                proto::ChannelRole::Admin => 0,
  91                proto::ChannelRole::Member => 1,
  92                proto::ChannelRole::Banned => 2,
  93                proto::ChannelRole::Guest => 3,
  94            },
  95            kind_order: match self.kind {
  96                proto::channel_member::Kind::Member => 0,
  97                proto::channel_member::Kind::AncestorMember => 1,
  98                proto::channel_member::Kind::Invitee => 2,
  99            },
 100            username_order: self.user.github_login.as_str(),
 101        }
 102    }
 103}
 104
 105#[derive(PartialOrd, Ord, PartialEq, Eq)]
 106pub struct MembershipSortKey<'a> {
 107    role_order: u8,
 108    kind_order: u8,
 109    username_order: &'a str,
 110}
 111
 112pub enum ChannelEvent {
 113    ChannelCreated(ChannelId),
 114    ChannelRenamed(ChannelId),
 115}
 116
 117impl EventEmitter for ChannelStore {
 118    type Event = ChannelEvent;
 119}
 120
 121enum OpenedModelHandle<E> {
 122    Open(WeakModel<E>),
 123    Loading(Shared<Task<Result<Model<E>, Arc<anyhow::Error>>>>),
 124}
 125
 126impl ChannelStore {
 127    pub fn global(cx: &AppContext) -> Model<Self> {
 128        cx.global::<Model<Self>>().clone()
 129    }
 130
 131    pub fn new(
 132        client: Arc<Client>,
 133        user_store: Model<UserStore>,
 134        cx: &mut ModelContext<Self>,
 135    ) -> Self {
 136        let rpc_subscription =
 137            client.add_message_handler(cx.weak_model(), Self::handle_update_channels);
 138
 139        let mut connection_status = client.status();
 140        let (update_channels_tx, mut update_channels_rx) = mpsc::unbounded();
 141        let watch_connection_status = cx.spawn(|this, mut cx| async move {
 142            while let Some(status) = connection_status.next().await {
 143                let this = this.upgrade()?;
 144                match status {
 145                    client::Status::Connected { .. } => {
 146                        this.update(&mut cx, |this, cx| this.handle_connect(cx))
 147                            .ok()?
 148                            .await
 149                            .log_err()?;
 150                    }
 151                    client::Status::SignedOut | client::Status::UpgradeRequired => {
 152                        this.update(&mut cx, |this, cx| this.handle_disconnect(false, cx))
 153                            .ok();
 154                    }
 155                    _ => {
 156                        this.update(&mut cx, |this, cx| this.handle_disconnect(true, cx))
 157                            .ok();
 158                    }
 159                }
 160            }
 161            Some(())
 162        });
 163
 164        Self {
 165            channel_invitations: Vec::default(),
 166            channel_index: ChannelIndex::default(),
 167            channel_participants: Default::default(),
 168            outgoing_invites: Default::default(),
 169            opened_buffers: Default::default(),
 170            opened_chats: Default::default(),
 171            update_channels_tx,
 172            client,
 173            user_store,
 174            _rpc_subscription: rpc_subscription,
 175            _watch_connection_status: watch_connection_status,
 176            disconnect_channel_buffers_task: None,
 177            _update_channels: cx.spawn(|this, mut cx| async move {
 178                async_maybe!({
 179                    while let Some(update_channels) = update_channels_rx.next().await {
 180                        if let Some(this) = this.upgrade() {
 181                            let update_task = this.update(&mut cx, |this, cx| {
 182                                this.update_channels(update_channels, cx)
 183                            })?;
 184                            if let Some(update_task) = update_task {
 185                                update_task.await.log_err();
 186                            }
 187                        }
 188                    }
 189                    anyhow::Ok(())
 190                })
 191                .await
 192                .log_err();
 193            }),
 194        }
 195    }
 196
 197    pub fn client(&self) -> Arc<Client> {
 198        self.client.clone()
 199    }
 200
 201    /// Returns the number of unique channels in the store
 202    pub fn channel_count(&self) -> usize {
 203        self.channel_index.by_id().len()
 204    }
 205
 206    /// Returns the index of a channel ID in the list of unique channels
 207    pub fn index_of_channel(&self, channel_id: ChannelId) -> Option<usize> {
 208        self.channel_index
 209            .by_id()
 210            .keys()
 211            .position(|id| *id == channel_id)
 212    }
 213
 214    /// Returns an iterator over all unique channels
 215    pub fn channels(&self) -> impl '_ + Iterator<Item = &Arc<Channel>> {
 216        self.channel_index.by_id().values()
 217    }
 218
 219    /// Iterate over all entries in the channel DAG
 220    pub fn ordered_channels(&self) -> impl '_ + Iterator<Item = (usize, &Arc<Channel>)> {
 221        self.channel_index
 222            .ordered_channels()
 223            .iter()
 224            .filter_map(move |id| {
 225                let channel = self.channel_index.by_id().get(id)?;
 226                Some((channel.parent_path.len(), channel))
 227            })
 228    }
 229
 230    pub fn channel_at_index(&self, ix: usize) -> Option<&Arc<Channel>> {
 231        let channel_id = self.channel_index.ordered_channels().get(ix)?;
 232        self.channel_index.by_id().get(channel_id)
 233    }
 234
 235    pub fn channel_at(&self, ix: usize) -> Option<&Arc<Channel>> {
 236        self.channel_index.by_id().values().nth(ix)
 237    }
 238
 239    pub fn has_channel_invitation(&self, channel_id: ChannelId) -> bool {
 240        self.channel_invitations
 241            .iter()
 242            .any(|channel| channel.id == channel_id)
 243    }
 244
 245    pub fn channel_invitations(&self) -> &[Arc<Channel>] {
 246        &self.channel_invitations
 247    }
 248
 249    pub fn channel_for_id(&self, channel_id: ChannelId) -> Option<&Arc<Channel>> {
 250        self.channel_index.by_id().get(&channel_id)
 251    }
 252
 253    pub fn has_open_channel_buffer(&self, channel_id: ChannelId, _cx: &AppContext) -> bool {
 254        if let Some(buffer) = self.opened_buffers.get(&channel_id) {
 255            if let OpenedModelHandle::Open(buffer) = buffer {
 256                return buffer.upgrade().is_some();
 257            }
 258        }
 259        false
 260    }
 261
 262    pub fn open_channel_buffer(
 263        &mut self,
 264        channel_id: ChannelId,
 265        cx: &mut ModelContext<Self>,
 266    ) -> Task<Result<Model<ChannelBuffer>>> {
 267        let client = self.client.clone();
 268        let user_store = self.user_store.clone();
 269        let channel_store = cx.handle();
 270        self.open_channel_resource(
 271            channel_id,
 272            |this| &mut this.opened_buffers,
 273            |channel, cx| ChannelBuffer::new(channel, client, user_store, channel_store, cx),
 274            cx,
 275        )
 276    }
 277
 278    pub fn fetch_channel_messages(
 279        &self,
 280        message_ids: Vec<u64>,
 281        cx: &mut ModelContext<Self>,
 282    ) -> Task<Result<Vec<ChannelMessage>>> {
 283        let request = if message_ids.is_empty() {
 284            None
 285        } else {
 286            Some(
 287                self.client
 288                    .request(proto::GetChannelMessagesById { message_ids }),
 289            )
 290        };
 291        cx.spawn(|this, mut cx| async move {
 292            if let Some(request) = request {
 293                let response = request.await?;
 294                let this = this
 295                    .upgrade()
 296                    .ok_or_else(|| anyhow!("channel store dropped"))?;
 297                let user_store = this.update(&mut cx, |this, _| this.user_store.clone())?;
 298                ChannelMessage::from_proto_vec(response.messages, &user_store, &mut cx).await
 299            } else {
 300                Ok(Vec::new())
 301            }
 302        })
 303    }
 304
 305    pub fn has_channel_buffer_changed(&self, channel_id: ChannelId) -> Option<bool> {
 306        self.channel_index
 307            .by_id()
 308            .get(&channel_id)
 309            .map(|channel| channel.unseen_note_version.is_some())
 310    }
 311
 312    pub fn has_new_messages(&self, channel_id: ChannelId) -> Option<bool> {
 313        self.channel_index
 314            .by_id()
 315            .get(&channel_id)
 316            .map(|channel| channel.unseen_message_id.is_some())
 317    }
 318
 319    pub fn notes_changed(
 320        &mut self,
 321        channel_id: ChannelId,
 322        epoch: u64,
 323        version: &clock::Global,
 324        cx: &mut ModelContext<Self>,
 325    ) {
 326        self.channel_index.note_changed(channel_id, epoch, version);
 327        cx.notify();
 328    }
 329
 330    pub fn new_message(
 331        &mut self,
 332        channel_id: ChannelId,
 333        message_id: u64,
 334        cx: &mut ModelContext<Self>,
 335    ) {
 336        self.channel_index.new_message(channel_id, message_id);
 337        cx.notify();
 338    }
 339
 340    pub fn acknowledge_message_id(
 341        &mut self,
 342        channel_id: ChannelId,
 343        message_id: u64,
 344        cx: &mut ModelContext<Self>,
 345    ) {
 346        self.channel_index
 347            .acknowledge_message_id(channel_id, message_id);
 348        cx.notify();
 349    }
 350
 351    pub fn acknowledge_notes_version(
 352        &mut self,
 353        channel_id: ChannelId,
 354        epoch: u64,
 355        version: &clock::Global,
 356        cx: &mut ModelContext<Self>,
 357    ) {
 358        self.channel_index
 359            .acknowledge_note_version(channel_id, epoch, version);
 360        cx.notify();
 361    }
 362
 363    pub fn open_channel_chat(
 364        &mut self,
 365        channel_id: ChannelId,
 366        cx: &mut ModelContext<Self>,
 367    ) -> Task<Result<Model<ChannelChat>>> {
 368        let client = self.client.clone();
 369        let user_store = self.user_store.clone();
 370        let this = cx.handle();
 371        self.open_channel_resource(
 372            channel_id,
 373            |this| &mut this.opened_chats,
 374            |channel, cx| ChannelChat::new(channel, this, user_store, client, cx),
 375            cx,
 376        )
 377    }
 378
 379    /// Asynchronously open a given resource associated with a channel.
 380    ///
 381    /// Make sure that the resource is only opened once, even if this method
 382    /// is called multiple times with the same channel id while the first task
 383    /// is still running.
 384    fn open_channel_resource<T, F, Fut>(
 385        &mut self,
 386        channel_id: ChannelId,
 387        get_map: fn(&mut Self) -> &mut HashMap<ChannelId, OpenedModelHandle<T>>,
 388        load: F,
 389        cx: &mut ModelContext<Self>,
 390    ) -> Task<Result<Model<T>>>
 391    where
 392        F: 'static + FnOnce(Arc<Channel>, AsyncAppContext) -> Fut,
 393        Fut: Future<Output = Result<Model<T>>>,
 394        T: 'static,
 395    {
 396        let task = loop {
 397            match get_map(self).entry(channel_id) {
 398                hash_map::Entry::Occupied(e) => match e.get() {
 399                    OpenedModelHandle::Open(model) => {
 400                        if let Some(model) = model.upgrade() {
 401                            break Task::ready(Ok(model)).shared();
 402                        } else {
 403                            get_map(self).remove(&channel_id);
 404                            continue;
 405                        }
 406                    }
 407                    OpenedModelHandle::Loading(task) => {
 408                        break task.clone();
 409                    }
 410                },
 411                hash_map::Entry::Vacant(e) => {
 412                    let task = cx
 413                        .spawn(move |this, mut cx| async move {
 414                            let channel = this.update(&mut cx, |this, _| {
 415                                this.channel_for_id(channel_id).cloned().ok_or_else(|| {
 416                                    Arc::new(anyhow!("no channel for id: {}", channel_id))
 417                                })
 418                            })??;
 419
 420                            load(channel, cx).await.map_err(Arc::new)
 421                        })
 422                        .shared();
 423
 424                    e.insert(OpenedModelHandle::Loading(task.clone()));
 425                    cx.spawn({
 426                        let task = task.clone();
 427                        move |this, mut cx| async move {
 428                            let result = task.await;
 429                            this.update(&mut cx, |this, _| match result {
 430                                Ok(model) => {
 431                                    get_map(this).insert(
 432                                        channel_id,
 433                                        OpenedModelHandle::Open(model.downgrade()),
 434                                    );
 435                                }
 436                                Err(_) => {
 437                                    get_map(this).remove(&channel_id);
 438                                }
 439                            })
 440                            .ok();
 441                        }
 442                    })
 443                    .detach();
 444                    break task;
 445                }
 446            }
 447        };
 448        cx.background_executor()
 449            .spawn(async move { task.await.map_err(|error| anyhow!("{}", error)) })
 450    }
 451
 452    pub fn is_channel_admin(&self, channel_id: ChannelId) -> bool {
 453        let Some(channel) = self.channel_for_id(channel_id) else {
 454            return false;
 455        };
 456        channel.role == proto::ChannelRole::Admin
 457    }
 458
 459    pub fn channel_participants(&self, channel_id: ChannelId) -> &[Arc<User>] {
 460        self.channel_participants
 461            .get(&channel_id)
 462            .map_or(&[], |v| v.as_slice())
 463    }
 464
 465    pub fn create_channel(
 466        &self,
 467        name: &str,
 468        parent_id: Option<ChannelId>,
 469        cx: &mut ModelContext<Self>,
 470    ) -> Task<Result<ChannelId>> {
 471        let client = self.client.clone();
 472        let name = name.trim_start_matches("#").to_owned();
 473        cx.spawn(move |this, mut cx| async move {
 474            let response = client
 475                .request(proto::CreateChannel { name, parent_id })
 476                .await?;
 477
 478            let channel = response
 479                .channel
 480                .ok_or_else(|| anyhow!("missing channel in response"))?;
 481            let channel_id = channel.id;
 482
 483            this.update(&mut cx, |this, cx| {
 484                let task = this.update_channels(
 485                    proto::UpdateChannels {
 486                        channels: vec![channel],
 487                        ..Default::default()
 488                    },
 489                    cx,
 490                );
 491                assert!(task.is_none());
 492
 493                // This event is emitted because the collab panel wants to clear the pending edit state
 494                // before this frame is rendered. But we can't guarantee that the collab panel's future
 495                // will resolve before this flush_effects finishes. Synchronously emitting this event
 496                // ensures that the collab panel will observe this creation before the frame completes
 497                cx.emit(ChannelEvent::ChannelCreated(channel_id));
 498            })?;
 499
 500            Ok(channel_id)
 501        })
 502    }
 503
 504    pub fn move_channel(
 505        &mut self,
 506        channel_id: ChannelId,
 507        to: Option<ChannelId>,
 508        cx: &mut ModelContext<Self>,
 509    ) -> Task<Result<()>> {
 510        let client = self.client.clone();
 511        cx.spawn(move |_, _| async move {
 512            let _ = client
 513                .request(proto::MoveChannel { channel_id, to })
 514                .await?;
 515
 516            Ok(())
 517        })
 518    }
 519
 520    pub fn set_channel_visibility(
 521        &mut self,
 522        channel_id: ChannelId,
 523        visibility: ChannelVisibility,
 524        cx: &mut ModelContext<Self>,
 525    ) -> Task<Result<()>> {
 526        let client = self.client.clone();
 527        cx.spawn(move |_, _| async move {
 528            let _ = client
 529                .request(proto::SetChannelVisibility {
 530                    channel_id,
 531                    visibility: visibility.into(),
 532                })
 533                .await?;
 534
 535            Ok(())
 536        })
 537    }
 538
 539    pub fn invite_member(
 540        &mut self,
 541        channel_id: ChannelId,
 542        user_id: UserId,
 543        role: proto::ChannelRole,
 544        cx: &mut ModelContext<Self>,
 545    ) -> Task<Result<()>> {
 546        if !self.outgoing_invites.insert((channel_id, user_id)) {
 547            return Task::ready(Err(anyhow!("invite request already in progress")));
 548        }
 549
 550        cx.notify();
 551        let client = self.client.clone();
 552        cx.spawn(move |this, mut cx| async move {
 553            let result = client
 554                .request(proto::InviteChannelMember {
 555                    channel_id,
 556                    user_id,
 557                    role: role.into(),
 558                })
 559                .await;
 560
 561            this.update(&mut cx, |this, cx| {
 562                this.outgoing_invites.remove(&(channel_id, user_id));
 563                cx.notify();
 564            })?;
 565
 566            result?;
 567
 568            Ok(())
 569        })
 570    }
 571
 572    pub fn remove_member(
 573        &mut self,
 574        channel_id: ChannelId,
 575        user_id: u64,
 576        cx: &mut ModelContext<Self>,
 577    ) -> Task<Result<()>> {
 578        if !self.outgoing_invites.insert((channel_id, user_id)) {
 579            return Task::ready(Err(anyhow!("invite request already in progress")));
 580        }
 581
 582        cx.notify();
 583        let client = self.client.clone();
 584        cx.spawn(move |this, mut cx| async move {
 585            let result = client
 586                .request(proto::RemoveChannelMember {
 587                    channel_id,
 588                    user_id,
 589                })
 590                .await;
 591
 592            this.update(&mut cx, |this, cx| {
 593                this.outgoing_invites.remove(&(channel_id, user_id));
 594                cx.notify();
 595            })?;
 596            result?;
 597            Ok(())
 598        })
 599    }
 600
 601    pub fn set_member_role(
 602        &mut self,
 603        channel_id: ChannelId,
 604        user_id: UserId,
 605        role: proto::ChannelRole,
 606        cx: &mut ModelContext<Self>,
 607    ) -> Task<Result<()>> {
 608        if !self.outgoing_invites.insert((channel_id, user_id)) {
 609            return Task::ready(Err(anyhow!("member request already in progress")));
 610        }
 611
 612        cx.notify();
 613        let client = self.client.clone();
 614        cx.spawn(move |this, mut cx| async move {
 615            let result = client
 616                .request(proto::SetChannelMemberRole {
 617                    channel_id,
 618                    user_id,
 619                    role: role.into(),
 620                })
 621                .await;
 622
 623            this.update(&mut cx, |this, cx| {
 624                this.outgoing_invites.remove(&(channel_id, user_id));
 625                cx.notify();
 626            })?;
 627
 628            result?;
 629            Ok(())
 630        })
 631    }
 632
 633    pub fn rename(
 634        &mut self,
 635        channel_id: ChannelId,
 636        new_name: &str,
 637        cx: &mut ModelContext<Self>,
 638    ) -> Task<Result<()>> {
 639        let client = self.client.clone();
 640        let name = new_name.to_string();
 641        cx.spawn(move |this, mut cx| async move {
 642            let channel = client
 643                .request(proto::RenameChannel { channel_id, name })
 644                .await?
 645                .channel
 646                .ok_or_else(|| anyhow!("missing channel in response"))?;
 647            this.update(&mut cx, |this, cx| {
 648                let task = this.update_channels(
 649                    proto::UpdateChannels {
 650                        channels: vec![channel],
 651                        ..Default::default()
 652                    },
 653                    cx,
 654                );
 655                assert!(task.is_none());
 656
 657                // This event is emitted because the collab panel wants to clear the pending edit state
 658                // before this frame is rendered. But we can't guarantee that the collab panel's future
 659                // will resolve before this flush_effects finishes. Synchronously emitting this event
 660                // ensures that the collab panel will observe this creation before the frame complete
 661                cx.emit(ChannelEvent::ChannelRenamed(channel_id))
 662            })?;
 663            Ok(())
 664        })
 665    }
 666
 667    pub fn respond_to_channel_invite(
 668        &mut self,
 669        channel_id: ChannelId,
 670        accept: bool,
 671        cx: &mut ModelContext<Self>,
 672    ) -> Task<Result<()>> {
 673        let client = self.client.clone();
 674        cx.background_executor().spawn(async move {
 675            client
 676                .request(proto::RespondToChannelInvite { channel_id, accept })
 677                .await?;
 678            Ok(())
 679        })
 680    }
 681
 682    pub fn get_channel_member_details(
 683        &self,
 684        channel_id: ChannelId,
 685        cx: &mut ModelContext<Self>,
 686    ) -> Task<Result<Vec<ChannelMembership>>> {
 687        let client = self.client.clone();
 688        let user_store = self.user_store.downgrade();
 689        cx.spawn(move |_, mut cx| async move {
 690            let response = client
 691                .request(proto::GetChannelMembers { channel_id })
 692                .await?;
 693
 694            let user_ids = response.members.iter().map(|m| m.user_id).collect();
 695            let user_store = user_store
 696                .upgrade()
 697                .ok_or_else(|| anyhow!("user store dropped"))?;
 698            let users = user_store
 699                .update(&mut cx, |user_store, cx| user_store.get_users(user_ids, cx))?
 700                .await?;
 701
 702            Ok(users
 703                .into_iter()
 704                .zip(response.members)
 705                .filter_map(|(user, member)| {
 706                    Some(ChannelMembership {
 707                        user,
 708                        role: member.role(),
 709                        kind: member.kind(),
 710                    })
 711                })
 712                .collect())
 713        })
 714    }
 715
 716    pub fn remove_channel(&self, channel_id: ChannelId) -> impl Future<Output = Result<()>> {
 717        let client = self.client.clone();
 718        async move {
 719            client.request(proto::DeleteChannel { channel_id }).await?;
 720            Ok(())
 721        }
 722    }
 723
 724    pub fn has_pending_channel_invite_response(&self, _: &Arc<Channel>) -> bool {
 725        false
 726    }
 727
 728    pub fn has_pending_channel_invite(&self, channel_id: ChannelId, user_id: UserId) -> bool {
 729        self.outgoing_invites.contains(&(channel_id, user_id))
 730    }
 731
 732    async fn handle_update_channels(
 733        this: Model<Self>,
 734        message: TypedEnvelope<proto::UpdateChannels>,
 735        _: Arc<Client>,
 736        mut cx: AsyncAppContext,
 737    ) -> Result<()> {
 738        this.update(&mut cx, |this, _| {
 739            this.update_channels_tx
 740                .unbounded_send(message.payload)
 741                .unwrap();
 742        })?;
 743        Ok(())
 744    }
 745
 746    fn handle_connect(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
 747        self.channel_index.clear();
 748        self.channel_invitations.clear();
 749        self.channel_participants.clear();
 750        self.channel_index.clear();
 751        self.outgoing_invites.clear();
 752        self.disconnect_channel_buffers_task.take();
 753
 754        for chat in self.opened_chats.values() {
 755            if let OpenedModelHandle::Open(chat) = chat {
 756                if let Some(chat) = chat.upgrade() {
 757                    chat.update(cx, |chat, cx| {
 758                        chat.rejoin(cx);
 759                    });
 760                }
 761            }
 762        }
 763
 764        let mut buffer_versions = Vec::new();
 765        for buffer in self.opened_buffers.values() {
 766            if let OpenedModelHandle::Open(buffer) = buffer {
 767                if let Some(buffer) = buffer.upgrade() {
 768                    let channel_buffer = buffer.read(cx);
 769                    let buffer = channel_buffer.buffer().read(cx);
 770                    buffer_versions.push(proto::ChannelBufferVersion {
 771                        channel_id: channel_buffer.channel_id,
 772                        epoch: channel_buffer.epoch(),
 773                        version: language::proto::serialize_version(&buffer.version()),
 774                    });
 775                }
 776            }
 777        }
 778
 779        if buffer_versions.is_empty() {
 780            return Task::ready(Ok(()));
 781        }
 782
 783        let response = self.client.request(proto::RejoinChannelBuffers {
 784            buffers: buffer_versions,
 785        });
 786
 787        cx.spawn(|this, mut cx| async move {
 788            let mut response = response.await?;
 789
 790            this.update(&mut cx, |this, cx| {
 791                this.opened_buffers.retain(|_, buffer| match buffer {
 792                    OpenedModelHandle::Open(channel_buffer) => {
 793                        let Some(channel_buffer) = channel_buffer.upgrade() else {
 794                            return false;
 795                        };
 796
 797                        channel_buffer.update(cx, |channel_buffer, cx| {
 798                            let channel_id = channel_buffer.channel_id;
 799                            if let Some(remote_buffer) = response
 800                                .buffers
 801                                .iter_mut()
 802                                .find(|buffer| buffer.channel_id == channel_id)
 803                            {
 804                                let channel_id = channel_buffer.channel_id;
 805                                let remote_version =
 806                                    language::proto::deserialize_version(&remote_buffer.version);
 807
 808                                channel_buffer.replace_collaborators(
 809                                    mem::take(&mut remote_buffer.collaborators),
 810                                    cx,
 811                                );
 812
 813                                let operations = channel_buffer
 814                                    .buffer()
 815                                    .update(cx, |buffer, cx| {
 816                                        let outgoing_operations =
 817                                            buffer.serialize_ops(Some(remote_version), cx);
 818                                        let incoming_operations =
 819                                            mem::take(&mut remote_buffer.operations)
 820                                                .into_iter()
 821                                                .map(language::proto::deserialize_operation)
 822                                                .collect::<Result<Vec<_>>>()?;
 823                                        buffer.apply_ops(incoming_operations, cx)?;
 824                                        anyhow::Ok(outgoing_operations)
 825                                    })
 826                                    .log_err();
 827
 828                                if let Some(operations) = operations {
 829                                    let client = this.client.clone();
 830                                    cx.background_executor()
 831                                        .spawn(async move {
 832                                            let operations = operations.await;
 833                                            for chunk in
 834                                                language::proto::split_operations(operations)
 835                                            {
 836                                                client
 837                                                    .send(proto::UpdateChannelBuffer {
 838                                                        channel_id,
 839                                                        operations: chunk,
 840                                                    })
 841                                                    .ok();
 842                                            }
 843                                        })
 844                                        .detach();
 845                                    return true;
 846                                }
 847                            }
 848
 849                            channel_buffer.disconnect(cx);
 850                            false
 851                        })
 852                    }
 853                    OpenedModelHandle::Loading(_) => true,
 854                });
 855            })
 856            .ok();
 857            anyhow::Ok(())
 858        })
 859    }
 860
 861    fn handle_disconnect(&mut self, wait_for_reconnect: bool, cx: &mut ModelContext<Self>) {
 862        cx.notify();
 863
 864        self.disconnect_channel_buffers_task.get_or_insert_with(|| {
 865            cx.spawn(move |this, mut cx| async move {
 866                if wait_for_reconnect {
 867                    cx.background_executor().timer(RECONNECT_TIMEOUT).await;
 868                }
 869
 870                if let Some(this) = this.upgrade() {
 871                    this.update(&mut cx, |this, cx| {
 872                        for (_, buffer) in this.opened_buffers.drain() {
 873                            if let OpenedModelHandle::Open(buffer) = buffer {
 874                                if let Some(buffer) = buffer.upgrade() {
 875                                    buffer.update(cx, |buffer, cx| buffer.disconnect(cx));
 876                                }
 877                            }
 878                        }
 879                    })
 880                    .ok();
 881                }
 882            })
 883        });
 884    }
 885
 886    pub(crate) fn update_channels(
 887        &mut self,
 888        payload: proto::UpdateChannels,
 889        cx: &mut ModelContext<ChannelStore>,
 890    ) -> Option<Task<Result<()>>> {
 891        if !payload.remove_channel_invitations.is_empty() {
 892            self.channel_invitations
 893                .retain(|channel| !payload.remove_channel_invitations.contains(&channel.id));
 894        }
 895        for channel in payload.channel_invitations {
 896            match self
 897                .channel_invitations
 898                .binary_search_by_key(&channel.id, |c| c.id)
 899            {
 900                Ok(ix) => Arc::make_mut(&mut self.channel_invitations[ix]).name = channel.name,
 901                Err(ix) => self.channel_invitations.insert(
 902                    ix,
 903                    Arc::new(Channel {
 904                        id: channel.id,
 905                        visibility: channel.visibility(),
 906                        role: channel.role(),
 907                        name: channel.name,
 908                        unseen_note_version: None,
 909                        unseen_message_id: None,
 910                        parent_path: channel.parent_path,
 911                    }),
 912                ),
 913            }
 914        }
 915
 916        let channels_changed = !payload.channels.is_empty()
 917            || !payload.delete_channels.is_empty()
 918            || !payload.unseen_channel_messages.is_empty()
 919            || !payload.unseen_channel_buffer_changes.is_empty();
 920
 921        if channels_changed {
 922            if !payload.delete_channels.is_empty() {
 923                self.channel_index.delete_channels(&payload.delete_channels);
 924                self.channel_participants
 925                    .retain(|channel_id, _| !&payload.delete_channels.contains(channel_id));
 926
 927                for channel_id in &payload.delete_channels {
 928                    let channel_id = *channel_id;
 929                    if payload
 930                        .channels
 931                        .iter()
 932                        .any(|channel| channel.id == channel_id)
 933                    {
 934                        continue;
 935                    }
 936                    if let Some(OpenedModelHandle::Open(buffer)) =
 937                        self.opened_buffers.remove(&channel_id)
 938                    {
 939                        if let Some(buffer) = buffer.upgrade() {
 940                            buffer.update(cx, ChannelBuffer::disconnect);
 941                        }
 942                    }
 943                }
 944            }
 945
 946            let mut index = self.channel_index.bulk_insert();
 947            for channel in payload.channels {
 948                let id = channel.id;
 949                let channel_changed = index.insert(channel);
 950
 951                if channel_changed {
 952                    if let Some(OpenedModelHandle::Open(buffer)) = self.opened_buffers.get(&id) {
 953                        if let Some(buffer) = buffer.upgrade() {
 954                            buffer.update(cx, ChannelBuffer::channel_changed);
 955                        }
 956                    }
 957                }
 958            }
 959
 960            for unseen_buffer_change in payload.unseen_channel_buffer_changes {
 961                let version = language::proto::deserialize_version(&unseen_buffer_change.version);
 962                index.note_changed(
 963                    unseen_buffer_change.channel_id,
 964                    unseen_buffer_change.epoch,
 965                    &version,
 966                );
 967            }
 968
 969            for unseen_channel_message in payload.unseen_channel_messages {
 970                index.new_messages(
 971                    unseen_channel_message.channel_id,
 972                    unseen_channel_message.message_id,
 973                );
 974            }
 975        }
 976
 977        cx.notify();
 978        if payload.channel_participants.is_empty() {
 979            return None;
 980        }
 981
 982        let mut all_user_ids = Vec::new();
 983        let channel_participants = payload.channel_participants;
 984        for entry in &channel_participants {
 985            for user_id in entry.participant_user_ids.iter() {
 986                if let Err(ix) = all_user_ids.binary_search(user_id) {
 987                    all_user_ids.insert(ix, *user_id);
 988                }
 989            }
 990        }
 991
 992        let users = self
 993            .user_store
 994            .update(cx, |user_store, cx| user_store.get_users(all_user_ids, cx));
 995        Some(cx.spawn(|this, mut cx| async move {
 996            let users = users.await?;
 997
 998            this.update(&mut cx, |this, cx| {
 999                for entry in &channel_participants {
1000                    let mut participants: Vec<_> = entry
1001                        .participant_user_ids
1002                        .iter()
1003                        .filter_map(|user_id| {
1004                            users
1005                                .binary_search_by_key(&user_id, |user| &user.id)
1006                                .ok()
1007                                .map(|ix| users[ix].clone())
1008                        })
1009                        .collect();
1010
1011                    participants.sort_by_key(|u| u.id);
1012
1013                    this.channel_participants
1014                        .insert(entry.channel_id, participants);
1015                }
1016
1017                cx.notify();
1018            })
1019        }))
1020    }
1021}