channel_store.rs

   1mod channel_index;
   2
   3use crate::{channel_buffer::ChannelBuffer, channel_chat::ChannelChat};
   4use anyhow::{anyhow, Result};
   5use channel_index::ChannelIndex;
   6use client::{Client, Subscription, User, UserId, UserStore};
   7use collections::{hash_map, HashMap, HashSet};
   8use db::RELEASE_CHANNEL;
   9use futures::{channel::mpsc, future::Shared, Future, FutureExt, StreamExt};
  10use gpui::{AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Task, WeakModelHandle};
  11use rpc::{
  12    proto::{self, ChannelEdge, ChannelPermission, ChannelRole},
  13    TypedEnvelope,
  14};
  15use serde_derive::{Deserialize, Serialize};
  16use std::{borrow::Cow, hash::Hash, mem, ops::Deref, sync::Arc, time::Duration};
  17use util::ResultExt;
  18
/// Creates the application's `ChannelStore` model and registers its handle as a global.
///
/// The store is built from a clone of `client` and of `user_store`; the
/// resulting `ModelHandle<ChannelStore>` is what `ChannelStore::global` later
/// reads back out of the context.
pub fn init(client: &Arc<Client>, user_store: ModelHandle<UserStore>, cx: &mut AppContext) {
    let channel_store =
        cx.add_model(|cx| ChannelStore::new(client.clone(), user_store.clone(), cx));
    cx.set_global(channel_store);
}
  24
/// How long `ChannelStore::handle_disconnect` waits after a disconnect before
/// it disconnects the channel buffers that are still open.
pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);

/// Numeric identifier of a channel.
pub type ChannelId = u64;
  28
/// Client-side model holding the channels known to the current user, together
/// with the channel buffers and chats that are currently opened.
pub struct ChannelStore {
    /// Index of the known channels; offers lookup by id and iteration over channel paths.
    channel_index: ChannelIndex,
    /// Channels the user has been invited to (exposed by `channel_invitations`).
    channel_invitations: Vec<Arc<Channel>>,
    /// Users listed as participants, keyed by channel id.
    channel_participants: HashMap<ChannelId, Vec<Arc<User>>>,
    /// Channels on which the user was granted admin privileges; consulted by `is_user_admin`.
    channels_with_admin_privileges: HashSet<ChannelId>,
    /// `(channel, user)` pairs that currently have a member request in flight
    /// (invite, removal or role change).
    outgoing_invites: HashSet<(ChannelId, UserId)>,
    /// Sender used by `handle_update_channels` to queue incoming payloads for `_update_channels`.
    update_channels_tx: mpsc::UnboundedSender<proto::UpdateChannels>,
    /// Channel buffers that are open or still loading, keyed by channel id.
    opened_buffers: HashMap<ChannelId, OpenedModelHandle<ChannelBuffer>>,
    /// Channel chats that are open or still loading, keyed by channel id.
    opened_chats: HashMap<ChannelId, OpenedModelHandle<ChannelChat>>,
    /// RPC client used for every request issued by the store.
    client: Arc<Client>,
    /// Store used to resolve user ids into `User` values.
    user_store: ModelHandle<UserStore>,
    /// Subscription returned when registering `handle_update_channels` on the client.
    _rpc_subscription: Subscription,
    /// Task that watches the client's connection status and calls
    /// `handle_connect` / `handle_disconnect`.
    _watch_connection_status: Task<Option<()>>,
    /// Task created on disconnect that disconnects the opened buffers after
    /// `RECONNECT_TIMEOUT`; it is taken out (and dropped) by `handle_connect`.
    disconnect_channel_buffers_task: Option<Task<()>>,
    /// Task that applies the payloads received through `update_channels_tx`, one at a time.
    _update_channels: Task<()>,
}
  45
/// A channel paired with a path of channel ids.
pub type ChannelData = (Channel, ChannelPath);
  47
/// A single channel as known to the client.
#[derive(Clone, Debug, PartialEq)]
pub struct Channel {
    /// Identifier of the channel.
    pub id: ChannelId,
    /// Display name of the channel.
    pub name: String,
    /// Set while the channel notes have a version the user has not seen yet
    /// (see `ChannelStore::has_channel_buffer_changed`).
    /// NOTE(review): the `u64` is presumably the buffer epoch — confirm against `ChannelIndex`.
    pub unseen_note_version: Option<(u64, clock::Global)>,
    /// Set while the channel has a message the user has not seen yet
    /// (see `ChannelStore::has_new_messages`).
    pub unseen_message_id: Option<u64>,
}
  55
  56impl Channel {
  57    pub fn link(&self) -> String {
  58        RELEASE_CHANNEL.link_prefix().to_owned()
  59            + "channel/"
  60            + &self.slug()
  61            + "-"
  62            + &self.id.to_string()
  63    }
  64
  65    pub fn slug(&self) -> String {
  66        let slug: String = self
  67            .name
  68            .chars()
  69            .map(|c| if c.is_alphanumeric() { c } else { '-' })
  70            .collect();
  71
  72        slug.trim_matches(|c| c == '-').to_string()
  73    }
  74}
  75
/// An ordered, cheaply clonable sequence of channel ids.
///
/// `ChannelStore::channel_dag_entries` treats the last id as the channel the
/// path leads to and the number of preceding ids as its depth.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Serialize, Deserialize)]
pub struct ChannelPath(Arc<[ChannelId]>);
  78
/// One member of a channel, as returned by `ChannelStore::get_channel_member_details`.
pub struct ChannelMembership {
    /// The member's user record.
    pub user: Arc<User>,
    /// Membership kind reported by the server for this member.
    pub kind: proto::channel_member::Kind,
    /// Role reported by the server for this member.
    pub role: proto::ChannelRole,
}
  84
/// Events emitted by `ChannelStore`.
pub enum ChannelEvent {
    /// Emitted by `create_channel` once the new channel has been applied to the store.
    ChannelCreated(ChannelId),
    /// Emitted by `rename` once the renamed channel has been applied to the store.
    ChannelRenamed(ChannelId),
}
  89
/// Makes `ChannelStore` usable as a model that emits `ChannelEvent`s.
impl Entity for ChannelStore {
    type Event = ChannelEvent;
}
  93
/// State of a per-channel resource (buffer or chat) tracked by `ChannelStore`.
enum OpenedModelHandle<E: Entity> {
    /// The resource finished loading; only a weak handle is kept, so the store
    /// does not keep the model alive by itself.
    Open(WeakModelHandle<E>),
    /// The resource is still loading; the shared task lets several callers
    /// await the same load.
    Loading(Shared<Task<Result<ModelHandle<E>, Arc<anyhow::Error>>>>),
}
  98
  99impl ChannelStore {
 100    pub fn global(cx: &AppContext) -> ModelHandle<Self> {
 101        cx.global::<ModelHandle<Self>>().clone()
 102    }
 103
 104    pub fn new(
 105        client: Arc<Client>,
 106        user_store: ModelHandle<UserStore>,
 107        cx: &mut ModelContext<Self>,
 108    ) -> Self {
 109        let rpc_subscription =
 110            client.add_message_handler(cx.handle(), Self::handle_update_channels);
 111
 112        let mut connection_status = client.status();
 113        let (update_channels_tx, mut update_channels_rx) = mpsc::unbounded();
 114        let watch_connection_status = cx.spawn_weak(|this, mut cx| async move {
 115            while let Some(status) = connection_status.next().await {
 116                let this = this.upgrade(&cx)?;
 117                if status.is_connected() {
 118                    this.update(&mut cx, |this, cx| this.handle_connect(cx))
 119                        .await
 120                        .log_err()?;
 121                } else {
 122                    this.update(&mut cx, |this, cx| this.handle_disconnect(cx));
 123                }
 124            }
 125            Some(())
 126        });
 127
 128        Self {
 129            channel_invitations: Vec::default(),
 130            channel_index: ChannelIndex::default(),
 131            channel_participants: Default::default(),
 132            channels_with_admin_privileges: Default::default(),
 133            outgoing_invites: Default::default(),
 134            opened_buffers: Default::default(),
 135            opened_chats: Default::default(),
 136            update_channels_tx,
 137            client,
 138            user_store,
 139            _rpc_subscription: rpc_subscription,
 140            _watch_connection_status: watch_connection_status,
 141            disconnect_channel_buffers_task: None,
 142            _update_channels: cx.spawn_weak(|this, mut cx| async move {
 143                while let Some(update_channels) = update_channels_rx.next().await {
 144                    if let Some(this) = this.upgrade(&cx) {
 145                        let update_task = this.update(&mut cx, |this, cx| {
 146                            this.update_channels(update_channels, cx)
 147                        });
 148                        if let Some(update_task) = update_task {
 149                            update_task.await.log_err();
 150                        }
 151                    }
 152                }
 153            }),
 154        }
 155    }
 156
 157    pub fn client(&self) -> Arc<Client> {
 158        self.client.clone()
 159    }
 160
 161    pub fn has_children(&self, channel_id: ChannelId) -> bool {
 162        self.channel_index.iter().any(|path| {
 163            if let Some(ix) = path.iter().position(|id| *id == channel_id) {
 164                path.len() > ix + 1
 165            } else {
 166                false
 167            }
 168        })
 169    }
 170
 171    /// Returns the number of unique channels in the store
 172    pub fn channel_count(&self) -> usize {
 173        self.channel_index.by_id().len()
 174    }
 175
 176    /// Returns the index of a channel ID in the list of unique channels
 177    pub fn index_of_channel(&self, channel_id: ChannelId) -> Option<usize> {
 178        self.channel_index
 179            .by_id()
 180            .keys()
 181            .position(|id| *id == channel_id)
 182    }
 183
 184    /// Returns an iterator over all unique channels
 185    pub fn channels(&self) -> impl '_ + Iterator<Item = &Arc<Channel>> {
 186        self.channel_index.by_id().values()
 187    }
 188
 189    /// Iterate over all entries in the channel DAG
 190    pub fn channel_dag_entries(&self) -> impl '_ + Iterator<Item = (usize, &Arc<Channel>)> {
 191        self.channel_index.iter().map(move |path| {
 192            let id = path.last().unwrap();
 193            let channel = self.channel_for_id(*id).unwrap();
 194            (path.len() - 1, channel)
 195        })
 196    }
 197
 198    pub fn channel_dag_entry_at(&self, ix: usize) -> Option<(&Arc<Channel>, &ChannelPath)> {
 199        let path = self.channel_index.get(ix)?;
 200        let id = path.last().unwrap();
 201        let channel = self.channel_for_id(*id).unwrap();
 202
 203        Some((channel, path))
 204    }
 205
 206    pub fn channel_at(&self, ix: usize) -> Option<&Arc<Channel>> {
 207        self.channel_index.by_id().values().nth(ix)
 208    }
 209
 210    pub fn channel_invitations(&self) -> &[Arc<Channel>] {
 211        &self.channel_invitations
 212    }
 213
 214    pub fn channel_for_id(&self, channel_id: ChannelId) -> Option<&Arc<Channel>> {
 215        self.channel_index.by_id().get(&channel_id)
 216    }
 217
 218    pub fn has_open_channel_buffer(&self, channel_id: ChannelId, cx: &AppContext) -> bool {
 219        if let Some(buffer) = self.opened_buffers.get(&channel_id) {
 220            if let OpenedModelHandle::Open(buffer) = buffer {
 221                return buffer.upgrade(cx).is_some();
 222            }
 223        }
 224        false
 225    }
 226
 227    pub fn open_channel_buffer(
 228        &mut self,
 229        channel_id: ChannelId,
 230        cx: &mut ModelContext<Self>,
 231    ) -> Task<Result<ModelHandle<ChannelBuffer>>> {
 232        let client = self.client.clone();
 233        let user_store = self.user_store.clone();
 234        self.open_channel_resource(
 235            channel_id,
 236            |this| &mut this.opened_buffers,
 237            |channel, cx| ChannelBuffer::new(channel, client, user_store, cx),
 238            cx,
 239        )
 240    }
 241
 242    pub fn has_channel_buffer_changed(&self, channel_id: ChannelId) -> Option<bool> {
 243        self.channel_index
 244            .by_id()
 245            .get(&channel_id)
 246            .map(|channel| channel.unseen_note_version.is_some())
 247    }
 248
 249    pub fn has_new_messages(&self, channel_id: ChannelId) -> Option<bool> {
 250        self.channel_index
 251            .by_id()
 252            .get(&channel_id)
 253            .map(|channel| channel.unseen_message_id.is_some())
 254    }
 255
    /// Records in the channel index that the notes of `channel_id` changed to
    /// `version` (at `epoch`), then calls `cx.notify()`.
    pub fn notes_changed(
        &mut self,
        channel_id: ChannelId,
        epoch: u64,
        version: &clock::Global,
        cx: &mut ModelContext<Self>,
    ) {
        self.channel_index.note_changed(channel_id, epoch, version);
        cx.notify();
    }
 266
    /// Records in the channel index that `channel_id` received the message
    /// `message_id`, then calls `cx.notify()`.
    pub fn new_message(
        &mut self,
        channel_id: ChannelId,
        message_id: u64,
        cx: &mut ModelContext<Self>,
    ) {
        self.channel_index.new_message(channel_id, message_id);
        cx.notify();
    }
 276
    /// Tells the channel index that the user acknowledged `message_id` in
    /// `channel_id`, then calls `cx.notify()`.
    pub fn acknowledge_message_id(
        &mut self,
        channel_id: ChannelId,
        message_id: u64,
        cx: &mut ModelContext<Self>,
    ) {
        self.channel_index
            .acknowledge_message_id(channel_id, message_id);
        cx.notify();
    }
 287
    /// Tells the channel index that the user acknowledged the notes `version`
    /// (at `epoch`) of `channel_id`, then calls `cx.notify()`.
    pub fn acknowledge_notes_version(
        &mut self,
        channel_id: ChannelId,
        epoch: u64,
        version: &clock::Global,
        cx: &mut ModelContext<Self>,
    ) {
        self.channel_index
            .acknowledge_note_version(channel_id, epoch, version);
        cx.notify();
    }
 299
 300    pub fn open_channel_chat(
 301        &mut self,
 302        channel_id: ChannelId,
 303        cx: &mut ModelContext<Self>,
 304    ) -> Task<Result<ModelHandle<ChannelChat>>> {
 305        let client = self.client.clone();
 306        let user_store = self.user_store.clone();
 307        let this = cx.handle();
 308        self.open_channel_resource(
 309            channel_id,
 310            |this| &mut this.opened_chats,
 311            |channel, cx| ChannelChat::new(channel, this, user_store, client, cx),
 312            cx,
 313        )
 314    }
 315
    /// Asynchronously open a given resource associated with a channel.
    ///
    /// Make sure that the resource is only opened once, even if this method
    /// is called multiple times with the same channel id while the first task
    /// is still running.
    ///
    /// - `get_map` selects which map of the store tracks this kind of resource.
    /// - `load` performs the actual load once the channel has been resolved.
    ///
    /// The returned task fails when the channel id is unknown to the store or
    /// when `load` fails; in both cases the map entry is removed again so that
    /// a later call can retry.
    fn open_channel_resource<T: Entity, F, Fut>(
        &mut self,
        channel_id: ChannelId,
        get_map: fn(&mut Self) -> &mut HashMap<ChannelId, OpenedModelHandle<T>>,
        load: F,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<ModelHandle<T>>>
    where
        F: 'static + FnOnce(Arc<Channel>, AsyncAppContext) -> Fut,
        Fut: Future<Output = Result<ModelHandle<T>>>,
    {
        // The loop runs at most twice: a stale `Open` entry is removed and the
        // lookup is retried, which then takes the `Vacant` branch.
        let task = loop {
            match get_map(self).entry(channel_id) {
                hash_map::Entry::Occupied(e) => match e.get() {
                    OpenedModelHandle::Open(model) => {
                        if let Some(model) = model.upgrade(cx) {
                            break Task::ready(Ok(model)).shared();
                        } else {
                            // The model was dropped since it was opened; forget it and reload.
                            get_map(self).remove(&channel_id);
                            continue;
                        }
                    }
                    OpenedModelHandle::Loading(task) => {
                        // Another caller is already loading this resource; share its task.
                        break task.clone();
                    }
                },
                hash_map::Entry::Vacant(e) => {
                    let task = cx
                        .spawn(|this, cx| async move {
                            let channel = this.read_with(&cx, |this, _| {
                                this.channel_for_id(channel_id).cloned().ok_or_else(|| {
                                    Arc::new(anyhow!("no channel for id: {}", channel_id))
                                })
                            })?;

                            // Errors are wrapped in `Arc` so the shared task's output is clonable.
                            load(channel, cx).await.map_err(Arc::new)
                        })
                        .shared();

                    e.insert(OpenedModelHandle::Loading(task.clone()));
                    // Once loading settles, replace the `Loading` entry with a weak
                    // handle on success, or drop the entry on failure.
                    cx.spawn({
                        let task = task.clone();
                        |this, mut cx| async move {
                            let result = task.await;
                            this.update(&mut cx, |this, _| match result {
                                Ok(model) => {
                                    get_map(this).insert(
                                        channel_id,
                                        OpenedModelHandle::Open(model.downgrade()),
                                    );
                                }
                                Err(_) => {
                                    get_map(this).remove(&channel_id);
                                }
                            });
                        }
                    })
                    .detach();
                    break task;
                }
            }
        };
        // Convert the shared `Arc<anyhow::Error>` back into a plain `anyhow::Error`.
        cx.foreground()
            .spawn(async move { task.await.map_err(|error| anyhow!("{}", error)) })
    }
 386
 387    pub fn is_user_admin(&self, channel_id: ChannelId) -> bool {
 388        self.channel_index.iter().any(|path| {
 389            if let Some(ix) = path.iter().position(|id| *id == channel_id) {
 390                path[..=ix]
 391                    .iter()
 392                    .any(|id| self.channels_with_admin_privileges.contains(id))
 393            } else {
 394                false
 395            }
 396        })
 397    }
 398
 399    pub fn channel_participants(&self, channel_id: ChannelId) -> &[Arc<User>] {
 400        self.channel_participants
 401            .get(&channel_id)
 402            .map_or(&[], |v| v.as_slice())
 403    }
 404
 405    pub fn create_channel(
 406        &self,
 407        name: &str,
 408        parent_id: Option<ChannelId>,
 409        cx: &mut ModelContext<Self>,
 410    ) -> Task<Result<ChannelId>> {
 411        let client = self.client.clone();
 412        let name = name.trim_start_matches("#").to_owned();
 413        cx.spawn(|this, mut cx| async move {
 414            let response = client
 415                .request(proto::CreateChannel { name, parent_id })
 416                .await?;
 417
 418            let channel = response
 419                .channel
 420                .ok_or_else(|| anyhow!("missing channel in response"))?;
 421            let channel_id = channel.id;
 422
 423            let parent_edge = if let Some(parent_id) = parent_id {
 424                vec![ChannelEdge {
 425                    channel_id: channel.id,
 426                    parent_id,
 427                }]
 428            } else {
 429                vec![]
 430            };
 431
 432            this.update(&mut cx, |this, cx| {
 433                let task = this.update_channels(
 434                    proto::UpdateChannels {
 435                        channels: vec![channel],
 436                        insert_edge: parent_edge,
 437                        channel_permissions: vec![ChannelPermission {
 438                            channel_id,
 439                            role: ChannelRole::Admin.into(),
 440                        }],
 441                        ..Default::default()
 442                    },
 443                    cx,
 444                );
 445                assert!(task.is_none());
 446
 447                // This event is emitted because the collab panel wants to clear the pending edit state
 448                // before this frame is rendered. But we can't guarantee that the collab panel's future
 449                // will resolve before this flush_effects finishes. Synchronously emitting this event
 450                // ensures that the collab panel will observe this creation before the frame completes
 451                cx.emit(ChannelEvent::ChannelCreated(channel_id));
 452            });
 453
 454            Ok(channel_id)
 455        })
 456    }
 457
 458    pub fn link_channel(
 459        &mut self,
 460        channel_id: ChannelId,
 461        to: ChannelId,
 462        cx: &mut ModelContext<Self>,
 463    ) -> Task<Result<()>> {
 464        let client = self.client.clone();
 465        cx.spawn(|_, _| async move {
 466            let _ = client
 467                .request(proto::LinkChannel { channel_id, to })
 468                .await?;
 469
 470            Ok(())
 471        })
 472    }
 473
 474    pub fn unlink_channel(
 475        &mut self,
 476        channel_id: ChannelId,
 477        from: ChannelId,
 478        cx: &mut ModelContext<Self>,
 479    ) -> Task<Result<()>> {
 480        let client = self.client.clone();
 481        cx.spawn(|_, _| async move {
 482            let _ = client
 483                .request(proto::UnlinkChannel { channel_id, from })
 484                .await?;
 485
 486            Ok(())
 487        })
 488    }
 489
 490    pub fn move_channel(
 491        &mut self,
 492        channel_id: ChannelId,
 493        from: ChannelId,
 494        to: ChannelId,
 495        cx: &mut ModelContext<Self>,
 496    ) -> Task<Result<()>> {
 497        let client = self.client.clone();
 498        cx.spawn(|_, _| async move {
 499            let _ = client
 500                .request(proto::MoveChannel {
 501                    channel_id,
 502                    from,
 503                    to,
 504                })
 505                .await?;
 506
 507            Ok(())
 508        })
 509    }
 510
 511    pub fn invite_member(
 512        &mut self,
 513        channel_id: ChannelId,
 514        user_id: UserId,
 515        role: proto::ChannelRole,
 516        cx: &mut ModelContext<Self>,
 517    ) -> Task<Result<()>> {
 518        if !self.outgoing_invites.insert((channel_id, user_id)) {
 519            return Task::ready(Err(anyhow!("invite request already in progress")));
 520        }
 521
 522        cx.notify();
 523        let client = self.client.clone();
 524        cx.spawn(|this, mut cx| async move {
 525            let result = client
 526                .request(proto::InviteChannelMember {
 527                    channel_id,
 528                    user_id,
 529                    role: role.into(),
 530                })
 531                .await;
 532
 533            this.update(&mut cx, |this, cx| {
 534                this.outgoing_invites.remove(&(channel_id, user_id));
 535                cx.notify();
 536            });
 537
 538            result?;
 539
 540            Ok(())
 541        })
 542    }
 543
 544    pub fn remove_member(
 545        &mut self,
 546        channel_id: ChannelId,
 547        user_id: u64,
 548        cx: &mut ModelContext<Self>,
 549    ) -> Task<Result<()>> {
 550        if !self.outgoing_invites.insert((channel_id, user_id)) {
 551            return Task::ready(Err(anyhow!("invite request already in progress")));
 552        }
 553
 554        cx.notify();
 555        let client = self.client.clone();
 556        cx.spawn(|this, mut cx| async move {
 557            let result = client
 558                .request(proto::RemoveChannelMember {
 559                    channel_id,
 560                    user_id,
 561                })
 562                .await;
 563
 564            this.update(&mut cx, |this, cx| {
 565                this.outgoing_invites.remove(&(channel_id, user_id));
 566                cx.notify();
 567            });
 568            result?;
 569            Ok(())
 570        })
 571    }
 572
 573    pub fn set_member_role(
 574        &mut self,
 575        channel_id: ChannelId,
 576        user_id: UserId,
 577        role: proto::ChannelRole,
 578        cx: &mut ModelContext<Self>,
 579    ) -> Task<Result<()>> {
 580        if !self.outgoing_invites.insert((channel_id, user_id)) {
 581            return Task::ready(Err(anyhow!("member request already in progress")));
 582        }
 583
 584        cx.notify();
 585        let client = self.client.clone();
 586        cx.spawn(|this, mut cx| async move {
 587            let result = client
 588                .request(proto::SetChannelMemberRole {
 589                    channel_id,
 590                    user_id,
 591                    role: role.into(),
 592                })
 593                .await;
 594
 595            this.update(&mut cx, |this, cx| {
 596                this.outgoing_invites.remove(&(channel_id, user_id));
 597                cx.notify();
 598            });
 599
 600            result?;
 601            Ok(())
 602        })
 603    }
 604
 605    pub fn rename(
 606        &mut self,
 607        channel_id: ChannelId,
 608        new_name: &str,
 609        cx: &mut ModelContext<Self>,
 610    ) -> Task<Result<()>> {
 611        let client = self.client.clone();
 612        let name = new_name.to_string();
 613        cx.spawn(|this, mut cx| async move {
 614            let channel = client
 615                .request(proto::RenameChannel { channel_id, name })
 616                .await?
 617                .channel
 618                .ok_or_else(|| anyhow!("missing channel in response"))?;
 619            this.update(&mut cx, |this, cx| {
 620                let task = this.update_channels(
 621                    proto::UpdateChannels {
 622                        channels: vec![channel],
 623                        ..Default::default()
 624                    },
 625                    cx,
 626                );
 627                assert!(task.is_none());
 628
 629                // This event is emitted because the collab panel wants to clear the pending edit state
 630                // before this frame is rendered. But we can't guarantee that the collab panel's future
 631                // will resolve before this flush_effects finishes. Synchronously emitting this event
 632                // ensures that the collab panel will observe this creation before the frame complete
 633                cx.emit(ChannelEvent::ChannelRenamed(channel_id))
 634            });
 635            Ok(())
 636        })
 637    }
 638
 639    pub fn respond_to_channel_invite(
 640        &mut self,
 641        channel_id: ChannelId,
 642        accept: bool,
 643    ) -> impl Future<Output = Result<()>> {
 644        let client = self.client.clone();
 645        async move {
 646            client
 647                .request(proto::RespondToChannelInvite { channel_id, accept })
 648                .await?;
 649            Ok(())
 650        }
 651    }
 652
 653    pub fn get_channel_member_details(
 654        &self,
 655        channel_id: ChannelId,
 656        cx: &mut ModelContext<Self>,
 657    ) -> Task<Result<Vec<ChannelMembership>>> {
 658        let client = self.client.clone();
 659        let user_store = self.user_store.downgrade();
 660        cx.spawn(|_, mut cx| async move {
 661            let response = client
 662                .request(proto::GetChannelMembers { channel_id })
 663                .await?;
 664
 665            let user_ids = response.members.iter().map(|m| m.user_id).collect();
 666            let user_store = user_store
 667                .upgrade(&cx)
 668                .ok_or_else(|| anyhow!("user store dropped"))?;
 669            let users = user_store
 670                .update(&mut cx, |user_store, cx| user_store.get_users(user_ids, cx))
 671                .await?;
 672
 673            Ok(users
 674                .into_iter()
 675                .zip(response.members)
 676                .filter_map(|(user, member)| {
 677                    Some(ChannelMembership {
 678                        user,
 679                        role: member.role(),
 680                        kind: member.kind(),
 681                    })
 682                })
 683                .collect())
 684        })
 685    }
 686
 687    pub fn remove_channel(&self, channel_id: ChannelId) -> impl Future<Output = Result<()>> {
 688        let client = self.client.clone();
 689        async move {
 690            client.request(proto::DeleteChannel { channel_id }).await?;
 691            Ok(())
 692        }
 693    }
 694
    /// Currently always returns `false`; the channel argument is ignored.
    pub fn has_pending_channel_invite_response(&self, _: &Arc<Channel>) -> bool {
        false
    }
 698
 699    pub fn has_pending_channel_invite(&self, channel_id: ChannelId, user_id: UserId) -> bool {
 700        self.outgoing_invites.contains(&(channel_id, user_id))
 701    }
 702
 703    async fn handle_update_channels(
 704        this: ModelHandle<Self>,
 705        message: TypedEnvelope<proto::UpdateChannels>,
 706        _: Arc<Client>,
 707        mut cx: AsyncAppContext,
 708    ) -> Result<()> {
 709        this.update(&mut cx, |this, _| {
 710            this.update_channels_tx
 711                .unbounded_send(message.payload)
 712                .unwrap();
 713        });
 714        Ok(())
 715    }
 716
    /// Restores per-channel state after the client (re)connected.
    ///
    /// Drops any pending buffer-disconnect task, asks every open chat to
    /// rejoin, and sends one `RejoinChannelBuffers` request carrying the
    /// current version of every open channel buffer. When the response
    /// arrives, each open buffer that the server still lists gets its
    /// collaborators replaced, applies the operations it missed and sends
    /// its own serialized operations back; open buffers that are missing from
    /// the response, or whose operations fail to apply, are disconnected and
    /// removed from `opened_buffers`.
    ///
    /// Resolves immediately with `Ok(())` when no channel buffer is open.
    fn handle_connect(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
        // Taking the task out drops it, so the delayed disconnect scheduled by
        // `handle_disconnect` no longer applies (presumably dropping cancels it).
        self.disconnect_channel_buffers_task.take();

        for chat in self.opened_chats.values() {
            if let OpenedModelHandle::Open(chat) = chat {
                if let Some(chat) = chat.upgrade(cx) {
                    chat.update(cx, |chat, cx| {
                        chat.rejoin(cx);
                    });
                }
            }
        }

        // Collect the local version of every buffer that is still alive.
        let mut buffer_versions = Vec::new();
        for buffer in self.opened_buffers.values() {
            if let OpenedModelHandle::Open(buffer) = buffer {
                if let Some(buffer) = buffer.upgrade(cx) {
                    let channel_buffer = buffer.read(cx);
                    let buffer = channel_buffer.buffer().read(cx);
                    buffer_versions.push(proto::ChannelBufferVersion {
                        channel_id: channel_buffer.channel().id,
                        epoch: channel_buffer.epoch(),
                        version: language::proto::serialize_version(&buffer.version()),
                    });
                }
            }
        }

        if buffer_versions.is_empty() {
            return Task::ready(Ok(()));
        }

        let response = self.client.request(proto::RejoinChannelBuffers {
            buffers: buffer_versions,
        });

        cx.spawn(|this, mut cx| async move {
            let mut response = response.await?;

            this.update(&mut cx, |this, cx| {
                // The closure's return value decides whether the entry is kept.
                this.opened_buffers.retain(|_, buffer| match buffer {
                    OpenedModelHandle::Open(channel_buffer) => {
                        let Some(channel_buffer) = channel_buffer.upgrade(cx) else {
                            return false;
                        };

                        channel_buffer.update(cx, |channel_buffer, cx| {
                            let channel_id = channel_buffer.channel().id;
                            if let Some(remote_buffer) = response
                                .buffers
                                .iter_mut()
                                .find(|buffer| buffer.channel_id == channel_id)
                            {
                                let channel_id = channel_buffer.channel().id;
                                let remote_version =
                                    language::proto::deserialize_version(&remote_buffer.version);

                                channel_buffer.replace_collaborators(
                                    mem::take(&mut remote_buffer.collaborators),
                                    cx,
                                );

                                // Serialize the local operations relative to the remote
                                // version, then apply the operations sent by the server.
                                let operations = channel_buffer
                                    .buffer()
                                    .update(cx, |buffer, cx| {
                                        let outgoing_operations =
                                            buffer.serialize_ops(Some(remote_version), cx);
                                        let incoming_operations =
                                            mem::take(&mut remote_buffer.operations)
                                                .into_iter()
                                                .map(language::proto::deserialize_operation)
                                                .collect::<Result<Vec<_>>>()?;
                                        buffer.apply_ops(incoming_operations, cx)?;
                                        anyhow::Ok(outgoing_operations)
                                    })
                                    .log_err();

                                if let Some(operations) = operations {
                                    let client = this.client.clone();
                                    // Send the local operations in chunks; send errors are ignored.
                                    cx.background()
                                        .spawn(async move {
                                            let operations = operations.await;
                                            for chunk in
                                                language::proto::split_operations(operations)
                                            {
                                                client
                                                    .send(proto::UpdateChannelBuffer {
                                                        channel_id,
                                                        operations: chunk,
                                                    })
                                                    .ok();
                                            }
                                        })
                                        .detach();
                                    return true;
                                }
                            }

                            // Either the server no longer lists this buffer or its
                            // operations could not be applied: give up on it.
                            channel_buffer.disconnect(cx);
                            false
                        })
                    }
                    // Buffers that are still loading are left untouched.
                    OpenedModelHandle::Loading(_) => true,
                });
            });
            anyhow::Ok(())
        })
    }
 825
 826    fn handle_disconnect(&mut self, cx: &mut ModelContext<Self>) {
 827        self.channel_index.clear();
 828        self.channel_invitations.clear();
 829        self.channel_participants.clear();
 830        self.channels_with_admin_privileges.clear();
 831        self.channel_index.clear();
 832        self.outgoing_invites.clear();
 833        cx.notify();
 834
 835        self.disconnect_channel_buffers_task.get_or_insert_with(|| {
 836            cx.spawn_weak(|this, mut cx| async move {
 837                cx.background().timer(RECONNECT_TIMEOUT).await;
 838                if let Some(this) = this.upgrade(&cx) {
 839                    this.update(&mut cx, |this, cx| {
 840                        for (_, buffer) in this.opened_buffers.drain() {
 841                            if let OpenedModelHandle::Open(buffer) = buffer {
 842                                if let Some(buffer) = buffer.upgrade(cx) {
 843                                    buffer.update(cx, |buffer, cx| buffer.disconnect(cx));
 844                                }
 845                            }
 846                        }
 847                    });
 848                }
 849            })
 850        });
 851    }
 852
 853    pub(crate) fn update_channels(
 854        &mut self,
 855        payload: proto::UpdateChannels,
 856        cx: &mut ModelContext<ChannelStore>,
 857    ) -> Option<Task<Result<()>>> {
 858        if !payload.remove_channel_invitations.is_empty() {
 859            self.channel_invitations
 860                .retain(|channel| !payload.remove_channel_invitations.contains(&channel.id));
 861        }
 862        for channel in payload.channel_invitations {
 863            match self
 864                .channel_invitations
 865                .binary_search_by_key(&channel.id, |c| c.id)
 866            {
 867                Ok(ix) => Arc::make_mut(&mut self.channel_invitations[ix]).name = channel.name,
 868                Err(ix) => self.channel_invitations.insert(
 869                    ix,
 870                    Arc::new(Channel {
 871                        id: channel.id,
 872                        name: channel.name,
 873                        unseen_note_version: None,
 874                        unseen_message_id: None,
 875                    }),
 876                ),
 877            }
 878        }
 879
 880        let channels_changed = !payload.channels.is_empty()
 881            || !payload.delete_channels.is_empty()
 882            || !payload.insert_edge.is_empty()
 883            || !payload.delete_edge.is_empty()
 884            || !payload.unseen_channel_messages.is_empty()
 885            || !payload.unseen_channel_buffer_changes.is_empty();
 886
 887        if channels_changed {
 888            if !payload.delete_channels.is_empty() {
 889                self.channel_index.delete_channels(&payload.delete_channels);
 890                self.channel_participants
 891                    .retain(|channel_id, _| !payload.delete_channels.contains(channel_id));
 892                self.channels_with_admin_privileges
 893                    .retain(|channel_id| !payload.delete_channels.contains(channel_id));
 894
 895                for channel_id in &payload.delete_channels {
 896                    let channel_id = *channel_id;
 897                    if let Some(OpenedModelHandle::Open(buffer)) =
 898                        self.opened_buffers.remove(&channel_id)
 899                    {
 900                        if let Some(buffer) = buffer.upgrade(cx) {
 901                            buffer.update(cx, ChannelBuffer::disconnect);
 902                        }
 903                    }
 904                }
 905            }
 906
 907            let mut index = self.channel_index.bulk_insert();
 908            for channel in payload.channels {
 909                index.insert(channel)
 910            }
 911
 912            for unseen_buffer_change in payload.unseen_channel_buffer_changes {
 913                let version = language::proto::deserialize_version(&unseen_buffer_change.version);
 914                index.note_changed(
 915                    unseen_buffer_change.channel_id,
 916                    unseen_buffer_change.epoch,
 917                    &version,
 918                );
 919            }
 920
 921            for unseen_channel_message in payload.unseen_channel_messages {
 922                index.new_messages(
 923                    unseen_channel_message.channel_id,
 924                    unseen_channel_message.message_id,
 925                );
 926            }
 927
 928            for edge in payload.insert_edge {
 929                index.insert_edge(edge.channel_id, edge.parent_id);
 930            }
 931
 932            for edge in payload.delete_edge {
 933                index.delete_edge(edge.parent_id, edge.channel_id);
 934            }
 935        }
 936
 937        for permission in payload.channel_permissions {
 938            if permission.role() == proto::ChannelRole::Admin {
 939                self.channels_with_admin_privileges
 940                    .insert(permission.channel_id);
 941            } else {
 942                self.channels_with_admin_privileges
 943                    .remove(&permission.channel_id);
 944            }
 945        }
 946
 947        cx.notify();
 948        if payload.channel_participants.is_empty() {
 949            return None;
 950        }
 951
 952        let mut all_user_ids = Vec::new();
 953        let channel_participants = payload.channel_participants;
 954        for entry in &channel_participants {
 955            for user_id in entry.participant_user_ids.iter() {
 956                if let Err(ix) = all_user_ids.binary_search(user_id) {
 957                    all_user_ids.insert(ix, *user_id);
 958                }
 959            }
 960        }
 961
 962        let users = self
 963            .user_store
 964            .update(cx, |user_store, cx| user_store.get_users(all_user_ids, cx));
 965        Some(cx.spawn(|this, mut cx| async move {
 966            let users = users.await?;
 967
 968            this.update(&mut cx, |this, cx| {
 969                for entry in &channel_participants {
 970                    let mut participants: Vec<_> = entry
 971                        .participant_user_ids
 972                        .iter()
 973                        .filter_map(|user_id| {
 974                            users
 975                                .binary_search_by_key(&user_id, |user| &user.id)
 976                                .ok()
 977                                .map(|ix| users[ix].clone())
 978                        })
 979                        .collect();
 980
 981                    participants.sort_by_key(|u| u.id);
 982
 983                    this.channel_participants
 984                        .insert(entry.channel_id, participants);
 985                }
 986
 987                cx.notify();
 988            });
 989            anyhow::Ok(())
 990        }))
 991    }
 992}
 993
 994impl Deref for ChannelPath {
 995    type Target = [ChannelId];
 996
 997    fn deref(&self) -> &Self::Target {
 998        &self.0
 999    }
1000}
1001
1002impl ChannelPath {
1003    pub fn new(path: Arc<[ChannelId]>) -> Self {
1004        debug_assert!(path.len() >= 1);
1005        Self(path)
1006    }
1007
1008    pub fn parent_id(&self) -> Option<ChannelId> {
1009        self.0.len().checked_sub(2).map(|i| self.0[i])
1010    }
1011
1012    pub fn channel_id(&self) -> ChannelId {
1013        self.0[self.0.len() - 1]
1014    }
1015}
1016
1017impl From<ChannelPath> for Cow<'static, ChannelPath> {
1018    fn from(value: ChannelPath) -> Self {
1019        Cow::Owned(value)
1020    }
1021}
1022
1023impl<'a> From<&'a ChannelPath> for Cow<'a, ChannelPath> {
1024    fn from(value: &'a ChannelPath) -> Self {
1025        Cow::Borrowed(value)
1026    }
1027}
1028
1029impl Default for ChannelPath {
1030    fn default() -> Self {
1031        ChannelPath(Arc::from([]))
1032    }
1033}