channel_store.rs

   1mod channel_index;
   2
   3use crate::{channel_buffer::ChannelBuffer, channel_chat::ChannelChat};
   4use anyhow::{anyhow, Result};
   5use channel_index::ChannelIndex;
   6use client::{Client, Subscription, User, UserId, UserStore};
   7use collections::{hash_map, HashMap, HashSet};
   8use db::RELEASE_CHANNEL;
   9use futures::{channel::mpsc, future::Shared, Future, FutureExt, StreamExt};
  10use gpui::{AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Task, WeakModelHandle};
  11use rpc::{
  12    proto::{self, ChannelEdge, ChannelPermission},
  13    TypedEnvelope,
  14};
  15use serde_derive::{Deserialize, Serialize};
  16use std::{borrow::Cow, hash::Hash, mem, ops::Deref, sync::Arc, time::Duration};
  17use util::ResultExt;
  18
  19pub fn init(client: &Arc<Client>, user_store: ModelHandle<UserStore>, cx: &mut AppContext) {
  20    let channel_store =
  21        cx.add_model(|cx| ChannelStore::new(client.clone(), user_store.clone(), cx));
  22    cx.set_global(channel_store);
  23}
  24
/// Grace period after losing the connection before opened channel buffers are disconnected.
///
/// Only awaited when the store is told to wait for a reconnect (see `handle_disconnect`).
pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
  26
/// Numeric identifier of a channel, as used in the RPC messages sent by this store.
pub type ChannelId = u64;
  28
/// Client-side model holding the channels known to this client, plus the
/// buffers and chats that have been opened for them.
pub struct ChannelStore {
    /// Index of known channels; supports lookup by id and iteration over channel paths.
    channel_index: ChannelIndex,
    /// Channels exposed through `channel_invitations()`.
    channel_invitations: Vec<Arc<Channel>>,
    /// Users associated with each channel, exposed through `channel_participants()`.
    channel_participants: HashMap<ChannelId, Vec<Arc<User>>>,
    /// Channels the current user administers; consulted by `is_user_admin()`.
    channels_with_admin_privileges: HashSet<ChannelId>,
    /// `(channel, user)` pairs that currently have a membership request in flight
    /// (invite, removal, or admin change).
    outgoing_invites: HashSet<(ChannelId, UserId)>,
    /// Sending half of the queue of incoming `UpdateChannels` messages; the
    /// receiving half is drained by `_update_channels`.
    update_channels_tx: mpsc::UnboundedSender<proto::UpdateChannels>,
    /// Channel buffers that are open or still loading, keyed by channel id.
    opened_buffers: HashMap<ChannelId, OpenedModelHandle<ChannelBuffer>>,
    /// Channel chats that are open or still loading, keyed by channel id.
    opened_chats: HashMap<ChannelId, OpenedModelHandle<ChannelChat>>,
    client: Arc<Client>,
    user_store: ModelHandle<UserStore>,
    /// Subscription returned when registering `handle_update_channels` with the client.
    _rpc_subscription: Subscription,
    /// Task that reacts to changes of the client's connection status.
    _watch_connection_status: Task<Option<()>>,
    /// Pending task that disconnects opened buffers after a connection loss;
    /// taken (and thereby dropped) again in `handle_connect`.
    disconnect_channel_buffers_task: Option<Task<()>>,
    /// Task that applies queued `UpdateChannels` messages one at a time.
    _update_channels: Task<()>,
}
  45
/// A channel paired with a [`ChannelPath`].
pub type ChannelData = (Channel, ChannelPath);
  47
/// A single channel as known to the client.
#[derive(Clone, Debug, PartialEq)]
pub struct Channel {
    pub id: ChannelId,
    pub name: String,
    /// `Some` while the channel's notes have changes the user has not seen.
    /// Presumably `(epoch, version)` of the notes buffer — TODO confirm against `ChannelIndex`.
    pub unseen_note_version: Option<(u64, clock::Global)>,
    /// `Some` while the channel has chat messages the user has not seen.
    pub unseen_message_id: Option<u64>,
}
  55
  56impl Channel {
  57    pub fn link(&self) -> String {
  58        RELEASE_CHANNEL.link_prefix().to_owned()
  59            + "channel/"
  60            + &self.slug()
  61            + "-"
  62            + &self.id.to_string()
  63    }
  64
  65    pub fn slug(&self) -> String {
  66        let slug: String = self
  67            .name
  68            .chars()
  69            .map(|c| if c.is_alphanumeric() { c } else { '-' })
  70            .collect();
  71
  72        slug.trim_matches(|c| c == '-').to_string()
  73    }
  74}
  75
/// An ordered sequence of channel ids.
///
/// Within this file the last id is treated as the channel the path leads to,
/// and `len() - 1` as that channel's depth (see `channel_dag_entries`).
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Serialize, Deserialize)]
pub struct ChannelPath(Arc<[ChannelId]>);
  78
/// One member of a channel, as returned by `get_channel_member_details`.
pub struct ChannelMembership {
    /// The resolved user record for the member.
    pub user: Arc<User>,
    /// Membership kind as reported by the server.
    pub kind: proto::channel_member::Kind,
    /// Whether the server reports this member as an admin.
    pub admin: bool,
}
  84
/// Events emitted by [`ChannelStore`].
pub enum ChannelEvent {
    /// Emitted by `create_channel` once the new channel has been applied to the store.
    ChannelCreated(ChannelId),
    /// Emitted by `rename` once the renamed channel has been applied to the store.
    ChannelRenamed(ChannelId),
}
  89
// Makes `ChannelStore` a model entity whose emitted events are `ChannelEvent`s.
impl Entity for ChannelStore {
    type Event = ChannelEvent;
}
  93
/// State of a per-channel resource (buffer or chat) tracked by the store.
enum OpenedModelHandle<E: Entity> {
    /// The resource finished loading; only a weak handle is kept, so it may
    /// already have been dropped by its users.
    Open(WeakModelHandle<E>),
    /// The resource is still loading; the shared task lets several callers
    /// await the same load.
    Loading(Shared<Task<Result<ModelHandle<E>, Arc<anyhow::Error>>>>),
}
  98
  99impl ChannelStore {
 100    pub fn global(cx: &AppContext) -> ModelHandle<Self> {
 101        cx.global::<ModelHandle<Self>>().clone()
 102    }
 103
 104    pub fn new(
 105        client: Arc<Client>,
 106        user_store: ModelHandle<UserStore>,
 107        cx: &mut ModelContext<Self>,
 108    ) -> Self {
 109        let rpc_subscription =
 110            client.add_message_handler(cx.handle(), Self::handle_update_channels);
 111
 112        let mut connection_status = client.status();
 113        let (update_channels_tx, mut update_channels_rx) = mpsc::unbounded();
 114        let watch_connection_status = cx.spawn_weak(|this, mut cx| async move {
 115            while let Some(status) = connection_status.next().await {
 116                let this = this.upgrade(&cx)?;
 117                match status {
 118                    client::Status::Connected { .. } => {
 119                        this.update(&mut cx, |this, cx| this.handle_connect(cx))
 120                            .await
 121                            .log_err()?;
 122                    }
 123                    client::Status::SignedOut | client::Status::UpgradeRequired => {
 124                        this.update(&mut cx, |this, cx| this.handle_disconnect(false, cx));
 125                    }
 126                    _ => {
 127                        this.update(&mut cx, |this, cx| this.handle_disconnect(true, cx));
 128                    }
 129                }
 130                if status.is_connected() {
 131                } else {
 132                }
 133            }
 134            Some(())
 135        });
 136
 137        Self {
 138            channel_invitations: Vec::default(),
 139            channel_index: ChannelIndex::default(),
 140            channel_participants: Default::default(),
 141            channels_with_admin_privileges: Default::default(),
 142            outgoing_invites: Default::default(),
 143            opened_buffers: Default::default(),
 144            opened_chats: Default::default(),
 145            update_channels_tx,
 146            client,
 147            user_store,
 148            _rpc_subscription: rpc_subscription,
 149            _watch_connection_status: watch_connection_status,
 150            disconnect_channel_buffers_task: None,
 151            _update_channels: cx.spawn_weak(|this, mut cx| async move {
 152                while let Some(update_channels) = update_channels_rx.next().await {
 153                    if let Some(this) = this.upgrade(&cx) {
 154                        let update_task = this.update(&mut cx, |this, cx| {
 155                            this.update_channels(update_channels, cx)
 156                        });
 157                        if let Some(update_task) = update_task {
 158                            update_task.await.log_err();
 159                        }
 160                    }
 161                }
 162            }),
 163        }
 164    }
 165
 166    pub fn client(&self) -> Arc<Client> {
 167        self.client.clone()
 168    }
 169
 170    pub fn has_children(&self, channel_id: ChannelId) -> bool {
 171        self.channel_index.iter().any(|path| {
 172            if let Some(ix) = path.iter().position(|id| *id == channel_id) {
 173                path.len() > ix + 1
 174            } else {
 175                false
 176            }
 177        })
 178    }
 179
 180    /// Returns the number of unique channels in the store
 181    pub fn channel_count(&self) -> usize {
 182        self.channel_index.by_id().len()
 183    }
 184
 185    /// Returns the index of a channel ID in the list of unique channels
 186    pub fn index_of_channel(&self, channel_id: ChannelId) -> Option<usize> {
 187        self.channel_index
 188            .by_id()
 189            .keys()
 190            .position(|id| *id == channel_id)
 191    }
 192
 193    /// Returns an iterator over all unique channels
 194    pub fn channels(&self) -> impl '_ + Iterator<Item = &Arc<Channel>> {
 195        self.channel_index.by_id().values()
 196    }
 197
 198    /// Iterate over all entries in the channel DAG
 199    pub fn channel_dag_entries(&self) -> impl '_ + Iterator<Item = (usize, &Arc<Channel>)> {
 200        self.channel_index.iter().map(move |path| {
 201            let id = path.last().unwrap();
 202            let channel = self.channel_for_id(*id).unwrap();
 203            (path.len() - 1, channel)
 204        })
 205    }
 206
 207    pub fn channel_dag_entry_at(&self, ix: usize) -> Option<(&Arc<Channel>, &ChannelPath)> {
 208        let path = self.channel_index.get(ix)?;
 209        let id = path.last().unwrap();
 210        let channel = self.channel_for_id(*id).unwrap();
 211
 212        Some((channel, path))
 213    }
 214
 215    pub fn channel_at(&self, ix: usize) -> Option<&Arc<Channel>> {
 216        self.channel_index.by_id().values().nth(ix)
 217    }
 218
 219    pub fn channel_invitations(&self) -> &[Arc<Channel>] {
 220        &self.channel_invitations
 221    }
 222
 223    pub fn channel_for_id(&self, channel_id: ChannelId) -> Option<&Arc<Channel>> {
 224        self.channel_index.by_id().get(&channel_id)
 225    }
 226
 227    pub fn has_open_channel_buffer(&self, channel_id: ChannelId, cx: &AppContext) -> bool {
 228        if let Some(buffer) = self.opened_buffers.get(&channel_id) {
 229            if let OpenedModelHandle::Open(buffer) = buffer {
 230                return buffer.upgrade(cx).is_some();
 231            }
 232        }
 233        false
 234    }
 235
 236    pub fn open_channel_buffer(
 237        &mut self,
 238        channel_id: ChannelId,
 239        cx: &mut ModelContext<Self>,
 240    ) -> Task<Result<ModelHandle<ChannelBuffer>>> {
 241        let client = self.client.clone();
 242        let user_store = self.user_store.clone();
 243        self.open_channel_resource(
 244            channel_id,
 245            |this| &mut this.opened_buffers,
 246            |channel, cx| ChannelBuffer::new(channel, client, user_store, cx),
 247            cx,
 248        )
 249    }
 250
 251    pub fn has_channel_buffer_changed(&self, channel_id: ChannelId) -> Option<bool> {
 252        self.channel_index
 253            .by_id()
 254            .get(&channel_id)
 255            .map(|channel| channel.unseen_note_version.is_some())
 256    }
 257
 258    pub fn has_new_messages(&self, channel_id: ChannelId) -> Option<bool> {
 259        self.channel_index
 260            .by_id()
 261            .get(&channel_id)
 262            .map(|channel| channel.unseen_message_id.is_some())
 263    }
 264
 265    pub fn notes_changed(
 266        &mut self,
 267        channel_id: ChannelId,
 268        epoch: u64,
 269        version: &clock::Global,
 270        cx: &mut ModelContext<Self>,
 271    ) {
 272        self.channel_index.note_changed(channel_id, epoch, version);
 273        cx.notify();
 274    }
 275
 276    pub fn new_message(
 277        &mut self,
 278        channel_id: ChannelId,
 279        message_id: u64,
 280        cx: &mut ModelContext<Self>,
 281    ) {
 282        self.channel_index.new_message(channel_id, message_id);
 283        cx.notify();
 284    }
 285
 286    pub fn acknowledge_message_id(
 287        &mut self,
 288        channel_id: ChannelId,
 289        message_id: u64,
 290        cx: &mut ModelContext<Self>,
 291    ) {
 292        self.channel_index
 293            .acknowledge_message_id(channel_id, message_id);
 294        cx.notify();
 295    }
 296
 297    pub fn acknowledge_notes_version(
 298        &mut self,
 299        channel_id: ChannelId,
 300        epoch: u64,
 301        version: &clock::Global,
 302        cx: &mut ModelContext<Self>,
 303    ) {
 304        self.channel_index
 305            .acknowledge_note_version(channel_id, epoch, version);
 306        cx.notify();
 307    }
 308
 309    pub fn open_channel_chat(
 310        &mut self,
 311        channel_id: ChannelId,
 312        cx: &mut ModelContext<Self>,
 313    ) -> Task<Result<ModelHandle<ChannelChat>>> {
 314        let client = self.client.clone();
 315        let user_store = self.user_store.clone();
 316        let this = cx.handle();
 317        self.open_channel_resource(
 318            channel_id,
 319            |this| &mut this.opened_chats,
 320            |channel, cx| ChannelChat::new(channel, this, user_store, client, cx),
 321            cx,
 322        )
 323    }
 324
    /// Asynchronously open a given resource associated with a channel.
    ///
    /// Make sure that the resource is only opened once, even if this method
    /// is called multiple times with the same channel id while the first task
    /// is still running.
    ///
    /// * `get_map` selects which of the store's maps tracks this kind of resource.
    /// * `load` performs the actual load once the channel has been resolved.
    ///
    /// The returned task fails if `channel_id` is unknown when the load starts
    /// or if `load` itself fails; a failed load is removed from the map so a
    /// later call can retry.
    fn open_channel_resource<T: Entity, F, Fut>(
        &mut self,
        channel_id: ChannelId,
        get_map: fn(&mut Self) -> &mut HashMap<ChannelId, OpenedModelHandle<T>>,
        load: F,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<ModelHandle<T>>>
    where
        F: 'static + FnOnce(Arc<Channel>, AsyncAppContext) -> Fut,
        Fut: Future<Output = Result<ModelHandle<T>>>,
    {
        // Loops at most twice: a stale `Open` entry is removed and the lookup
        // is retried, which then hits the `Vacant` arm.
        let task = loop {
            match get_map(self).entry(channel_id) {
                hash_map::Entry::Occupied(e) => match e.get() {
                    OpenedModelHandle::Open(model) => {
                        if let Some(model) = model.upgrade(cx) {
                            // Already loaded and still alive: resolve immediately.
                            break Task::ready(Ok(model)).shared();
                        } else {
                            // The resource was dropped since it was opened; forget
                            // the dead weak handle and load it again.
                            get_map(self).remove(&channel_id);
                            continue;
                        }
                    }
                    OpenedModelHandle::Loading(task) => {
                        // A load is already in flight: share its result.
                        break task.clone();
                    }
                },
                hash_map::Entry::Vacant(e) => {
                    let task = cx
                        .spawn(|this, cx| async move {
                            let channel = this.read_with(&cx, |this, _| {
                                this.channel_for_id(channel_id).cloned().ok_or_else(|| {
                                    Arc::new(anyhow!("no channel for id: {}", channel_id))
                                })
                            })?;

                            // The error is wrapped in an `Arc` so the shared task's
                            // output can be cloned for every waiter.
                            load(channel, cx).await.map_err(Arc::new)
                        })
                        .shared();

                    e.insert(OpenedModelHandle::Loading(task.clone()));
                    // Detached follow-up: once loading settles, replace the
                    // `Loading` entry with a weak `Open` handle, or drop the entry
                    // on failure.
                    cx.spawn({
                        let task = task.clone();
                        |this, mut cx| async move {
                            let result = task.await;
                            this.update(&mut cx, |this, _| match result {
                                Ok(model) => {
                                    get_map(this).insert(
                                        channel_id,
                                        OpenedModelHandle::Open(model.downgrade()),
                                    );
                                }
                                Err(_) => {
                                    get_map(this).remove(&channel_id);
                                }
                            });
                        }
                    })
                    .detach();
                    break task;
                }
            }
        };
        // Convert the shared `Arc<anyhow::Error>` back into a plain error for the caller.
        cx.foreground()
            .spawn(async move { task.await.map_err(|error| anyhow!("{}", error)) })
    }
 395
 396    pub fn is_user_admin(&self, channel_id: ChannelId) -> bool {
 397        self.channel_index.iter().any(|path| {
 398            if let Some(ix) = path.iter().position(|id| *id == channel_id) {
 399                path[..=ix]
 400                    .iter()
 401                    .any(|id| self.channels_with_admin_privileges.contains(id))
 402            } else {
 403                false
 404            }
 405        })
 406    }
 407
 408    pub fn channel_participants(&self, channel_id: ChannelId) -> &[Arc<User>] {
 409        self.channel_participants
 410            .get(&channel_id)
 411            .map_or(&[], |v| v.as_slice())
 412    }
 413
 414    pub fn create_channel(
 415        &self,
 416        name: &str,
 417        parent_id: Option<ChannelId>,
 418        cx: &mut ModelContext<Self>,
 419    ) -> Task<Result<ChannelId>> {
 420        let client = self.client.clone();
 421        let name = name.trim_start_matches("#").to_owned();
 422        cx.spawn(|this, mut cx| async move {
 423            let response = client
 424                .request(proto::CreateChannel { name, parent_id })
 425                .await?;
 426
 427            let channel = response
 428                .channel
 429                .ok_or_else(|| anyhow!("missing channel in response"))?;
 430            let channel_id = channel.id;
 431
 432            let parent_edge = if let Some(parent_id) = parent_id {
 433                vec![ChannelEdge {
 434                    channel_id: channel.id,
 435                    parent_id,
 436                }]
 437            } else {
 438                vec![]
 439            };
 440
 441            this.update(&mut cx, |this, cx| {
 442                let task = this.update_channels(
 443                    proto::UpdateChannels {
 444                        channels: vec![channel],
 445                        insert_edge: parent_edge,
 446                        channel_permissions: vec![ChannelPermission {
 447                            channel_id,
 448                            is_admin: true,
 449                        }],
 450                        ..Default::default()
 451                    },
 452                    cx,
 453                );
 454                assert!(task.is_none());
 455
 456                // This event is emitted because the collab panel wants to clear the pending edit state
 457                // before this frame is rendered. But we can't guarantee that the collab panel's future
 458                // will resolve before this flush_effects finishes. Synchronously emitting this event
 459                // ensures that the collab panel will observe this creation before the frame completes
 460                cx.emit(ChannelEvent::ChannelCreated(channel_id));
 461            });
 462
 463            Ok(channel_id)
 464        })
 465    }
 466
 467    pub fn link_channel(
 468        &mut self,
 469        channel_id: ChannelId,
 470        to: ChannelId,
 471        cx: &mut ModelContext<Self>,
 472    ) -> Task<Result<()>> {
 473        let client = self.client.clone();
 474        cx.spawn(|_, _| async move {
 475            let _ = client
 476                .request(proto::LinkChannel { channel_id, to })
 477                .await?;
 478
 479            Ok(())
 480        })
 481    }
 482
 483    pub fn unlink_channel(
 484        &mut self,
 485        channel_id: ChannelId,
 486        from: ChannelId,
 487        cx: &mut ModelContext<Self>,
 488    ) -> Task<Result<()>> {
 489        let client = self.client.clone();
 490        cx.spawn(|_, _| async move {
 491            let _ = client
 492                .request(proto::UnlinkChannel { channel_id, from })
 493                .await?;
 494
 495            Ok(())
 496        })
 497    }
 498
 499    pub fn move_channel(
 500        &mut self,
 501        channel_id: ChannelId,
 502        from: ChannelId,
 503        to: ChannelId,
 504        cx: &mut ModelContext<Self>,
 505    ) -> Task<Result<()>> {
 506        let client = self.client.clone();
 507        cx.spawn(|_, _| async move {
 508            let _ = client
 509                .request(proto::MoveChannel {
 510                    channel_id,
 511                    from,
 512                    to,
 513                })
 514                .await?;
 515
 516            Ok(())
 517        })
 518    }
 519
 520    pub fn invite_member(
 521        &mut self,
 522        channel_id: ChannelId,
 523        user_id: UserId,
 524        admin: bool,
 525        cx: &mut ModelContext<Self>,
 526    ) -> Task<Result<()>> {
 527        if !self.outgoing_invites.insert((channel_id, user_id)) {
 528            return Task::ready(Err(anyhow!("invite request already in progress")));
 529        }
 530
 531        cx.notify();
 532        let client = self.client.clone();
 533        cx.spawn(|this, mut cx| async move {
 534            let result = client
 535                .request(proto::InviteChannelMember {
 536                    channel_id,
 537                    user_id,
 538                    admin,
 539                })
 540                .await;
 541
 542            this.update(&mut cx, |this, cx| {
 543                this.outgoing_invites.remove(&(channel_id, user_id));
 544                cx.notify();
 545            });
 546
 547            result?;
 548
 549            Ok(())
 550        })
 551    }
 552
 553    pub fn remove_member(
 554        &mut self,
 555        channel_id: ChannelId,
 556        user_id: u64,
 557        cx: &mut ModelContext<Self>,
 558    ) -> Task<Result<()>> {
 559        if !self.outgoing_invites.insert((channel_id, user_id)) {
 560            return Task::ready(Err(anyhow!("invite request already in progress")));
 561        }
 562
 563        cx.notify();
 564        let client = self.client.clone();
 565        cx.spawn(|this, mut cx| async move {
 566            let result = client
 567                .request(proto::RemoveChannelMember {
 568                    channel_id,
 569                    user_id,
 570                })
 571                .await;
 572
 573            this.update(&mut cx, |this, cx| {
 574                this.outgoing_invites.remove(&(channel_id, user_id));
 575                cx.notify();
 576            });
 577            result?;
 578            Ok(())
 579        })
 580    }
 581
 582    pub fn set_member_admin(
 583        &mut self,
 584        channel_id: ChannelId,
 585        user_id: UserId,
 586        admin: bool,
 587        cx: &mut ModelContext<Self>,
 588    ) -> Task<Result<()>> {
 589        if !self.outgoing_invites.insert((channel_id, user_id)) {
 590            return Task::ready(Err(anyhow!("member request already in progress")));
 591        }
 592
 593        cx.notify();
 594        let client = self.client.clone();
 595        cx.spawn(|this, mut cx| async move {
 596            let result = client
 597                .request(proto::SetChannelMemberAdmin {
 598                    channel_id,
 599                    user_id,
 600                    admin,
 601                })
 602                .await;
 603
 604            this.update(&mut cx, |this, cx| {
 605                this.outgoing_invites.remove(&(channel_id, user_id));
 606                cx.notify();
 607            });
 608
 609            result?;
 610            Ok(())
 611        })
 612    }
 613
 614    pub fn rename(
 615        &mut self,
 616        channel_id: ChannelId,
 617        new_name: &str,
 618        cx: &mut ModelContext<Self>,
 619    ) -> Task<Result<()>> {
 620        let client = self.client.clone();
 621        let name = new_name.to_string();
 622        cx.spawn(|this, mut cx| async move {
 623            let channel = client
 624                .request(proto::RenameChannel { channel_id, name })
 625                .await?
 626                .channel
 627                .ok_or_else(|| anyhow!("missing channel in response"))?;
 628            this.update(&mut cx, |this, cx| {
 629                let task = this.update_channels(
 630                    proto::UpdateChannels {
 631                        channels: vec![channel],
 632                        ..Default::default()
 633                    },
 634                    cx,
 635                );
 636                assert!(task.is_none());
 637
 638                // This event is emitted because the collab panel wants to clear the pending edit state
 639                // before this frame is rendered. But we can't guarantee that the collab panel's future
 640                // will resolve before this flush_effects finishes. Synchronously emitting this event
 641                // ensures that the collab panel will observe this creation before the frame complete
 642                cx.emit(ChannelEvent::ChannelRenamed(channel_id))
 643            });
 644            Ok(())
 645        })
 646    }
 647
 648    pub fn respond_to_channel_invite(
 649        &mut self,
 650        channel_id: ChannelId,
 651        accept: bool,
 652    ) -> impl Future<Output = Result<()>> {
 653        let client = self.client.clone();
 654        async move {
 655            client
 656                .request(proto::RespondToChannelInvite { channel_id, accept })
 657                .await?;
 658            Ok(())
 659        }
 660    }
 661
 662    pub fn get_channel_member_details(
 663        &self,
 664        channel_id: ChannelId,
 665        cx: &mut ModelContext<Self>,
 666    ) -> Task<Result<Vec<ChannelMembership>>> {
 667        let client = self.client.clone();
 668        let user_store = self.user_store.downgrade();
 669        cx.spawn(|_, mut cx| async move {
 670            let response = client
 671                .request(proto::GetChannelMembers { channel_id })
 672                .await?;
 673
 674            let user_ids = response.members.iter().map(|m| m.user_id).collect();
 675            let user_store = user_store
 676                .upgrade(&cx)
 677                .ok_or_else(|| anyhow!("user store dropped"))?;
 678            let users = user_store
 679                .update(&mut cx, |user_store, cx| user_store.get_users(user_ids, cx))
 680                .await?;
 681
 682            Ok(users
 683                .into_iter()
 684                .zip(response.members)
 685                .filter_map(|(user, member)| {
 686                    Some(ChannelMembership {
 687                        user,
 688                        admin: member.admin,
 689                        kind: proto::channel_member::Kind::from_i32(member.kind)?,
 690                    })
 691                })
 692                .collect())
 693        })
 694    }
 695
 696    pub fn remove_channel(&self, channel_id: ChannelId) -> impl Future<Output = Result<()>> {
 697        let client = self.client.clone();
 698        async move {
 699            client.request(proto::DeleteChannel { channel_id }).await?;
 700            Ok(())
 701        }
 702    }
 703
 704    pub fn has_pending_channel_invite_response(&self, _: &Arc<Channel>) -> bool {
 705        false
 706    }
 707
 708    pub fn has_pending_channel_invite(&self, channel_id: ChannelId, user_id: UserId) -> bool {
 709        self.outgoing_invites.contains(&(channel_id, user_id))
 710    }
 711
 712    async fn handle_update_channels(
 713        this: ModelHandle<Self>,
 714        message: TypedEnvelope<proto::UpdateChannels>,
 715        _: Arc<Client>,
 716        mut cx: AsyncAppContext,
 717    ) -> Result<()> {
 718        this.update(&mut cx, |this, _| {
 719            this.update_channels_tx
 720                .unbounded_send(message.payload)
 721                .unwrap();
 722        });
 723        Ok(())
 724    }
 725
 726    fn handle_connect(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
 727        self.disconnect_channel_buffers_task.take();
 728
 729        for chat in self.opened_chats.values() {
 730            if let OpenedModelHandle::Open(chat) = chat {
 731                if let Some(chat) = chat.upgrade(cx) {
 732                    chat.update(cx, |chat, cx| {
 733                        chat.rejoin(cx);
 734                    });
 735                }
 736            }
 737        }
 738
 739        let mut buffer_versions = Vec::new();
 740        for buffer in self.opened_buffers.values() {
 741            if let OpenedModelHandle::Open(buffer) = buffer {
 742                if let Some(buffer) = buffer.upgrade(cx) {
 743                    let channel_buffer = buffer.read(cx);
 744                    let buffer = channel_buffer.buffer().read(cx);
 745                    buffer_versions.push(proto::ChannelBufferVersion {
 746                        channel_id: channel_buffer.channel().id,
 747                        epoch: channel_buffer.epoch(),
 748                        version: language::proto::serialize_version(&buffer.version()),
 749                    });
 750                }
 751            }
 752        }
 753
 754        if buffer_versions.is_empty() {
 755            return Task::ready(Ok(()));
 756        }
 757
 758        let response = self.client.request(proto::RejoinChannelBuffers {
 759            buffers: buffer_versions,
 760        });
 761
 762        cx.spawn(|this, mut cx| async move {
 763            let mut response = response.await?;
 764
 765            this.update(&mut cx, |this, cx| {
 766                this.opened_buffers.retain(|_, buffer| match buffer {
 767                    OpenedModelHandle::Open(channel_buffer) => {
 768                        let Some(channel_buffer) = channel_buffer.upgrade(cx) else {
 769                            return false;
 770                        };
 771
 772                        channel_buffer.update(cx, |channel_buffer, cx| {
 773                            let channel_id = channel_buffer.channel().id;
 774                            if let Some(remote_buffer) = response
 775                                .buffers
 776                                .iter_mut()
 777                                .find(|buffer| buffer.channel_id == channel_id)
 778                            {
 779                                let channel_id = channel_buffer.channel().id;
 780                                let remote_version =
 781                                    language::proto::deserialize_version(&remote_buffer.version);
 782
 783                                channel_buffer.replace_collaborators(
 784                                    mem::take(&mut remote_buffer.collaborators),
 785                                    cx,
 786                                );
 787
 788                                let operations = channel_buffer
 789                                    .buffer()
 790                                    .update(cx, |buffer, cx| {
 791                                        let outgoing_operations =
 792                                            buffer.serialize_ops(Some(remote_version), cx);
 793                                        let incoming_operations =
 794                                            mem::take(&mut remote_buffer.operations)
 795                                                .into_iter()
 796                                                .map(language::proto::deserialize_operation)
 797                                                .collect::<Result<Vec<_>>>()?;
 798                                        buffer.apply_ops(incoming_operations, cx)?;
 799                                        anyhow::Ok(outgoing_operations)
 800                                    })
 801                                    .log_err();
 802
 803                                if let Some(operations) = operations {
 804                                    let client = this.client.clone();
 805                                    cx.background()
 806                                        .spawn(async move {
 807                                            let operations = operations.await;
 808                                            for chunk in
 809                                                language::proto::split_operations(operations)
 810                                            {
 811                                                client
 812                                                    .send(proto::UpdateChannelBuffer {
 813                                                        channel_id,
 814                                                        operations: chunk,
 815                                                    })
 816                                                    .ok();
 817                                            }
 818                                        })
 819                                        .detach();
 820                                    return true;
 821                                }
 822                            }
 823
 824                            channel_buffer.disconnect(cx);
 825                            false
 826                        })
 827                    }
 828                    OpenedModelHandle::Loading(_) => true,
 829                });
 830            });
 831            anyhow::Ok(())
 832        })
 833    }
 834
 835    fn handle_disconnect(&mut self, wait_for_reconnect: bool, cx: &mut ModelContext<Self>) {
 836        self.channel_index.clear();
 837        self.channel_invitations.clear();
 838        self.channel_participants.clear();
 839        self.channels_with_admin_privileges.clear();
 840        self.channel_index.clear();
 841        self.outgoing_invites.clear();
 842        cx.notify();
 843
 844        self.disconnect_channel_buffers_task.get_or_insert_with(|| {
 845            cx.spawn_weak(|this, mut cx| async move {
 846                if wait_for_reconnect {
 847                    cx.background().timer(RECONNECT_TIMEOUT).await;
 848                }
 849
 850                if let Some(this) = this.upgrade(&cx) {
 851                    this.update(&mut cx, |this, cx| {
 852                        for (_, buffer) in this.opened_buffers.drain() {
 853                            if let OpenedModelHandle::Open(buffer) = buffer {
 854                                if let Some(buffer) = buffer.upgrade(cx) {
 855                                    buffer.update(cx, |buffer, cx| buffer.disconnect(cx));
 856                                }
 857                            }
 858                        }
 859                    });
 860                }
 861            })
 862        });
 863    }
 864
 865    pub(crate) fn update_channels(
 866        &mut self,
 867        payload: proto::UpdateChannels,
 868        cx: &mut ModelContext<ChannelStore>,
 869    ) -> Option<Task<Result<()>>> {
 870        if !payload.remove_channel_invitations.is_empty() {
 871            self.channel_invitations
 872                .retain(|channel| !payload.remove_channel_invitations.contains(&channel.id));
 873        }
 874        for channel in payload.channel_invitations {
 875            match self
 876                .channel_invitations
 877                .binary_search_by_key(&channel.id, |c| c.id)
 878            {
 879                Ok(ix) => Arc::make_mut(&mut self.channel_invitations[ix]).name = channel.name,
 880                Err(ix) => self.channel_invitations.insert(
 881                    ix,
 882                    Arc::new(Channel {
 883                        id: channel.id,
 884                        name: channel.name,
 885                        unseen_note_version: None,
 886                        unseen_message_id: None,
 887                    }),
 888                ),
 889            }
 890        }
 891
 892        let channels_changed = !payload.channels.is_empty()
 893            || !payload.delete_channels.is_empty()
 894            || !payload.insert_edge.is_empty()
 895            || !payload.delete_edge.is_empty()
 896            || !payload.unseen_channel_messages.is_empty()
 897            || !payload.unseen_channel_buffer_changes.is_empty();
 898
 899        if channels_changed {
 900            if !payload.delete_channels.is_empty() {
 901                self.channel_index.delete_channels(&payload.delete_channels);
 902                self.channel_participants
 903                    .retain(|channel_id, _| !payload.delete_channels.contains(channel_id));
 904                self.channels_with_admin_privileges
 905                    .retain(|channel_id| !payload.delete_channels.contains(channel_id));
 906
 907                for channel_id in &payload.delete_channels {
 908                    let channel_id = *channel_id;
 909                    if let Some(OpenedModelHandle::Open(buffer)) =
 910                        self.opened_buffers.remove(&channel_id)
 911                    {
 912                        if let Some(buffer) = buffer.upgrade(cx) {
 913                            buffer.update(cx, ChannelBuffer::disconnect);
 914                        }
 915                    }
 916                }
 917            }
 918
 919            let mut index = self.channel_index.bulk_insert();
 920            for channel in payload.channels {
 921                index.insert(channel)
 922            }
 923
 924            for unseen_buffer_change in payload.unseen_channel_buffer_changes {
 925                let version = language::proto::deserialize_version(&unseen_buffer_change.version);
 926                index.note_changed(
 927                    unseen_buffer_change.channel_id,
 928                    unseen_buffer_change.epoch,
 929                    &version,
 930                );
 931            }
 932
 933            for unseen_channel_message in payload.unseen_channel_messages {
 934                index.new_messages(
 935                    unseen_channel_message.channel_id,
 936                    unseen_channel_message.message_id,
 937                );
 938            }
 939
 940            for edge in payload.insert_edge {
 941                index.insert_edge(edge.channel_id, edge.parent_id);
 942            }
 943
 944            for edge in payload.delete_edge {
 945                index.delete_edge(edge.parent_id, edge.channel_id);
 946            }
 947        }
 948
 949        for permission in payload.channel_permissions {
 950            if permission.is_admin {
 951                self.channels_with_admin_privileges
 952                    .insert(permission.channel_id);
 953            } else {
 954                self.channels_with_admin_privileges
 955                    .remove(&permission.channel_id);
 956            }
 957        }
 958
 959        cx.notify();
 960        if payload.channel_participants.is_empty() {
 961            return None;
 962        }
 963
 964        let mut all_user_ids = Vec::new();
 965        let channel_participants = payload.channel_participants;
 966        for entry in &channel_participants {
 967            for user_id in entry.participant_user_ids.iter() {
 968                if let Err(ix) = all_user_ids.binary_search(user_id) {
 969                    all_user_ids.insert(ix, *user_id);
 970                }
 971            }
 972        }
 973
 974        let users = self
 975            .user_store
 976            .update(cx, |user_store, cx| user_store.get_users(all_user_ids, cx));
 977        Some(cx.spawn(|this, mut cx| async move {
 978            let users = users.await?;
 979
 980            this.update(&mut cx, |this, cx| {
 981                for entry in &channel_participants {
 982                    let mut participants: Vec<_> = entry
 983                        .participant_user_ids
 984                        .iter()
 985                        .filter_map(|user_id| {
 986                            users
 987                                .binary_search_by_key(&user_id, |user| &user.id)
 988                                .ok()
 989                                .map(|ix| users[ix].clone())
 990                        })
 991                        .collect();
 992
 993                    participants.sort_by_key(|u| u.id);
 994
 995                    this.channel_participants
 996                        .insert(entry.channel_id, participants);
 997                }
 998
 999                cx.notify();
1000            });
1001            anyhow::Ok(())
1002        }))
1003    }
1004}
1005
1006impl Deref for ChannelPath {
1007    type Target = [ChannelId];
1008
1009    fn deref(&self) -> &Self::Target {
1010        &self.0
1011    }
1012}
1013
1014impl ChannelPath {
1015    pub fn new(path: Arc<[ChannelId]>) -> Self {
1016        debug_assert!(path.len() >= 1);
1017        Self(path)
1018    }
1019
1020    pub fn parent_id(&self) -> Option<ChannelId> {
1021        self.0.len().checked_sub(2).map(|i| self.0[i])
1022    }
1023
1024    pub fn channel_id(&self) -> ChannelId {
1025        self.0[self.0.len() - 1]
1026    }
1027}
1028
1029impl From<ChannelPath> for Cow<'static, ChannelPath> {
1030    fn from(value: ChannelPath) -> Self {
1031        Cow::Owned(value)
1032    }
1033}
1034
1035impl<'a> From<&'a ChannelPath> for Cow<'a, ChannelPath> {
1036    fn from(value: &'a ChannelPath) -> Self {
1037        Cow::Borrowed(value)
1038    }
1039}
1040
1041impl Default for ChannelPath {
1042    fn default() -> Self {
1043        ChannelPath(Arc::from([]))
1044    }
1045}