channel_store.rs

   1mod channel_index;
   2
   3use crate::{channel_buffer::ChannelBuffer, channel_chat::ChannelChat};
   4use anyhow::{anyhow, Result};
   5use channel_index::ChannelIndex;
   6use client::{Client, Subscription, User, UserId, UserStore};
   7use collections::{hash_map, HashMap, HashSet};
   8use db::RELEASE_CHANNEL;
   9use futures::{channel::mpsc, future::Shared, Future, FutureExt, StreamExt};
  10use gpui::{AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Task, WeakModelHandle};
  11use rpc::{
  12    proto::{self, ChannelEdge, ChannelPermission, ChannelRole, ChannelVisibility},
  13    TypedEnvelope,
  14};
  15use serde_derive::{Deserialize, Serialize};
  16use std::{borrow::Cow, hash::Hash, mem, ops::Deref, sync::Arc, time::Duration};
  17use util::ResultExt;
  18
  19pub fn init(client: &Arc<Client>, user_store: ModelHandle<UserStore>, cx: &mut AppContext) {
  20    let channel_store =
  21        cx.add_model(|cx| ChannelStore::new(client.clone(), user_store.clone(), cx));
  22    cx.set_global(channel_store);
  23}
  24
  25pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
  26
  27pub type ChannelId = u64;
  28
  29pub struct ChannelStore {
  30    channel_index: ChannelIndex,
  31    channel_invitations: Vec<Arc<Channel>>,
  32    channel_participants: HashMap<ChannelId, Vec<Arc<User>>>,
  33    channels_with_admin_privileges: HashSet<ChannelId>,
  34    outgoing_invites: HashSet<(ChannelId, UserId)>,
  35    update_channels_tx: mpsc::UnboundedSender<proto::UpdateChannels>,
  36    opened_buffers: HashMap<ChannelId, OpenedModelHandle<ChannelBuffer>>,
  37    opened_chats: HashMap<ChannelId, OpenedModelHandle<ChannelChat>>,
  38    client: Arc<Client>,
  39    user_store: ModelHandle<UserStore>,
  40    _rpc_subscription: Subscription,
  41    _watch_connection_status: Task<Option<()>>,
  42    disconnect_channel_buffers_task: Option<Task<()>>,
  43    _update_channels: Task<()>,
  44}
  45
  46pub type ChannelData = (Channel, ChannelPath);
  47
  48#[derive(Clone, Debug, PartialEq)]
  49pub struct Channel {
  50    pub id: ChannelId,
  51    pub name: String,
  52    pub visibility: proto::ChannelVisibility,
  53    pub unseen_note_version: Option<(u64, clock::Global)>,
  54    pub unseen_message_id: Option<u64>,
  55}
  56
  57impl Channel {
  58    pub fn link(&self) -> String {
  59        RELEASE_CHANNEL.link_prefix().to_owned()
  60            + "channel/"
  61            + &self.slug()
  62            + "-"
  63            + &self.id.to_string()
  64    }
  65
  66    pub fn slug(&self) -> String {
  67        let slug: String = self
  68            .name
  69            .chars()
  70            .map(|c| if c.is_alphanumeric() { c } else { '-' })
  71            .collect();
  72
  73        slug.trim_matches(|c| c == '-').to_string()
  74    }
  75}
  76
  77#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Serialize, Deserialize)]
  78pub struct ChannelPath(Arc<[ChannelId]>);
  79
  80pub struct ChannelMembership {
  81    pub user: Arc<User>,
  82    pub kind: proto::channel_member::Kind,
  83    pub role: proto::ChannelRole,
  84}
  85impl ChannelMembership {
  86    pub fn sort_key(&self) -> MembershipSortKey {
  87        MembershipSortKey {
  88            role_order: match self.role {
  89                proto::ChannelRole::Admin => 0,
  90                proto::ChannelRole::Member => 1,
  91                proto::ChannelRole::Banned => 2,
  92                proto::ChannelRole::Guest => 3,
  93            },
  94            kind_order: match self.kind {
  95                proto::channel_member::Kind::Member => 0,
  96                proto::channel_member::Kind::AncestorMember => 1,
  97                proto::channel_member::Kind::Invitee => 2,
  98            },
  99            username_order: self.user.github_login.as_str(),
 100        }
 101    }
 102}
 103
 104#[derive(PartialOrd, Ord, PartialEq, Eq)]
 105pub struct MembershipSortKey<'a> {
 106    role_order: u8,
 107    kind_order: u8,
 108    username_order: &'a str,
 109}
 110
 111pub enum ChannelEvent {
 112    ChannelCreated(ChannelId),
 113    ChannelRenamed(ChannelId),
 114}
 115
 116impl Entity for ChannelStore {
 117    type Event = ChannelEvent;
 118}
 119
 120enum OpenedModelHandle<E: Entity> {
 121    Open(WeakModelHandle<E>),
 122    Loading(Shared<Task<Result<ModelHandle<E>, Arc<anyhow::Error>>>>),
 123}
 124
 125impl ChannelStore {
 126    pub fn global(cx: &AppContext) -> ModelHandle<Self> {
 127        cx.global::<ModelHandle<Self>>().clone()
 128    }
 129
 130    pub fn new(
 131        client: Arc<Client>,
 132        user_store: ModelHandle<UserStore>,
 133        cx: &mut ModelContext<Self>,
 134    ) -> Self {
 135        let rpc_subscription =
 136            client.add_message_handler(cx.handle(), Self::handle_update_channels);
 137
 138        let mut connection_status = client.status();
 139        let (update_channels_tx, mut update_channels_rx) = mpsc::unbounded();
 140        let watch_connection_status = cx.spawn_weak(|this, mut cx| async move {
 141            while let Some(status) = connection_status.next().await {
 142                let this = this.upgrade(&cx)?;
 143                match status {
 144                    client::Status::Connected { .. } => {
 145                        this.update(&mut cx, |this, cx| this.handle_connect(cx))
 146                            .await
 147                            .log_err()?;
 148                    }
 149                    client::Status::SignedOut | client::Status::UpgradeRequired => {
 150                        this.update(&mut cx, |this, cx| this.handle_disconnect(false, cx));
 151                    }
 152                    _ => {
 153                        this.update(&mut cx, |this, cx| this.handle_disconnect(true, cx));
 154                    }
 155                }
 156                if status.is_connected() {
 157                } else {
 158                }
 159            }
 160            Some(())
 161        });
 162
 163        Self {
 164            channel_invitations: Vec::default(),
 165            channel_index: ChannelIndex::default(),
 166            channel_participants: Default::default(),
 167            channels_with_admin_privileges: Default::default(),
 168            outgoing_invites: Default::default(),
 169            opened_buffers: Default::default(),
 170            opened_chats: Default::default(),
 171            update_channels_tx,
 172            client,
 173            user_store,
 174            _rpc_subscription: rpc_subscription,
 175            _watch_connection_status: watch_connection_status,
 176            disconnect_channel_buffers_task: None,
 177            _update_channels: cx.spawn_weak(|this, mut cx| async move {
 178                while let Some(update_channels) = update_channels_rx.next().await {
 179                    if let Some(this) = this.upgrade(&cx) {
 180                        let update_task = this.update(&mut cx, |this, cx| {
 181                            this.update_channels(update_channels, cx)
 182                        });
 183                        if let Some(update_task) = update_task {
 184                            update_task.await.log_err();
 185                        }
 186                    }
 187                }
 188            }),
 189        }
 190    }
 191
 192    pub fn client(&self) -> Arc<Client> {
 193        self.client.clone()
 194    }
 195
 196    pub fn has_children(&self, channel_id: ChannelId) -> bool {
 197        self.channel_index.iter().any(|path| {
 198            if let Some(ix) = path.iter().position(|id| *id == channel_id) {
 199                path.len() > ix + 1
 200            } else {
 201                false
 202            }
 203        })
 204    }
 205
 206    /// Returns the number of unique channels in the store
 207    pub fn channel_count(&self) -> usize {
 208        self.channel_index.by_id().len()
 209    }
 210
 211    /// Returns the index of a channel ID in the list of unique channels
 212    pub fn index_of_channel(&self, channel_id: ChannelId) -> Option<usize> {
 213        self.channel_index
 214            .by_id()
 215            .keys()
 216            .position(|id| *id == channel_id)
 217    }
 218
 219    /// Returns an iterator over all unique channels
 220    pub fn channels(&self) -> impl '_ + Iterator<Item = &Arc<Channel>> {
 221        self.channel_index.by_id().values()
 222    }
 223
 224    /// Iterate over all entries in the channel DAG
 225    pub fn channel_dag_entries(&self) -> impl '_ + Iterator<Item = (usize, &Arc<Channel>)> {
 226        self.channel_index.iter().map(move |path| {
 227            let id = path.last().unwrap();
 228            let channel = self.channel_for_id(*id).unwrap();
 229            (path.len() - 1, channel)
 230        })
 231    }
 232
 233    pub fn channel_dag_entry_at(&self, ix: usize) -> Option<(&Arc<Channel>, &ChannelPath)> {
 234        let path = self.channel_index.get(ix)?;
 235        let id = path.last().unwrap();
 236        let channel = self.channel_for_id(*id).unwrap();
 237
 238        Some((channel, path))
 239    }
 240
 241    pub fn channel_at(&self, ix: usize) -> Option<&Arc<Channel>> {
 242        self.channel_index.by_id().values().nth(ix)
 243    }
 244
 245    pub fn channel_invitations(&self) -> &[Arc<Channel>] {
 246        &self.channel_invitations
 247    }
 248
 249    pub fn channel_for_id(&self, channel_id: ChannelId) -> Option<&Arc<Channel>> {
 250        self.channel_index.by_id().get(&channel_id)
 251    }
 252
 253    pub fn has_open_channel_buffer(&self, channel_id: ChannelId, cx: &AppContext) -> bool {
 254        if let Some(buffer) = self.opened_buffers.get(&channel_id) {
 255            if let OpenedModelHandle::Open(buffer) = buffer {
 256                return buffer.upgrade(cx).is_some();
 257            }
 258        }
 259        false
 260    }
 261
 262    pub fn open_channel_buffer(
 263        &mut self,
 264        channel_id: ChannelId,
 265        cx: &mut ModelContext<Self>,
 266    ) -> Task<Result<ModelHandle<ChannelBuffer>>> {
 267        let client = self.client.clone();
 268        let user_store = self.user_store.clone();
 269        self.open_channel_resource(
 270            channel_id,
 271            |this| &mut this.opened_buffers,
 272            |channel, cx| ChannelBuffer::new(channel, client, user_store, cx),
 273            cx,
 274        )
 275    }
 276
 277    pub fn has_channel_buffer_changed(&self, channel_id: ChannelId) -> Option<bool> {
 278        self.channel_index
 279            .by_id()
 280            .get(&channel_id)
 281            .map(|channel| channel.unseen_note_version.is_some())
 282    }
 283
 284    pub fn has_new_messages(&self, channel_id: ChannelId) -> Option<bool> {
 285        self.channel_index
 286            .by_id()
 287            .get(&channel_id)
 288            .map(|channel| channel.unseen_message_id.is_some())
 289    }
 290
 291    pub fn notes_changed(
 292        &mut self,
 293        channel_id: ChannelId,
 294        epoch: u64,
 295        version: &clock::Global,
 296        cx: &mut ModelContext<Self>,
 297    ) {
 298        self.channel_index.note_changed(channel_id, epoch, version);
 299        cx.notify();
 300    }
 301
 302    pub fn new_message(
 303        &mut self,
 304        channel_id: ChannelId,
 305        message_id: u64,
 306        cx: &mut ModelContext<Self>,
 307    ) {
 308        self.channel_index.new_message(channel_id, message_id);
 309        cx.notify();
 310    }
 311
 312    pub fn acknowledge_message_id(
 313        &mut self,
 314        channel_id: ChannelId,
 315        message_id: u64,
 316        cx: &mut ModelContext<Self>,
 317    ) {
 318        self.channel_index
 319            .acknowledge_message_id(channel_id, message_id);
 320        cx.notify();
 321    }
 322
 323    pub fn acknowledge_notes_version(
 324        &mut self,
 325        channel_id: ChannelId,
 326        epoch: u64,
 327        version: &clock::Global,
 328        cx: &mut ModelContext<Self>,
 329    ) {
 330        self.channel_index
 331            .acknowledge_note_version(channel_id, epoch, version);
 332        cx.notify();
 333    }
 334
 335    pub fn open_channel_chat(
 336        &mut self,
 337        channel_id: ChannelId,
 338        cx: &mut ModelContext<Self>,
 339    ) -> Task<Result<ModelHandle<ChannelChat>>> {
 340        let client = self.client.clone();
 341        let user_store = self.user_store.clone();
 342        let this = cx.handle();
 343        self.open_channel_resource(
 344            channel_id,
 345            |this| &mut this.opened_chats,
 346            |channel, cx| ChannelChat::new(channel, this, user_store, client, cx),
 347            cx,
 348        )
 349    }
 350
 351    /// Asynchronously open a given resource associated with a channel.
 352    ///
 353    /// Make sure that the resource is only opened once, even if this method
 354    /// is called multiple times with the same channel id while the first task
 355    /// is still running.
 356    fn open_channel_resource<T: Entity, F, Fut>(
 357        &mut self,
 358        channel_id: ChannelId,
 359        get_map: fn(&mut Self) -> &mut HashMap<ChannelId, OpenedModelHandle<T>>,
 360        load: F,
 361        cx: &mut ModelContext<Self>,
 362    ) -> Task<Result<ModelHandle<T>>>
 363    where
 364        F: 'static + FnOnce(Arc<Channel>, AsyncAppContext) -> Fut,
 365        Fut: Future<Output = Result<ModelHandle<T>>>,
 366    {
 367        let task = loop {
 368            match get_map(self).entry(channel_id) {
 369                hash_map::Entry::Occupied(e) => match e.get() {
 370                    OpenedModelHandle::Open(model) => {
 371                        if let Some(model) = model.upgrade(cx) {
 372                            break Task::ready(Ok(model)).shared();
 373                        } else {
 374                            get_map(self).remove(&channel_id);
 375                            continue;
 376                        }
 377                    }
 378                    OpenedModelHandle::Loading(task) => {
 379                        break task.clone();
 380                    }
 381                },
 382                hash_map::Entry::Vacant(e) => {
 383                    let task = cx
 384                        .spawn(|this, cx| async move {
 385                            let channel = this.read_with(&cx, |this, _| {
 386                                this.channel_for_id(channel_id).cloned().ok_or_else(|| {
 387                                    Arc::new(anyhow!("no channel for id: {}", channel_id))
 388                                })
 389                            })?;
 390
 391                            load(channel, cx).await.map_err(Arc::new)
 392                        })
 393                        .shared();
 394
 395                    e.insert(OpenedModelHandle::Loading(task.clone()));
 396                    cx.spawn({
 397                        let task = task.clone();
 398                        |this, mut cx| async move {
 399                            let result = task.await;
 400                            this.update(&mut cx, |this, _| match result {
 401                                Ok(model) => {
 402                                    get_map(this).insert(
 403                                        channel_id,
 404                                        OpenedModelHandle::Open(model.downgrade()),
 405                                    );
 406                                }
 407                                Err(_) => {
 408                                    get_map(this).remove(&channel_id);
 409                                }
 410                            });
 411                        }
 412                    })
 413                    .detach();
 414                    break task;
 415                }
 416            }
 417        };
 418        cx.foreground()
 419            .spawn(async move { task.await.map_err(|error| anyhow!("{}", error)) })
 420    }
 421
 422    pub fn is_user_admin(&self, channel_id: ChannelId) -> bool {
 423        self.channel_index.iter().any(|path| {
 424            if let Some(ix) = path.iter().position(|id| *id == channel_id) {
 425                path[..=ix]
 426                    .iter()
 427                    .any(|id| self.channels_with_admin_privileges.contains(id))
 428            } else {
 429                false
 430            }
 431        })
 432    }
 433
 434    pub fn channel_participants(&self, channel_id: ChannelId) -> &[Arc<User>] {
 435        self.channel_participants
 436            .get(&channel_id)
 437            .map_or(&[], |v| v.as_slice())
 438    }
 439
 440    pub fn create_channel(
 441        &self,
 442        name: &str,
 443        parent_id: Option<ChannelId>,
 444        cx: &mut ModelContext<Self>,
 445    ) -> Task<Result<ChannelId>> {
 446        let client = self.client.clone();
 447        let name = name.trim_start_matches("#").to_owned();
 448        cx.spawn(|this, mut cx| async move {
 449            let response = client
 450                .request(proto::CreateChannel { name, parent_id })
 451                .await?;
 452
 453            let channel = response
 454                .channel
 455                .ok_or_else(|| anyhow!("missing channel in response"))?;
 456            let channel_id = channel.id;
 457
 458            let parent_edge = if let Some(parent_id) = parent_id {
 459                vec![ChannelEdge {
 460                    channel_id: channel.id,
 461                    parent_id,
 462                }]
 463            } else {
 464                vec![]
 465            };
 466
 467            this.update(&mut cx, |this, cx| {
 468                let task = this.update_channels(
 469                    proto::UpdateChannels {
 470                        channels: vec![channel],
 471                        insert_edge: parent_edge,
 472                        channel_permissions: vec![ChannelPermission {
 473                            channel_id,
 474                            role: ChannelRole::Admin.into(),
 475                        }],
 476                        ..Default::default()
 477                    },
 478                    cx,
 479                );
 480                assert!(task.is_none());
 481
 482                // This event is emitted because the collab panel wants to clear the pending edit state
 483                // before this frame is rendered. But we can't guarantee that the collab panel's future
 484                // will resolve before this flush_effects finishes. Synchronously emitting this event
 485                // ensures that the collab panel will observe this creation before the frame completes
 486                cx.emit(ChannelEvent::ChannelCreated(channel_id));
 487            });
 488
 489            Ok(channel_id)
 490        })
 491    }
 492
 493    pub fn link_channel(
 494        &mut self,
 495        channel_id: ChannelId,
 496        to: ChannelId,
 497        cx: &mut ModelContext<Self>,
 498    ) -> Task<Result<()>> {
 499        let client = self.client.clone();
 500        cx.spawn(|_, _| async move {
 501            let _ = client
 502                .request(proto::LinkChannel { channel_id, to })
 503                .await?;
 504
 505            Ok(())
 506        })
 507    }
 508
 509    pub fn unlink_channel(
 510        &mut self,
 511        channel_id: ChannelId,
 512        from: ChannelId,
 513        cx: &mut ModelContext<Self>,
 514    ) -> Task<Result<()>> {
 515        let client = self.client.clone();
 516        cx.spawn(|_, _| async move {
 517            let _ = client
 518                .request(proto::UnlinkChannel { channel_id, from })
 519                .await?;
 520
 521            Ok(())
 522        })
 523    }
 524
 525    pub fn move_channel(
 526        &mut self,
 527        channel_id: ChannelId,
 528        from: ChannelId,
 529        to: ChannelId,
 530        cx: &mut ModelContext<Self>,
 531    ) -> Task<Result<()>> {
 532        let client = self.client.clone();
 533        cx.spawn(|_, _| async move {
 534            let _ = client
 535                .request(proto::MoveChannel {
 536                    channel_id,
 537                    from,
 538                    to,
 539                })
 540                .await?;
 541
 542            Ok(())
 543        })
 544    }
 545
 546    pub fn set_channel_visibility(
 547        &mut self,
 548        channel_id: ChannelId,
 549        visibility: ChannelVisibility,
 550        cx: &mut ModelContext<Self>,
 551    ) -> Task<Result<()>> {
 552        let client = self.client.clone();
 553        cx.spawn(|_, _| async move {
 554            let _ = client
 555                .request(proto::SetChannelVisibility {
 556                    channel_id,
 557                    visibility: visibility.into(),
 558                })
 559                .await?;
 560
 561            Ok(())
 562        })
 563    }
 564
 565    pub fn invite_member(
 566        &mut self,
 567        channel_id: ChannelId,
 568        user_id: UserId,
 569        role: proto::ChannelRole,
 570        cx: &mut ModelContext<Self>,
 571    ) -> Task<Result<()>> {
 572        if !self.outgoing_invites.insert((channel_id, user_id)) {
 573            return Task::ready(Err(anyhow!("invite request already in progress")));
 574        }
 575
 576        cx.notify();
 577        let client = self.client.clone();
 578        cx.spawn(|this, mut cx| async move {
 579            let result = client
 580                .request(proto::InviteChannelMember {
 581                    channel_id,
 582                    user_id,
 583                    role: role.into(),
 584                })
 585                .await;
 586
 587            this.update(&mut cx, |this, cx| {
 588                this.outgoing_invites.remove(&(channel_id, user_id));
 589                cx.notify();
 590            });
 591
 592            result?;
 593
 594            Ok(())
 595        })
 596    }
 597
 598    pub fn remove_member(
 599        &mut self,
 600        channel_id: ChannelId,
 601        user_id: u64,
 602        cx: &mut ModelContext<Self>,
 603    ) -> Task<Result<()>> {
 604        if !self.outgoing_invites.insert((channel_id, user_id)) {
 605            return Task::ready(Err(anyhow!("invite request already in progress")));
 606        }
 607
 608        cx.notify();
 609        let client = self.client.clone();
 610        cx.spawn(|this, mut cx| async move {
 611            let result = client
 612                .request(proto::RemoveChannelMember {
 613                    channel_id,
 614                    user_id,
 615                })
 616                .await;
 617
 618            this.update(&mut cx, |this, cx| {
 619                this.outgoing_invites.remove(&(channel_id, user_id));
 620                cx.notify();
 621            });
 622            result?;
 623            Ok(())
 624        })
 625    }
 626
 627    pub fn set_member_role(
 628        &mut self,
 629        channel_id: ChannelId,
 630        user_id: UserId,
 631        role: proto::ChannelRole,
 632        cx: &mut ModelContext<Self>,
 633    ) -> Task<Result<()>> {
 634        if !self.outgoing_invites.insert((channel_id, user_id)) {
 635            return Task::ready(Err(anyhow!("member request already in progress")));
 636        }
 637
 638        cx.notify();
 639        let client = self.client.clone();
 640        cx.spawn(|this, mut cx| async move {
 641            let result = client
 642                .request(proto::SetChannelMemberRole {
 643                    channel_id,
 644                    user_id,
 645                    role: role.into(),
 646                })
 647                .await;
 648
 649            this.update(&mut cx, |this, cx| {
 650                this.outgoing_invites.remove(&(channel_id, user_id));
 651                cx.notify();
 652            });
 653
 654            result?;
 655            Ok(())
 656        })
 657    }
 658
 659    pub fn rename(
 660        &mut self,
 661        channel_id: ChannelId,
 662        new_name: &str,
 663        cx: &mut ModelContext<Self>,
 664    ) -> Task<Result<()>> {
 665        let client = self.client.clone();
 666        let name = new_name.to_string();
 667        cx.spawn(|this, mut cx| async move {
 668            let channel = client
 669                .request(proto::RenameChannel { channel_id, name })
 670                .await?
 671                .channel
 672                .ok_or_else(|| anyhow!("missing channel in response"))?;
 673            this.update(&mut cx, |this, cx| {
 674                let task = this.update_channels(
 675                    proto::UpdateChannels {
 676                        channels: vec![channel],
 677                        ..Default::default()
 678                    },
 679                    cx,
 680                );
 681                assert!(task.is_none());
 682
 683                // This event is emitted because the collab panel wants to clear the pending edit state
 684                // before this frame is rendered. But we can't guarantee that the collab panel's future
 685                // will resolve before this flush_effects finishes. Synchronously emitting this event
 686                // ensures that the collab panel will observe this creation before the frame complete
 687                cx.emit(ChannelEvent::ChannelRenamed(channel_id))
 688            });
 689            Ok(())
 690        })
 691    }
 692
 693    pub fn respond_to_channel_invite(
 694        &mut self,
 695        channel_id: ChannelId,
 696        accept: bool,
 697    ) -> impl Future<Output = Result<()>> {
 698        let client = self.client.clone();
 699        async move {
 700            client
 701                .request(proto::RespondToChannelInvite { channel_id, accept })
 702                .await?;
 703            Ok(())
 704        }
 705    }
 706
 707    pub fn get_channel_member_details(
 708        &self,
 709        channel_id: ChannelId,
 710        cx: &mut ModelContext<Self>,
 711    ) -> Task<Result<Vec<ChannelMembership>>> {
 712        let client = self.client.clone();
 713        let user_store = self.user_store.downgrade();
 714        cx.spawn(|_, mut cx| async move {
 715            let response = client
 716                .request(proto::GetChannelMembers { channel_id })
 717                .await?;
 718
 719            let user_ids = response.members.iter().map(|m| m.user_id).collect();
 720            let user_store = user_store
 721                .upgrade(&cx)
 722                .ok_or_else(|| anyhow!("user store dropped"))?;
 723            let users = user_store
 724                .update(&mut cx, |user_store, cx| user_store.get_users(user_ids, cx))
 725                .await?;
 726
 727            Ok(users
 728                .into_iter()
 729                .zip(response.members)
 730                .filter_map(|(user, member)| {
 731                    Some(ChannelMembership {
 732                        user,
 733                        role: member.role(),
 734                        kind: member.kind(),
 735                    })
 736                })
 737                .collect())
 738        })
 739    }
 740
 741    pub fn remove_channel(&self, channel_id: ChannelId) -> impl Future<Output = Result<()>> {
 742        let client = self.client.clone();
 743        async move {
 744            client.request(proto::DeleteChannel { channel_id }).await?;
 745            Ok(())
 746        }
 747    }
 748
 749    pub fn has_pending_channel_invite_response(&self, _: &Arc<Channel>) -> bool {
 750        false
 751    }
 752
 753    pub fn has_pending_channel_invite(&self, channel_id: ChannelId, user_id: UserId) -> bool {
 754        self.outgoing_invites.contains(&(channel_id, user_id))
 755    }
 756
 757    async fn handle_update_channels(
 758        this: ModelHandle<Self>,
 759        message: TypedEnvelope<proto::UpdateChannels>,
 760        _: Arc<Client>,
 761        mut cx: AsyncAppContext,
 762    ) -> Result<()> {
 763        this.update(&mut cx, |this, _| {
 764            this.update_channels_tx
 765                .unbounded_send(message.payload)
 766                .unwrap();
 767        });
 768        Ok(())
 769    }
 770
 771    fn handle_connect(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
 772        self.disconnect_channel_buffers_task.take();
 773
 774        for chat in self.opened_chats.values() {
 775            if let OpenedModelHandle::Open(chat) = chat {
 776                if let Some(chat) = chat.upgrade(cx) {
 777                    chat.update(cx, |chat, cx| {
 778                        chat.rejoin(cx);
 779                    });
 780                }
 781            }
 782        }
 783
 784        let mut buffer_versions = Vec::new();
 785        for buffer in self.opened_buffers.values() {
 786            if let OpenedModelHandle::Open(buffer) = buffer {
 787                if let Some(buffer) = buffer.upgrade(cx) {
 788                    let channel_buffer = buffer.read(cx);
 789                    let buffer = channel_buffer.buffer().read(cx);
 790                    buffer_versions.push(proto::ChannelBufferVersion {
 791                        channel_id: channel_buffer.channel().id,
 792                        epoch: channel_buffer.epoch(),
 793                        version: language::proto::serialize_version(&buffer.version()),
 794                    });
 795                }
 796            }
 797        }
 798
 799        if buffer_versions.is_empty() {
 800            return Task::ready(Ok(()));
 801        }
 802
 803        let response = self.client.request(proto::RejoinChannelBuffers {
 804            buffers: buffer_versions,
 805        });
 806
 807        cx.spawn(|this, mut cx| async move {
 808            let mut response = response.await?;
 809
 810            this.update(&mut cx, |this, cx| {
 811                this.opened_buffers.retain(|_, buffer| match buffer {
 812                    OpenedModelHandle::Open(channel_buffer) => {
 813                        let Some(channel_buffer) = channel_buffer.upgrade(cx) else {
 814                            return false;
 815                        };
 816
 817                        channel_buffer.update(cx, |channel_buffer, cx| {
 818                            let channel_id = channel_buffer.channel().id;
 819                            if let Some(remote_buffer) = response
 820                                .buffers
 821                                .iter_mut()
 822                                .find(|buffer| buffer.channel_id == channel_id)
 823                            {
 824                                let channel_id = channel_buffer.channel().id;
 825                                let remote_version =
 826                                    language::proto::deserialize_version(&remote_buffer.version);
 827
 828                                channel_buffer.replace_collaborators(
 829                                    mem::take(&mut remote_buffer.collaborators),
 830                                    cx,
 831                                );
 832
 833                                let operations = channel_buffer
 834                                    .buffer()
 835                                    .update(cx, |buffer, cx| {
 836                                        let outgoing_operations =
 837                                            buffer.serialize_ops(Some(remote_version), cx);
 838                                        let incoming_operations =
 839                                            mem::take(&mut remote_buffer.operations)
 840                                                .into_iter()
 841                                                .map(language::proto::deserialize_operation)
 842                                                .collect::<Result<Vec<_>>>()?;
 843                                        buffer.apply_ops(incoming_operations, cx)?;
 844                                        anyhow::Ok(outgoing_operations)
 845                                    })
 846                                    .log_err();
 847
 848                                if let Some(operations) = operations {
 849                                    let client = this.client.clone();
 850                                    cx.background()
 851                                        .spawn(async move {
 852                                            let operations = operations.await;
 853                                            for chunk in
 854                                                language::proto::split_operations(operations)
 855                                            {
 856                                                client
 857                                                    .send(proto::UpdateChannelBuffer {
 858                                                        channel_id,
 859                                                        operations: chunk,
 860                                                    })
 861                                                    .ok();
 862                                            }
 863                                        })
 864                                        .detach();
 865                                    return true;
 866                                }
 867                            }
 868
 869                            channel_buffer.disconnect(cx);
 870                            false
 871                        })
 872                    }
 873                    OpenedModelHandle::Loading(_) => true,
 874                });
 875            });
 876            anyhow::Ok(())
 877        })
 878    }
 879
 880    fn handle_disconnect(&mut self, wait_for_reconnect: bool, cx: &mut ModelContext<Self>) {
 881        self.channel_index.clear();
 882        self.channel_invitations.clear();
 883        self.channel_participants.clear();
 884        self.channels_with_admin_privileges.clear();
 885        self.channel_index.clear();
 886        self.outgoing_invites.clear();
 887        cx.notify();
 888
 889        self.disconnect_channel_buffers_task.get_or_insert_with(|| {
 890            cx.spawn_weak(|this, mut cx| async move {
 891                if wait_for_reconnect {
 892                    cx.background().timer(RECONNECT_TIMEOUT).await;
 893                }
 894
 895                if let Some(this) = this.upgrade(&cx) {
 896                    this.update(&mut cx, |this, cx| {
 897                        for (_, buffer) in this.opened_buffers.drain() {
 898                            if let OpenedModelHandle::Open(buffer) = buffer {
 899                                if let Some(buffer) = buffer.upgrade(cx) {
 900                                    buffer.update(cx, |buffer, cx| buffer.disconnect(cx));
 901                                }
 902                            }
 903                        }
 904                    });
 905                }
 906            })
 907        });
 908    }
 909
 910    pub(crate) fn update_channels(
 911        &mut self,
 912        payload: proto::UpdateChannels,
 913        cx: &mut ModelContext<ChannelStore>,
 914    ) -> Option<Task<Result<()>>> {
 915        if !payload.remove_channel_invitations.is_empty() {
 916            self.channel_invitations
 917                .retain(|channel| !payload.remove_channel_invitations.contains(&channel.id));
 918        }
 919        for channel in payload.channel_invitations {
 920            match self
 921                .channel_invitations
 922                .binary_search_by_key(&channel.id, |c| c.id)
 923            {
 924                Ok(ix) => Arc::make_mut(&mut self.channel_invitations[ix]).name = channel.name,
 925                Err(ix) => self.channel_invitations.insert(
 926                    ix,
 927                    Arc::new(Channel {
 928                        id: channel.id,
 929                        visibility: channel.visibility(),
 930                        name: channel.name,
 931                        unseen_note_version: None,
 932                        unseen_message_id: None,
 933                    }),
 934                ),
 935            }
 936        }
 937
 938        let channels_changed = !payload.channels.is_empty()
 939            || !payload.delete_channels.is_empty()
 940            || !payload.insert_edge.is_empty()
 941            || !payload.delete_edge.is_empty()
 942            || !payload.unseen_channel_messages.is_empty()
 943            || !payload.unseen_channel_buffer_changes.is_empty();
 944
 945        if channels_changed {
 946            if !payload.delete_channels.is_empty() {
 947                self.channel_index.delete_channels(&payload.delete_channels);
 948                self.channel_participants
 949                    .retain(|channel_id, _| !payload.delete_channels.contains(channel_id));
 950                self.channels_with_admin_privileges
 951                    .retain(|channel_id| !payload.delete_channels.contains(channel_id));
 952
 953                for channel_id in &payload.delete_channels {
 954                    let channel_id = *channel_id;
 955                    if let Some(OpenedModelHandle::Open(buffer)) =
 956                        self.opened_buffers.remove(&channel_id)
 957                    {
 958                        if let Some(buffer) = buffer.upgrade(cx) {
 959                            buffer.update(cx, ChannelBuffer::disconnect);
 960                        }
 961                    }
 962                }
 963            }
 964
 965            let mut index = self.channel_index.bulk_insert();
 966            for channel in payload.channels {
 967                index.insert(channel)
 968            }
 969
 970            for unseen_buffer_change in payload.unseen_channel_buffer_changes {
 971                let version = language::proto::deserialize_version(&unseen_buffer_change.version);
 972                index.note_changed(
 973                    unseen_buffer_change.channel_id,
 974                    unseen_buffer_change.epoch,
 975                    &version,
 976                );
 977            }
 978
 979            for unseen_channel_message in payload.unseen_channel_messages {
 980                index.new_messages(
 981                    unseen_channel_message.channel_id,
 982                    unseen_channel_message.message_id,
 983                );
 984            }
 985
 986            for edge in payload.insert_edge {
 987                index.insert_edge(edge.channel_id, edge.parent_id);
 988            }
 989
 990            for edge in payload.delete_edge {
 991                index.delete_edge(edge.parent_id, edge.channel_id);
 992            }
 993        }
 994
 995        for permission in payload.channel_permissions {
 996            if permission.role() == proto::ChannelRole::Admin {
 997                self.channels_with_admin_privileges
 998                    .insert(permission.channel_id);
 999            } else {
1000                self.channels_with_admin_privileges
1001                    .remove(&permission.channel_id);
1002            }
1003        }
1004
1005        cx.notify();
1006        if payload.channel_participants.is_empty() {
1007            return None;
1008        }
1009
1010        let mut all_user_ids = Vec::new();
1011        let channel_participants = payload.channel_participants;
1012        for entry in &channel_participants {
1013            for user_id in entry.participant_user_ids.iter() {
1014                if let Err(ix) = all_user_ids.binary_search(user_id) {
1015                    all_user_ids.insert(ix, *user_id);
1016                }
1017            }
1018        }
1019
1020        let users = self
1021            .user_store
1022            .update(cx, |user_store, cx| user_store.get_users(all_user_ids, cx));
1023        Some(cx.spawn(|this, mut cx| async move {
1024            let users = users.await?;
1025
1026            this.update(&mut cx, |this, cx| {
1027                for entry in &channel_participants {
1028                    let mut participants: Vec<_> = entry
1029                        .participant_user_ids
1030                        .iter()
1031                        .filter_map(|user_id| {
1032                            users
1033                                .binary_search_by_key(&user_id, |user| &user.id)
1034                                .ok()
1035                                .map(|ix| users[ix].clone())
1036                        })
1037                        .collect();
1038
1039                    participants.sort_by_key(|u| u.id);
1040
1041                    this.channel_participants
1042                        .insert(entry.channel_id, participants);
1043                }
1044
1045                cx.notify();
1046            });
1047            anyhow::Ok(())
1048        }))
1049    }
1050}
1051
1052impl Deref for ChannelPath {
1053    type Target = [ChannelId];
1054
1055    fn deref(&self) -> &Self::Target {
1056        &self.0
1057    }
1058}
1059
1060impl ChannelPath {
1061    pub fn new(path: Arc<[ChannelId]>) -> Self {
1062        debug_assert!(path.len() >= 1);
1063        Self(path)
1064    }
1065
1066    pub fn parent_id(&self) -> Option<ChannelId> {
1067        self.0.len().checked_sub(2).map(|i| self.0[i])
1068    }
1069
1070    pub fn channel_id(&self) -> ChannelId {
1071        self.0[self.0.len() - 1]
1072    }
1073}
1074
1075impl From<ChannelPath> for Cow<'static, ChannelPath> {
1076    fn from(value: ChannelPath) -> Self {
1077        Cow::Owned(value)
1078    }
1079}
1080
1081impl<'a> From<&'a ChannelPath> for Cow<'a, ChannelPath> {
1082    fn from(value: &'a ChannelPath) -> Self {
1083        Cow::Borrowed(value)
1084    }
1085}
1086
1087impl Default for ChannelPath {
1088    fn default() -> Self {
1089        ChannelPath(Arc::from([]))
1090    }
1091}