channel_store.rs

   1mod channel_index;
   2
   3use crate::{channel_buffer::ChannelBuffer, channel_chat::ChannelChat};
   4use anyhow::{anyhow, Result};
   5use client::{Client, Subscription, User, UserId, UserStore};
   6use collections::{hash_map, HashMap, HashSet};
   7use futures::{channel::mpsc, future::Shared, Future, FutureExt, StreamExt};
   8use gpui::{AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Task, WeakModelHandle};
   9use rpc::{
  10    proto::{self, ChannelEdge, ChannelPermission},
  11    TypedEnvelope,
  12};
  13use serde_derive::{Deserialize, Serialize};
  14use std::{borrow::Cow, hash::Hash, mem, ops::Deref, sync::Arc, time::Duration};
  15use util::ResultExt;
  16
  17use self::channel_index::ChannelIndex;
  18
/// Delay used by `ChannelStore::handle_disconnect` before it disconnects the
/// channel buffers that are still open after the connection was lost.
pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);

/// Numeric identifier of a channel.
pub type ChannelId = u64;
  22
/// Model holding the channels known to this client, the pending channel
/// invitations, per-channel participants, and the channel buffers and chats
/// that were opened through it.
pub struct ChannelStore {
    // Index of all known channels, addressable by id and by position.
    channel_index: ChannelIndex,
    // Invitations received by this client; `update_channels` keeps them sorted by channel id.
    channel_invitations: Vec<Arc<Channel>>,
    channel_participants: HashMap<ChannelId, Vec<Arc<User>>>,
    channels_with_admin_privileges: HashSet<ChannelId>,
    // (channel, user) pairs that currently have a membership request in flight.
    outgoing_invites: HashSet<(ChannelId, UserId)>,
    // Sending half of the queue drained by the `_update_channels` task.
    update_channels_tx: mpsc::UnboundedSender<proto::UpdateChannels>,
    opened_buffers: HashMap<ChannelId, OpenedModelHandle<ChannelBuffer>>,
    opened_chats: HashMap<ChannelId, OpenedModelHandle<ChannelChat>>,
    client: Arc<Client>,
    user_store: ModelHandle<UserStore>,
    // NOTE(review): presumably held only so the subscription stays registered
    // for the lifetime of the store — confirm against `Subscription`'s drop behavior.
    _rpc_subscription: Subscription,
    // Task that reacts to connection status changes (see `new`).
    _watch_connection_status: Task<Option<()>>,
    // Set by `handle_disconnect`, taken (and dropped) by `handle_connect`.
    disconnect_channel_buffers_task: Option<Task<()>>,
    // Task that applies queued `UpdateChannels` payloads one at a time (see `new`).
    _update_channels: Task<()>,
}
  39
/// A channel paired with a [`ChannelPath`].
pub type ChannelData = (Channel, ChannelPath);
  41
/// A single channel as known to the client.
#[derive(Clone, Debug, PartialEq)]
pub struct Channel {
    pub id: ChannelId,
    pub name: String,
    /// `Some` is reported as "changed" by `ChannelStore::has_channel_buffer_changed`.
    /// NOTE(review): the tuple is presumably `(epoch, version)` — confirm against `ChannelIndex`.
    pub unseen_note_version: Option<(u64, clock::Global)>,
    /// `Some` is reported as "has new messages" by `ChannelStore::has_new_messages`.
    pub unseen_message_id: Option<u64>,
}
  49
/// A sequence of channel ids describing one entry of the channel DAG.
///
/// The last id is the channel the entry refers to (see
/// `ChannelStore::channel_dag_entries`).
/// NOTE(review): the preceding ids are presumably its ancestors, outermost first — confirm.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Serialize, Deserialize)]
pub struct ChannelPath(Arc<[ChannelId]>);
  52
/// One member of a channel, as returned by
/// `ChannelStore::get_channel_member_details`.
pub struct ChannelMembership {
    /// The member's user record.
    pub user: Arc<User>,
    /// Membership kind decoded from the server's response.
    pub kind: proto::channel_member::Kind,
    /// Admin flag copied from the server's response.
    pub admin: bool,
}
  58
  59pub enum ChannelEvent {
  60    ChannelCreated(ChannelId),
  61    ChannelRenamed(ChannelId),
  62}
  63
/// Registers `ChannelStore` as a model whose emitted events are [`ChannelEvent`]s.
impl Entity for ChannelStore {
    type Event = ChannelEvent;
}
  67
/// State of a per-channel resource (buffer or chat) tracked by the store.
enum OpenedModelHandle<E: Entity> {
    /// The resource finished loading; only a weak handle is kept, so the entry
    /// may refer to a model that has since been dropped.
    Open(WeakModelHandle<E>),
    /// The resource is still loading; the shared task can be awaited by every
    /// caller that asks for the same channel in the meantime.
    Loading(Shared<Task<Result<ModelHandle<E>, Arc<anyhow::Error>>>>),
}
  72
  73impl ChannelStore {
  74    pub fn new(
  75        client: Arc<Client>,
  76        user_store: ModelHandle<UserStore>,
  77        cx: &mut ModelContext<Self>,
  78    ) -> Self {
  79        let rpc_subscription =
  80            client.add_message_handler(cx.handle(), Self::handle_update_channels);
  81
  82        let mut connection_status = client.status();
  83        let (update_channels_tx, mut update_channels_rx) = mpsc::unbounded();
  84        let watch_connection_status = cx.spawn_weak(|this, mut cx| async move {
  85            while let Some(status) = connection_status.next().await {
  86                let this = this.upgrade(&cx)?;
  87                if status.is_connected() {
  88                    this.update(&mut cx, |this, cx| this.handle_connect(cx))
  89                        .await
  90                        .log_err()?;
  91                } else {
  92                    this.update(&mut cx, |this, cx| this.handle_disconnect(cx));
  93                }
  94            }
  95            Some(())
  96        });
  97
  98        Self {
  99            channel_invitations: Vec::default(),
 100            channel_index: ChannelIndex::default(),
 101            channel_participants: Default::default(),
 102            channels_with_admin_privileges: Default::default(),
 103            outgoing_invites: Default::default(),
 104            opened_buffers: Default::default(),
 105            opened_chats: Default::default(),
 106            update_channels_tx,
 107            client,
 108            user_store,
 109            _rpc_subscription: rpc_subscription,
 110            _watch_connection_status: watch_connection_status,
 111            disconnect_channel_buffers_task: None,
 112            _update_channels: cx.spawn_weak(|this, mut cx| async move {
 113                while let Some(update_channels) = update_channels_rx.next().await {
 114                    if let Some(this) = this.upgrade(&cx) {
 115                        let update_task = this.update(&mut cx, |this, cx| {
 116                            this.update_channels(update_channels, cx)
 117                        });
 118                        if let Some(update_task) = update_task {
 119                            update_task.await.log_err();
 120                        }
 121                    }
 122                }
 123            }),
 124        }
 125    }
 126
 127    pub fn client(&self) -> Arc<Client> {
 128        self.client.clone()
 129    }
 130
 131    pub fn has_children(&self, channel_id: ChannelId) -> bool {
 132        self.channel_index.iter().any(|path| {
 133            if let Some(ix) = path.iter().position(|id| *id == channel_id) {
 134                path.len() > ix + 1
 135            } else {
 136                false
 137            }
 138        })
 139    }
 140
 141    /// Returns the number of unique channels in the store
 142    pub fn channel_count(&self) -> usize {
 143        self.channel_index.by_id().len()
 144    }
 145
 146    /// Returns the index of a channel ID in the list of unique channels
 147    pub fn index_of_channel(&self, channel_id: ChannelId) -> Option<usize> {
 148        self.channel_index
 149            .by_id()
 150            .keys()
 151            .position(|id| *id == channel_id)
 152    }
 153
 154    /// Returns an iterator over all unique channels
 155    pub fn channels(&self) -> impl '_ + Iterator<Item = &Arc<Channel>> {
 156        self.channel_index.by_id().values()
 157    }
 158
 159    /// Iterate over all entries in the channel DAG
 160    pub fn channel_dag_entries(&self) -> impl '_ + Iterator<Item = (usize, &Arc<Channel>)> {
 161        self.channel_index.iter().map(move |path| {
 162            let id = path.last().unwrap();
 163            let channel = self.channel_for_id(*id).unwrap();
 164            (path.len() - 1, channel)
 165        })
 166    }
 167
 168    pub fn channel_dag_entry_at(&self, ix: usize) -> Option<(&Arc<Channel>, &ChannelPath)> {
 169        let path = self.channel_index.get(ix)?;
 170        let id = path.last().unwrap();
 171        let channel = self.channel_for_id(*id).unwrap();
 172
 173        Some((channel, path))
 174    }
 175
 176    pub fn channel_at(&self, ix: usize) -> Option<&Arc<Channel>> {
 177        self.channel_index.by_id().values().nth(ix)
 178    }
 179
 180    pub fn channel_invitations(&self) -> &[Arc<Channel>] {
 181        &self.channel_invitations
 182    }
 183
 184    pub fn channel_for_id(&self, channel_id: ChannelId) -> Option<&Arc<Channel>> {
 185        self.channel_index.by_id().get(&channel_id)
 186    }
 187
 188    pub fn has_open_channel_buffer(&self, channel_id: ChannelId, cx: &AppContext) -> bool {
 189        if let Some(buffer) = self.opened_buffers.get(&channel_id) {
 190            if let OpenedModelHandle::Open(buffer) = buffer {
 191                return buffer.upgrade(cx).is_some();
 192            }
 193        }
 194        false
 195    }
 196
 197    pub fn open_channel_buffer(
 198        &mut self,
 199        channel_id: ChannelId,
 200        cx: &mut ModelContext<Self>,
 201    ) -> Task<Result<ModelHandle<ChannelBuffer>>> {
 202        let client = self.client.clone();
 203        let user_store = self.user_store.clone();
 204        self.open_channel_resource(
 205            channel_id,
 206            |this| &mut this.opened_buffers,
 207            |channel, cx| ChannelBuffer::new(channel, client, user_store, cx),
 208            cx,
 209        )
 210    }
 211
 212    pub fn has_channel_buffer_changed(&self, channel_id: ChannelId) -> Option<bool> {
 213        self.channel_index
 214            .by_id()
 215            .get(&channel_id)
 216            .map(|channel| channel.unseen_note_version.is_some())
 217    }
 218
 219    pub fn has_new_messages(&self, channel_id: ChannelId) -> Option<bool> {
 220        self.channel_index
 221            .by_id()
 222            .get(&channel_id)
 223            .map(|channel| channel.unseen_message_id.is_some())
 224    }
 225
 226    pub fn notes_changed(
 227        &mut self,
 228        channel_id: ChannelId,
 229        epoch: u64,
 230        version: &clock::Global,
 231        cx: &mut ModelContext<Self>,
 232    ) {
 233        self.channel_index.note_changed(channel_id, epoch, version);
 234        cx.notify();
 235    }
 236
 237    pub fn new_message(
 238        &mut self,
 239        channel_id: ChannelId,
 240        message_id: u64,
 241        cx: &mut ModelContext<Self>,
 242    ) {
 243        self.channel_index.new_message(channel_id, message_id);
 244        cx.notify();
 245    }
 246
 247    pub fn acknowledge_message_id(
 248        &mut self,
 249        channel_id: ChannelId,
 250        message_id: u64,
 251        cx: &mut ModelContext<Self>,
 252    ) {
 253        self.channel_index
 254            .acknowledge_message_id(channel_id, message_id);
 255        cx.notify();
 256    }
 257
 258    pub fn acknowledge_notes_version(
 259        &mut self,
 260        channel_id: ChannelId,
 261        epoch: u64,
 262        version: &clock::Global,
 263        cx: &mut ModelContext<Self>,
 264    ) {
 265        self.channel_index
 266            .acknowledge_note_version(channel_id, epoch, version);
 267        cx.notify();
 268    }
 269
 270    pub fn open_channel_chat(
 271        &mut self,
 272        channel_id: ChannelId,
 273        cx: &mut ModelContext<Self>,
 274    ) -> Task<Result<ModelHandle<ChannelChat>>> {
 275        let client = self.client.clone();
 276        let user_store = self.user_store.clone();
 277        let this = cx.handle();
 278        self.open_channel_resource(
 279            channel_id,
 280            |this| &mut this.opened_chats,
 281            |channel, cx| ChannelChat::new(channel, this, user_store, client, cx),
 282            cx,
 283        )
 284    }
 285
    /// Asynchronously open a given resource associated with a channel.
    ///
    /// Make sure that the resource is only opened once, even if this method
    /// is called multiple times with the same channel id while the first task
    /// is still running.
    ///
    /// `get_map` selects which of the store's maps tracks the resource and
    /// `load` performs the actual opening once the channel has been looked up.
    /// The returned task fails when the channel id is unknown or when `load`
    /// fails; in the failure case the map entry is removed again.
    fn open_channel_resource<T: Entity, F, Fut>(
        &mut self,
        channel_id: ChannelId,
        get_map: fn(&mut Self) -> &mut HashMap<ChannelId, OpenedModelHandle<T>>,
        load: F,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<ModelHandle<T>>>
    where
        F: 'static + FnOnce(Arc<Channel>, AsyncAppContext) -> Fut,
        Fut: Future<Output = Result<ModelHandle<T>>>,
    {
        // The loop runs at most twice: a stale `Open` entry is removed and the
        // second pass then takes the `Vacant` branch.
        let task = loop {
            match get_map(self).entry(channel_id) {
                hash_map::Entry::Occupied(e) => match e.get() {
                    OpenedModelHandle::Open(model) => {
                        if let Some(model) = model.upgrade(cx) {
                            // Already open and still alive: hand it out immediately.
                            break Task::ready(Ok(model)).shared();
                        } else {
                            // The model was dropped since it was opened; forget it and retry.
                            get_map(self).remove(&channel_id);
                            continue;
                        }
                    }
                    OpenedModelHandle::Loading(task) => {
                        // Someone else is already opening it; share their task.
                        break task.clone();
                    }
                },
                hash_map::Entry::Vacant(e) => {
                    let task = cx
                        .spawn(|this, cx| async move {
                            let channel = this.read_with(&cx, |this, _| {
                                this.channel_for_id(channel_id).cloned().ok_or_else(|| {
                                    Arc::new(anyhow!("no channel for id: {}", channel_id))
                                })
                            })?;

                            // Errors are wrapped in `Arc` so the task's output can be
                            // cloned for every awaiter of the shared task.
                            load(channel, cx).await.map_err(Arc::new)
                        })
                        .shared();

                    e.insert(OpenedModelHandle::Loading(task.clone()));
                    // Detached follow-up that records the outcome of the load in the map.
                    cx.spawn({
                        let task = task.clone();
                        |this, mut cx| async move {
                            let result = task.await;
                            this.update(&mut cx, |this, _| match result {
                                Ok(model) => {
                                    get_map(this).insert(
                                        channel_id,
                                        OpenedModelHandle::Open(model.downgrade()),
                                    );
                                }
                                Err(_) => {
                                    get_map(this).remove(&channel_id);
                                }
                            });
                        }
                    })
                    .detach();
                    break task;
                }
            }
        };
        // Convert the shared `Arc<anyhow::Error>` back into an owned error by
        // formatting its message.
        cx.foreground()
            .spawn(async move { task.await.map_err(|error| anyhow!("{}", error)) })
    }
 356
 357    pub fn is_user_admin(&self, channel_id: ChannelId) -> bool {
 358        self.channel_index.iter().any(|path| {
 359            if let Some(ix) = path.iter().position(|id| *id == channel_id) {
 360                path[..=ix]
 361                    .iter()
 362                    .any(|id| self.channels_with_admin_privileges.contains(id))
 363            } else {
 364                false
 365            }
 366        })
 367    }
 368
 369    pub fn channel_participants(&self, channel_id: ChannelId) -> &[Arc<User>] {
 370        self.channel_participants
 371            .get(&channel_id)
 372            .map_or(&[], |v| v.as_slice())
 373    }
 374
 375    pub fn create_channel(
 376        &self,
 377        name: &str,
 378        parent_id: Option<ChannelId>,
 379        cx: &mut ModelContext<Self>,
 380    ) -> Task<Result<ChannelId>> {
 381        let client = self.client.clone();
 382        let name = name.trim_start_matches("#").to_owned();
 383        cx.spawn(|this, mut cx| async move {
 384            let response = client
 385                .request(proto::CreateChannel { name, parent_id })
 386                .await?;
 387
 388            let channel = response
 389                .channel
 390                .ok_or_else(|| anyhow!("missing channel in response"))?;
 391            let channel_id = channel.id;
 392
 393            let parent_edge = if let Some(parent_id) = parent_id {
 394                vec![ChannelEdge {
 395                    channel_id: channel.id,
 396                    parent_id,
 397                }]
 398            } else {
 399                vec![]
 400            };
 401
 402            this.update(&mut cx, |this, cx| {
 403                let task = this.update_channels(
 404                    proto::UpdateChannels {
 405                        channels: vec![channel],
 406                        insert_edge: parent_edge,
 407                        channel_permissions: vec![ChannelPermission {
 408                            channel_id,
 409                            is_admin: true,
 410                        }],
 411                        ..Default::default()
 412                    },
 413                    cx,
 414                );
 415                assert!(task.is_none());
 416
 417                // This event is emitted because the collab panel wants to clear the pending edit state
 418                // before this frame is rendered. But we can't guarantee that the collab panel's future
 419                // will resolve before this flush_effects finishes. Synchronously emitting this event
 420                // ensures that the collab panel will observe this creation before the frame completes
 421                cx.emit(ChannelEvent::ChannelCreated(channel_id));
 422            });
 423
 424            Ok(channel_id)
 425        })
 426    }
 427
 428    pub fn link_channel(
 429        &mut self,
 430        channel_id: ChannelId,
 431        to: ChannelId,
 432        cx: &mut ModelContext<Self>,
 433    ) -> Task<Result<()>> {
 434        let client = self.client.clone();
 435        cx.spawn(|_, _| async move {
 436            let _ = client
 437                .request(proto::LinkChannel { channel_id, to })
 438                .await?;
 439
 440            Ok(())
 441        })
 442    }
 443
 444    pub fn unlink_channel(
 445        &mut self,
 446        channel_id: ChannelId,
 447        from: ChannelId,
 448        cx: &mut ModelContext<Self>,
 449    ) -> Task<Result<()>> {
 450        let client = self.client.clone();
 451        cx.spawn(|_, _| async move {
 452            let _ = client
 453                .request(proto::UnlinkChannel { channel_id, from })
 454                .await?;
 455
 456            Ok(())
 457        })
 458    }
 459
 460    pub fn move_channel(
 461        &mut self,
 462        channel_id: ChannelId,
 463        from: ChannelId,
 464        to: ChannelId,
 465        cx: &mut ModelContext<Self>,
 466    ) -> Task<Result<()>> {
 467        let client = self.client.clone();
 468        cx.spawn(|_, _| async move {
 469            let _ = client
 470                .request(proto::MoveChannel {
 471                    channel_id,
 472                    from,
 473                    to,
 474                })
 475                .await?;
 476
 477            Ok(())
 478        })
 479    }
 480
 481    pub fn invite_member(
 482        &mut self,
 483        channel_id: ChannelId,
 484        user_id: UserId,
 485        admin: bool,
 486        cx: &mut ModelContext<Self>,
 487    ) -> Task<Result<()>> {
 488        if !self.outgoing_invites.insert((channel_id, user_id)) {
 489            return Task::ready(Err(anyhow!("invite request already in progress")));
 490        }
 491
 492        cx.notify();
 493        let client = self.client.clone();
 494        cx.spawn(|this, mut cx| async move {
 495            let result = client
 496                .request(proto::InviteChannelMember {
 497                    channel_id,
 498                    user_id,
 499                    admin,
 500                })
 501                .await;
 502
 503            this.update(&mut cx, |this, cx| {
 504                this.outgoing_invites.remove(&(channel_id, user_id));
 505                cx.notify();
 506            });
 507
 508            result?;
 509
 510            Ok(())
 511        })
 512    }
 513
 514    pub fn remove_member(
 515        &mut self,
 516        channel_id: ChannelId,
 517        user_id: u64,
 518        cx: &mut ModelContext<Self>,
 519    ) -> Task<Result<()>> {
 520        if !self.outgoing_invites.insert((channel_id, user_id)) {
 521            return Task::ready(Err(anyhow!("invite request already in progress")));
 522        }
 523
 524        cx.notify();
 525        let client = self.client.clone();
 526        cx.spawn(|this, mut cx| async move {
 527            let result = client
 528                .request(proto::RemoveChannelMember {
 529                    channel_id,
 530                    user_id,
 531                })
 532                .await;
 533
 534            this.update(&mut cx, |this, cx| {
 535                this.outgoing_invites.remove(&(channel_id, user_id));
 536                cx.notify();
 537            });
 538            result?;
 539            Ok(())
 540        })
 541    }
 542
 543    pub fn set_member_admin(
 544        &mut self,
 545        channel_id: ChannelId,
 546        user_id: UserId,
 547        admin: bool,
 548        cx: &mut ModelContext<Self>,
 549    ) -> Task<Result<()>> {
 550        if !self.outgoing_invites.insert((channel_id, user_id)) {
 551            return Task::ready(Err(anyhow!("member request already in progress")));
 552        }
 553
 554        cx.notify();
 555        let client = self.client.clone();
 556        cx.spawn(|this, mut cx| async move {
 557            let result = client
 558                .request(proto::SetChannelMemberAdmin {
 559                    channel_id,
 560                    user_id,
 561                    admin,
 562                })
 563                .await;
 564
 565            this.update(&mut cx, |this, cx| {
 566                this.outgoing_invites.remove(&(channel_id, user_id));
 567                cx.notify();
 568            });
 569
 570            result?;
 571            Ok(())
 572        })
 573    }
 574
 575    pub fn rename(
 576        &mut self,
 577        channel_id: ChannelId,
 578        new_name: &str,
 579        cx: &mut ModelContext<Self>,
 580    ) -> Task<Result<()>> {
 581        let client = self.client.clone();
 582        let name = new_name.to_string();
 583        cx.spawn(|this, mut cx| async move {
 584            let channel = client
 585                .request(proto::RenameChannel { channel_id, name })
 586                .await?
 587                .channel
 588                .ok_or_else(|| anyhow!("missing channel in response"))?;
 589            this.update(&mut cx, |this, cx| {
 590                let task = this.update_channels(
 591                    proto::UpdateChannels {
 592                        channels: vec![channel],
 593                        ..Default::default()
 594                    },
 595                    cx,
 596                );
 597                assert!(task.is_none());
 598
 599                // This event is emitted because the collab panel wants to clear the pending edit state
 600                // before this frame is rendered. But we can't guarantee that the collab panel's future
 601                // will resolve before this flush_effects finishes. Synchronously emitting this event
 602                // ensures that the collab panel will observe this creation before the frame complete
 603                cx.emit(ChannelEvent::ChannelRenamed(channel_id))
 604            });
 605            Ok(())
 606        })
 607    }
 608
 609    pub fn respond_to_channel_invite(
 610        &mut self,
 611        channel_id: ChannelId,
 612        accept: bool,
 613    ) -> impl Future<Output = Result<()>> {
 614        let client = self.client.clone();
 615        async move {
 616            client
 617                .request(proto::RespondToChannelInvite { channel_id, accept })
 618                .await?;
 619            Ok(())
 620        }
 621    }
 622
 623    pub fn get_channel_member_details(
 624        &self,
 625        channel_id: ChannelId,
 626        cx: &mut ModelContext<Self>,
 627    ) -> Task<Result<Vec<ChannelMembership>>> {
 628        let client = self.client.clone();
 629        let user_store = self.user_store.downgrade();
 630        cx.spawn(|_, mut cx| async move {
 631            let response = client
 632                .request(proto::GetChannelMembers { channel_id })
 633                .await?;
 634
 635            let user_ids = response.members.iter().map(|m| m.user_id).collect();
 636            let user_store = user_store
 637                .upgrade(&cx)
 638                .ok_or_else(|| anyhow!("user store dropped"))?;
 639            let users = user_store
 640                .update(&mut cx, |user_store, cx| user_store.get_users(user_ids, cx))
 641                .await?;
 642
 643            Ok(users
 644                .into_iter()
 645                .zip(response.members)
 646                .filter_map(|(user, member)| {
 647                    Some(ChannelMembership {
 648                        user,
 649                        admin: member.admin,
 650                        kind: proto::channel_member::Kind::from_i32(member.kind)?,
 651                    })
 652                })
 653                .collect())
 654        })
 655    }
 656
 657    pub fn remove_channel(&self, channel_id: ChannelId) -> impl Future<Output = Result<()>> {
 658        let client = self.client.clone();
 659        async move {
 660            client.request(proto::DeleteChannel { channel_id }).await?;
 661            Ok(())
 662        }
 663    }
 664
    /// Reports whether a response to the given channel invitation is pending.
    ///
    /// Currently always returns `false`; the channel argument is ignored.
    pub fn has_pending_channel_invite_response(&self, _: &Arc<Channel>) -> bool {
        false
    }
 668
 669    pub fn has_pending_channel_invite(&self, channel_id: ChannelId, user_id: UserId) -> bool {
 670        self.outgoing_invites.contains(&(channel_id, user_id))
 671    }
 672
 673    async fn handle_update_channels(
 674        this: ModelHandle<Self>,
 675        message: TypedEnvelope<proto::UpdateChannels>,
 676        _: Arc<Client>,
 677        mut cx: AsyncAppContext,
 678    ) -> Result<()> {
 679        this.update(&mut cx, |this, _| {
 680            this.update_channels_tx
 681                .unbounded_send(message.payload)
 682                .unwrap();
 683        });
 684        Ok(())
 685    }
 686
 687    fn handle_connect(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
 688        self.disconnect_channel_buffers_task.take();
 689
 690        for chat in self.opened_chats.values() {
 691            if let OpenedModelHandle::Open(chat) = chat {
 692                if let Some(chat) = chat.upgrade(cx) {
 693                    chat.update(cx, |chat, cx| {
 694                        chat.rejoin(cx);
 695                    });
 696                }
 697            }
 698        }
 699
 700        let mut buffer_versions = Vec::new();
 701        for buffer in self.opened_buffers.values() {
 702            if let OpenedModelHandle::Open(buffer) = buffer {
 703                if let Some(buffer) = buffer.upgrade(cx) {
 704                    let channel_buffer = buffer.read(cx);
 705                    let buffer = channel_buffer.buffer().read(cx);
 706                    buffer_versions.push(proto::ChannelBufferVersion {
 707                        channel_id: channel_buffer.channel().id,
 708                        epoch: channel_buffer.epoch(),
 709                        version: language::proto::serialize_version(&buffer.version()),
 710                    });
 711                }
 712            }
 713        }
 714
 715        if buffer_versions.is_empty() {
 716            return Task::ready(Ok(()));
 717        }
 718
 719        let response = self.client.request(proto::RejoinChannelBuffers {
 720            buffers: buffer_versions,
 721        });
 722
 723        cx.spawn(|this, mut cx| async move {
 724            let mut response = response.await?;
 725
 726            this.update(&mut cx, |this, cx| {
 727                this.opened_buffers.retain(|_, buffer| match buffer {
 728                    OpenedModelHandle::Open(channel_buffer) => {
 729                        let Some(channel_buffer) = channel_buffer.upgrade(cx) else {
 730                            return false;
 731                        };
 732
 733                        channel_buffer.update(cx, |channel_buffer, cx| {
 734                            let channel_id = channel_buffer.channel().id;
 735                            if let Some(remote_buffer) = response
 736                                .buffers
 737                                .iter_mut()
 738                                .find(|buffer| buffer.channel_id == channel_id)
 739                            {
 740                                let channel_id = channel_buffer.channel().id;
 741                                let remote_version =
 742                                    language::proto::deserialize_version(&remote_buffer.version);
 743
 744                                channel_buffer.replace_collaborators(
 745                                    mem::take(&mut remote_buffer.collaborators),
 746                                    cx,
 747                                );
 748
 749                                let operations = channel_buffer
 750                                    .buffer()
 751                                    .update(cx, |buffer, cx| {
 752                                        let outgoing_operations =
 753                                            buffer.serialize_ops(Some(remote_version), cx);
 754                                        let incoming_operations =
 755                                            mem::take(&mut remote_buffer.operations)
 756                                                .into_iter()
 757                                                .map(language::proto::deserialize_operation)
 758                                                .collect::<Result<Vec<_>>>()?;
 759                                        buffer.apply_ops(incoming_operations, cx)?;
 760                                        anyhow::Ok(outgoing_operations)
 761                                    })
 762                                    .log_err();
 763
 764                                if let Some(operations) = operations {
 765                                    let client = this.client.clone();
 766                                    cx.background()
 767                                        .spawn(async move {
 768                                            let operations = operations.await;
 769                                            for chunk in
 770                                                language::proto::split_operations(operations)
 771                                            {
 772                                                client
 773                                                    .send(proto::UpdateChannelBuffer {
 774                                                        channel_id,
 775                                                        operations: chunk,
 776                                                    })
 777                                                    .ok();
 778                                            }
 779                                        })
 780                                        .detach();
 781                                    return true;
 782                                }
 783                            }
 784
 785                            channel_buffer.disconnect(cx);
 786                            false
 787                        })
 788                    }
 789                    OpenedModelHandle::Loading(_) => true,
 790                });
 791            });
 792            anyhow::Ok(())
 793        })
 794    }
 795
 796    fn handle_disconnect(&mut self, cx: &mut ModelContext<Self>) {
 797        self.channel_index.clear();
 798        self.channel_invitations.clear();
 799        self.channel_participants.clear();
 800        self.channels_with_admin_privileges.clear();
 801        self.channel_index.clear();
 802        self.outgoing_invites.clear();
 803        cx.notify();
 804
 805        self.disconnect_channel_buffers_task.get_or_insert_with(|| {
 806            cx.spawn_weak(|this, mut cx| async move {
 807                cx.background().timer(RECONNECT_TIMEOUT).await;
 808                if let Some(this) = this.upgrade(&cx) {
 809                    this.update(&mut cx, |this, cx| {
 810                        for (_, buffer) in this.opened_buffers.drain() {
 811                            if let OpenedModelHandle::Open(buffer) = buffer {
 812                                if let Some(buffer) = buffer.upgrade(cx) {
 813                                    buffer.update(cx, |buffer, cx| buffer.disconnect(cx));
 814                                }
 815                            }
 816                        }
 817                    });
 818                }
 819            })
 820        });
 821    }
 822
 823    pub(crate) fn update_channels(
 824        &mut self,
 825        payload: proto::UpdateChannels,
 826        cx: &mut ModelContext<ChannelStore>,
 827    ) -> Option<Task<Result<()>>> {
 828        if !payload.remove_channel_invitations.is_empty() {
 829            self.channel_invitations
 830                .retain(|channel| !payload.remove_channel_invitations.contains(&channel.id));
 831        }
 832        for channel in payload.channel_invitations {
 833            match self
 834                .channel_invitations
 835                .binary_search_by_key(&channel.id, |c| c.id)
 836            {
 837                Ok(ix) => Arc::make_mut(&mut self.channel_invitations[ix]).name = channel.name,
 838                Err(ix) => self.channel_invitations.insert(
 839                    ix,
 840                    Arc::new(Channel {
 841                        id: channel.id,
 842                        name: channel.name,
 843                        unseen_note_version: None,
 844                        unseen_message_id: None,
 845                    }),
 846                ),
 847            }
 848        }
 849
 850        let channels_changed = !payload.channels.is_empty()
 851            || !payload.delete_channels.is_empty()
 852            || !payload.insert_edge.is_empty()
 853            || !payload.delete_edge.is_empty()
 854            || !payload.unseen_channel_messages.is_empty()
 855            || !payload.unseen_channel_buffer_changes.is_empty();
 856
 857        if channels_changed {
 858            if !payload.delete_channels.is_empty() {
 859                self.channel_index.delete_channels(&payload.delete_channels);
 860                self.channel_participants
 861                    .retain(|channel_id, _| !payload.delete_channels.contains(channel_id));
 862                self.channels_with_admin_privileges
 863                    .retain(|channel_id| !payload.delete_channels.contains(channel_id));
 864
 865                for channel_id in &payload.delete_channels {
 866                    let channel_id = *channel_id;
 867                    if let Some(OpenedModelHandle::Open(buffer)) =
 868                        self.opened_buffers.remove(&channel_id)
 869                    {
 870                        if let Some(buffer) = buffer.upgrade(cx) {
 871                            buffer.update(cx, ChannelBuffer::disconnect);
 872                        }
 873                    }
 874                }
 875            }
 876
 877            let mut index = self.channel_index.bulk_insert();
 878            for channel in payload.channels {
 879                index.insert(channel)
 880            }
 881
 882            for unseen_buffer_change in payload.unseen_channel_buffer_changes {
 883                let version = language::proto::deserialize_version(&unseen_buffer_change.version);
 884                index.note_changed(
 885                    unseen_buffer_change.channel_id,
 886                    unseen_buffer_change.epoch,
 887                    &version,
 888                );
 889            }
 890
 891            for unseen_channel_message in payload.unseen_channel_messages {
 892                index.new_messages(
 893                    unseen_channel_message.channel_id,
 894                    unseen_channel_message.message_id,
 895                );
 896            }
 897
 898            for edge in payload.insert_edge {
 899                index.insert_edge(edge.channel_id, edge.parent_id);
 900            }
 901
 902            for edge in payload.delete_edge {
 903                index.delete_edge(edge.parent_id, edge.channel_id);
 904            }
 905        }
 906
 907        for permission in payload.channel_permissions {
 908            if permission.is_admin {
 909                self.channels_with_admin_privileges
 910                    .insert(permission.channel_id);
 911            } else {
 912                self.channels_with_admin_privileges
 913                    .remove(&permission.channel_id);
 914            }
 915        }
 916
 917        cx.notify();
 918        if payload.channel_participants.is_empty() {
 919            return None;
 920        }
 921
 922        let mut all_user_ids = Vec::new();
 923        let channel_participants = payload.channel_participants;
 924        for entry in &channel_participants {
 925            for user_id in entry.participant_user_ids.iter() {
 926                if let Err(ix) = all_user_ids.binary_search(user_id) {
 927                    all_user_ids.insert(ix, *user_id);
 928                }
 929            }
 930        }
 931
 932        let users = self
 933            .user_store
 934            .update(cx, |user_store, cx| user_store.get_users(all_user_ids, cx));
 935        Some(cx.spawn(|this, mut cx| async move {
 936            let users = users.await?;
 937
 938            this.update(&mut cx, |this, cx| {
 939                for entry in &channel_participants {
 940                    let mut participants: Vec<_> = entry
 941                        .participant_user_ids
 942                        .iter()
 943                        .filter_map(|user_id| {
 944                            users
 945                                .binary_search_by_key(&user_id, |user| &user.id)
 946                                .ok()
 947                                .map(|ix| users[ix].clone())
 948                        })
 949                        .collect();
 950
 951                    participants.sort_by_key(|u| u.id);
 952
 953                    this.channel_participants
 954                        .insert(entry.channel_id, participants);
 955                }
 956
 957                cx.notify();
 958            });
 959            anyhow::Ok(())
 960        }))
 961    }
 962}
 963
 964impl Deref for ChannelPath {
 965    type Target = [ChannelId];
 966
 967    fn deref(&self) -> &Self::Target {
 968        &self.0
 969    }
 970}
 971
 972impl ChannelPath {
 973    pub fn new(path: Arc<[ChannelId]>) -> Self {
 974        debug_assert!(path.len() >= 1);
 975        Self(path)
 976    }
 977
 978    pub fn parent_id(&self) -> Option<ChannelId> {
 979        self.0.len().checked_sub(2).map(|i| self.0[i])
 980    }
 981
 982    pub fn channel_id(&self) -> ChannelId {
 983        self.0[self.0.len() - 1]
 984    }
 985}
 986
 987impl From<ChannelPath> for Cow<'static, ChannelPath> {
 988    fn from(value: ChannelPath) -> Self {
 989        Cow::Owned(value)
 990    }
 991}
 992
 993impl<'a> From<&'a ChannelPath> for Cow<'a, ChannelPath> {
 994    fn from(value: &'a ChannelPath) -> Self {
 995        Cow::Borrowed(value)
 996    }
 997}
 998
 999impl Default for ChannelPath {
1000    fn default() -> Self {
1001        ChannelPath(Arc::from([]))
1002    }
1003}