channel_store.rs

   1mod channel_index;
   2
   3use crate::{channel_buffer::ChannelBuffer, channel_chat::ChannelChat, ChannelMessage};
   4use anyhow::{anyhow, Result};
   5use channel_index::ChannelIndex;
   6use client::{Client, Subscription, User, UserId, UserStore};
   7use collections::{hash_map, HashMap, HashSet};
   8use db::RELEASE_CHANNEL;
   9use futures::{channel::mpsc, future::Shared, Future, FutureExt, StreamExt};
  10use gpui::{
  11    AppContext, AsyncAppContext, Context, EventEmitter, Model, ModelContext, Task, WeakModel,
  12};
  13use rpc::{
  14    proto::{self, ChannelVisibility},
  15    TypedEnvelope,
  16};
  17use std::{mem, sync::Arc, time::Duration};
  18use util::{async_maybe, ResultExt};
  19
  20pub fn init(client: &Arc<Client>, user_store: Model<UserStore>, cx: &mut AppContext) {
  21    let channel_store =
  22        cx.build_model(|cx| ChannelStore::new(client.clone(), user_store.clone(), cx));
  23    cx.set_global(channel_store);
  24}
  25
  26pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
  27
  28pub type ChannelId = u64;
  29
  30pub struct ChannelStore {
  31    pub channel_index: ChannelIndex,
  32    channel_invitations: Vec<Arc<Channel>>,
  33    channel_participants: HashMap<ChannelId, Vec<Arc<User>>>,
  34    outgoing_invites: HashSet<(ChannelId, UserId)>,
  35    update_channels_tx: mpsc::UnboundedSender<proto::UpdateChannels>,
  36    opened_buffers: HashMap<ChannelId, OpenedModelHandle<ChannelBuffer>>,
  37    opened_chats: HashMap<ChannelId, OpenedModelHandle<ChannelChat>>,
  38    client: Arc<Client>,
  39    user_store: Model<UserStore>,
  40    _rpc_subscription: Subscription,
  41    _watch_connection_status: Task<Option<()>>,
  42    disconnect_channel_buffers_task: Option<Task<()>>,
  43    _update_channels: Task<()>,
  44}
  45
  46#[derive(Clone, Debug, PartialEq)]
  47pub struct Channel {
  48    pub id: ChannelId,
  49    pub name: String,
  50    pub visibility: proto::ChannelVisibility,
  51    pub role: proto::ChannelRole,
  52    pub unseen_note_version: Option<(u64, clock::Global)>,
  53    pub unseen_message_id: Option<u64>,
  54    pub parent_path: Vec<u64>,
  55}
  56
  57impl Channel {
  58    pub fn link(&self) -> String {
  59        RELEASE_CHANNEL.link_prefix().to_owned()
  60            + "channel/"
  61            + &self.slug()
  62            + "-"
  63            + &self.id.to_string()
  64    }
  65
  66    pub fn slug(&self) -> String {
  67        let slug: String = self
  68            .name
  69            .chars()
  70            .map(|c| if c.is_alphanumeric() { c } else { '-' })
  71            .collect();
  72
  73        slug.trim_matches(|c| c == '-').to_string()
  74    }
  75
  76    pub fn can_edit_notes(&self) -> bool {
  77        self.role == proto::ChannelRole::Member || self.role == proto::ChannelRole::Admin
  78    }
  79}
  80
  81pub struct ChannelMembership {
  82    pub user: Arc<User>,
  83    pub kind: proto::channel_member::Kind,
  84    pub role: proto::ChannelRole,
  85}
  86impl ChannelMembership {
  87    pub fn sort_key(&self) -> MembershipSortKey {
  88        MembershipSortKey {
  89            role_order: match self.role {
  90                proto::ChannelRole::Admin => 0,
  91                proto::ChannelRole::Member => 1,
  92                proto::ChannelRole::Banned => 2,
  93                proto::ChannelRole::Guest => 3,
  94            },
  95            kind_order: match self.kind {
  96                proto::channel_member::Kind::Member => 0,
  97                proto::channel_member::Kind::AncestorMember => 1,
  98                proto::channel_member::Kind::Invitee => 2,
  99            },
 100            username_order: self.user.github_login.as_str(),
 101        }
 102    }
 103}
 104
 105#[derive(PartialOrd, Ord, PartialEq, Eq)]
 106pub struct MembershipSortKey<'a> {
 107    role_order: u8,
 108    kind_order: u8,
 109    username_order: &'a str,
 110}
 111
 112pub enum ChannelEvent {
 113    ChannelCreated(ChannelId),
 114    ChannelRenamed(ChannelId),
 115}
 116
 117impl EventEmitter<ChannelEvent> for ChannelStore {}
 118
 119enum OpenedModelHandle<E> {
 120    Open(WeakModel<E>),
 121    Loading(Shared<Task<Result<Model<E>, Arc<anyhow::Error>>>>),
 122}
 123
 124impl ChannelStore {
 125    pub fn global(cx: &AppContext) -> Model<Self> {
 126        cx.global::<Model<Self>>().clone()
 127    }
 128
 129    pub fn new(
 130        client: Arc<Client>,
 131        user_store: Model<UserStore>,
 132        cx: &mut ModelContext<Self>,
 133    ) -> Self {
 134        let rpc_subscription =
 135            client.add_message_handler(cx.weak_model(), Self::handle_update_channels);
 136
 137        let mut connection_status = client.status();
 138        let (update_channels_tx, mut update_channels_rx) = mpsc::unbounded();
 139        let watch_connection_status = cx.spawn(|this, mut cx| async move {
 140            while let Some(status) = connection_status.next().await {
 141                let this = this.upgrade()?;
 142                match status {
 143                    client::Status::Connected { .. } => {
 144                        this.update(&mut cx, |this, cx| this.handle_connect(cx))
 145                            .ok()?
 146                            .await
 147                            .log_err()?;
 148                    }
 149                    client::Status::SignedOut | client::Status::UpgradeRequired => {
 150                        this.update(&mut cx, |this, cx| this.handle_disconnect(false, cx))
 151                            .ok();
 152                    }
 153                    _ => {
 154                        this.update(&mut cx, |this, cx| this.handle_disconnect(true, cx))
 155                            .ok();
 156                    }
 157                }
 158            }
 159            Some(())
 160        });
 161
 162        Self {
 163            channel_invitations: Vec::default(),
 164            channel_index: ChannelIndex::default(),
 165            channel_participants: Default::default(),
 166            outgoing_invites: Default::default(),
 167            opened_buffers: Default::default(),
 168            opened_chats: Default::default(),
 169            update_channels_tx,
 170            client,
 171            user_store,
 172            _rpc_subscription: rpc_subscription,
 173            _watch_connection_status: watch_connection_status,
 174            disconnect_channel_buffers_task: None,
 175            _update_channels: cx.spawn(|this, mut cx| async move {
 176                async_maybe!({
 177                    while let Some(update_channels) = update_channels_rx.next().await {
 178                        if let Some(this) = this.upgrade() {
 179                            let update_task = this.update(&mut cx, |this, cx| {
 180                                this.update_channels(update_channels, cx)
 181                            })?;
 182                            if let Some(update_task) = update_task {
 183                                update_task.await.log_err();
 184                            }
 185                        }
 186                    }
 187                    anyhow::Ok(())
 188                })
 189                .await
 190                .log_err();
 191            }),
 192        }
 193    }
 194
 195    pub fn client(&self) -> Arc<Client> {
 196        self.client.clone()
 197    }
 198
 199    /// Returns the number of unique channels in the store
 200    pub fn channel_count(&self) -> usize {
 201        self.channel_index.by_id().len()
 202    }
 203
 204    /// Returns the index of a channel ID in the list of unique channels
 205    pub fn index_of_channel(&self, channel_id: ChannelId) -> Option<usize> {
 206        self.channel_index
 207            .by_id()
 208            .keys()
 209            .position(|id| *id == channel_id)
 210    }
 211
 212    /// Returns an iterator over all unique channels
 213    pub fn channels(&self) -> impl '_ + Iterator<Item = &Arc<Channel>> {
 214        self.channel_index.by_id().values()
 215    }
 216
 217    /// Iterate over all entries in the channel DAG
 218    pub fn ordered_channels(&self) -> impl '_ + Iterator<Item = (usize, &Arc<Channel>)> {
 219        self.channel_index
 220            .ordered_channels()
 221            .iter()
 222            .filter_map(move |id| {
 223                let channel = self.channel_index.by_id().get(id)?;
 224                Some((channel.parent_path.len(), channel))
 225            })
 226    }
 227
 228    pub fn channel_at_index(&self, ix: usize) -> Option<&Arc<Channel>> {
 229        let channel_id = self.channel_index.ordered_channels().get(ix)?;
 230        self.channel_index.by_id().get(channel_id)
 231    }
 232
 233    pub fn channel_at(&self, ix: usize) -> Option<&Arc<Channel>> {
 234        self.channel_index.by_id().values().nth(ix)
 235    }
 236
 237    pub fn has_channel_invitation(&self, channel_id: ChannelId) -> bool {
 238        self.channel_invitations
 239            .iter()
 240            .any(|channel| channel.id == channel_id)
 241    }
 242
 243    pub fn channel_invitations(&self) -> &[Arc<Channel>] {
 244        &self.channel_invitations
 245    }
 246
 247    pub fn channel_for_id(&self, channel_id: ChannelId) -> Option<&Arc<Channel>> {
 248        self.channel_index.by_id().get(&channel_id)
 249    }
 250
 251    pub fn has_open_channel_buffer(&self, channel_id: ChannelId, _cx: &AppContext) -> bool {
 252        if let Some(buffer) = self.opened_buffers.get(&channel_id) {
 253            if let OpenedModelHandle::Open(buffer) = buffer {
 254                return buffer.upgrade().is_some();
 255            }
 256        }
 257        false
 258    }
 259
 260    pub fn open_channel_buffer(
 261        &mut self,
 262        channel_id: ChannelId,
 263        cx: &mut ModelContext<Self>,
 264    ) -> Task<Result<Model<ChannelBuffer>>> {
 265        let client = self.client.clone();
 266        let user_store = self.user_store.clone();
 267        let channel_store = cx.handle();
 268        self.open_channel_resource(
 269            channel_id,
 270            |this| &mut this.opened_buffers,
 271            |channel, cx| ChannelBuffer::new(channel, client, user_store, channel_store, cx),
 272            cx,
 273        )
 274    }
 275
 276    pub fn fetch_channel_messages(
 277        &self,
 278        message_ids: Vec<u64>,
 279        cx: &mut ModelContext<Self>,
 280    ) -> Task<Result<Vec<ChannelMessage>>> {
 281        let request = if message_ids.is_empty() {
 282            None
 283        } else {
 284            Some(
 285                self.client
 286                    .request(proto::GetChannelMessagesById { message_ids }),
 287            )
 288        };
 289        cx.spawn(|this, mut cx| async move {
 290            if let Some(request) = request {
 291                let response = request.await?;
 292                let this = this
 293                    .upgrade()
 294                    .ok_or_else(|| anyhow!("channel store dropped"))?;
 295                let user_store = this.update(&mut cx, |this, _| this.user_store.clone())?;
 296                ChannelMessage::from_proto_vec(response.messages, &user_store, &mut cx).await
 297            } else {
 298                Ok(Vec::new())
 299            }
 300        })
 301    }
 302
 303    pub fn has_channel_buffer_changed(&self, channel_id: ChannelId) -> Option<bool> {
 304        self.channel_index
 305            .by_id()
 306            .get(&channel_id)
 307            .map(|channel| channel.unseen_note_version.is_some())
 308    }
 309
 310    pub fn has_new_messages(&self, channel_id: ChannelId) -> Option<bool> {
 311        self.channel_index
 312            .by_id()
 313            .get(&channel_id)
 314            .map(|channel| channel.unseen_message_id.is_some())
 315    }
 316
 317    pub fn notes_changed(
 318        &mut self,
 319        channel_id: ChannelId,
 320        epoch: u64,
 321        version: &clock::Global,
 322        cx: &mut ModelContext<Self>,
 323    ) {
 324        self.channel_index.note_changed(channel_id, epoch, version);
 325        cx.notify();
 326    }
 327
 328    pub fn new_message(
 329        &mut self,
 330        channel_id: ChannelId,
 331        message_id: u64,
 332        cx: &mut ModelContext<Self>,
 333    ) {
 334        self.channel_index.new_message(channel_id, message_id);
 335        cx.notify();
 336    }
 337
 338    pub fn acknowledge_message_id(
 339        &mut self,
 340        channel_id: ChannelId,
 341        message_id: u64,
 342        cx: &mut ModelContext<Self>,
 343    ) {
 344        self.channel_index
 345            .acknowledge_message_id(channel_id, message_id);
 346        cx.notify();
 347    }
 348
 349    pub fn acknowledge_notes_version(
 350        &mut self,
 351        channel_id: ChannelId,
 352        epoch: u64,
 353        version: &clock::Global,
 354        cx: &mut ModelContext<Self>,
 355    ) {
 356        self.channel_index
 357            .acknowledge_note_version(channel_id, epoch, version);
 358        cx.notify();
 359    }
 360
 361    pub fn open_channel_chat(
 362        &mut self,
 363        channel_id: ChannelId,
 364        cx: &mut ModelContext<Self>,
 365    ) -> Task<Result<Model<ChannelChat>>> {
 366        let client = self.client.clone();
 367        let user_store = self.user_store.clone();
 368        let this = cx.handle();
 369        self.open_channel_resource(
 370            channel_id,
 371            |this| &mut this.opened_chats,
 372            |channel, cx| ChannelChat::new(channel, this, user_store, client, cx),
 373            cx,
 374        )
 375    }
 376
 377    /// Asynchronously open a given resource associated with a channel.
 378    ///
 379    /// Make sure that the resource is only opened once, even if this method
 380    /// is called multiple times with the same channel id while the first task
 381    /// is still running.
 382    fn open_channel_resource<T, F, Fut>(
 383        &mut self,
 384        channel_id: ChannelId,
 385        get_map: fn(&mut Self) -> &mut HashMap<ChannelId, OpenedModelHandle<T>>,
 386        load: F,
 387        cx: &mut ModelContext<Self>,
 388    ) -> Task<Result<Model<T>>>
 389    where
 390        F: 'static + FnOnce(Arc<Channel>, AsyncAppContext) -> Fut,
 391        Fut: Future<Output = Result<Model<T>>>,
 392        T: 'static,
 393    {
 394        let task = loop {
 395            match get_map(self).entry(channel_id) {
 396                hash_map::Entry::Occupied(e) => match e.get() {
 397                    OpenedModelHandle::Open(model) => {
 398                        if let Some(model) = model.upgrade() {
 399                            break Task::ready(Ok(model)).shared();
 400                        } else {
 401                            get_map(self).remove(&channel_id);
 402                            continue;
 403                        }
 404                    }
 405                    OpenedModelHandle::Loading(task) => {
 406                        break task.clone();
 407                    }
 408                },
 409                hash_map::Entry::Vacant(e) => {
 410                    let task = cx
 411                        .spawn(move |this, mut cx| async move {
 412                            let channel = this.update(&mut cx, |this, _| {
 413                                this.channel_for_id(channel_id).cloned().ok_or_else(|| {
 414                                    Arc::new(anyhow!("no channel for id: {}", channel_id))
 415                                })
 416                            })??;
 417
 418                            load(channel, cx).await.map_err(Arc::new)
 419                        })
 420                        .shared();
 421
 422                    e.insert(OpenedModelHandle::Loading(task.clone()));
 423                    cx.spawn({
 424                        let task = task.clone();
 425                        move |this, mut cx| async move {
 426                            let result = task.await;
 427                            this.update(&mut cx, |this, _| match result {
 428                                Ok(model) => {
 429                                    get_map(this).insert(
 430                                        channel_id,
 431                                        OpenedModelHandle::Open(model.downgrade()),
 432                                    );
 433                                }
 434                                Err(_) => {
 435                                    get_map(this).remove(&channel_id);
 436                                }
 437                            })
 438                            .ok();
 439                        }
 440                    })
 441                    .detach();
 442                    break task;
 443                }
 444            }
 445        };
 446        cx.background_executor()
 447            .spawn(async move { task.await.map_err(|error| anyhow!("{}", error)) })
 448    }
 449
 450    pub fn is_channel_admin(&self, channel_id: ChannelId) -> bool {
 451        let Some(channel) = self.channel_for_id(channel_id) else {
 452            return false;
 453        };
 454        channel.role == proto::ChannelRole::Admin
 455    }
 456
 457    pub fn channel_participants(&self, channel_id: ChannelId) -> &[Arc<User>] {
 458        self.channel_participants
 459            .get(&channel_id)
 460            .map_or(&[], |v| v.as_slice())
 461    }
 462
 463    pub fn create_channel(
 464        &self,
 465        name: &str,
 466        parent_id: Option<ChannelId>,
 467        cx: &mut ModelContext<Self>,
 468    ) -> Task<Result<ChannelId>> {
 469        let client = self.client.clone();
 470        let name = name.trim_start_matches("#").to_owned();
 471        cx.spawn(move |this, mut cx| async move {
 472            let response = client
 473                .request(proto::CreateChannel { name, parent_id })
 474                .await?;
 475
 476            let channel = response
 477                .channel
 478                .ok_or_else(|| anyhow!("missing channel in response"))?;
 479            let channel_id = channel.id;
 480
 481            this.update(&mut cx, |this, cx| {
 482                let task = this.update_channels(
 483                    proto::UpdateChannels {
 484                        channels: vec![channel],
 485                        ..Default::default()
 486                    },
 487                    cx,
 488                );
 489                assert!(task.is_none());
 490
 491                // This event is emitted because the collab panel wants to clear the pending edit state
 492                // before this frame is rendered. But we can't guarantee that the collab panel's future
 493                // will resolve before this flush_effects finishes. Synchronously emitting this event
 494                // ensures that the collab panel will observe this creation before the frame completes
 495                cx.emit(ChannelEvent::ChannelCreated(channel_id));
 496            })?;
 497
 498            Ok(channel_id)
 499        })
 500    }
 501
 502    pub fn move_channel(
 503        &mut self,
 504        channel_id: ChannelId,
 505        to: Option<ChannelId>,
 506        cx: &mut ModelContext<Self>,
 507    ) -> Task<Result<()>> {
 508        let client = self.client.clone();
 509        cx.spawn(move |_, _| async move {
 510            let _ = client
 511                .request(proto::MoveChannel { channel_id, to })
 512                .await?;
 513
 514            Ok(())
 515        })
 516    }
 517
 518    pub fn set_channel_visibility(
 519        &mut self,
 520        channel_id: ChannelId,
 521        visibility: ChannelVisibility,
 522        cx: &mut ModelContext<Self>,
 523    ) -> Task<Result<()>> {
 524        let client = self.client.clone();
 525        cx.spawn(move |_, _| async move {
 526            let _ = client
 527                .request(proto::SetChannelVisibility {
 528                    channel_id,
 529                    visibility: visibility.into(),
 530                })
 531                .await?;
 532
 533            Ok(())
 534        })
 535    }
 536
 537    pub fn invite_member(
 538        &mut self,
 539        channel_id: ChannelId,
 540        user_id: UserId,
 541        role: proto::ChannelRole,
 542        cx: &mut ModelContext<Self>,
 543    ) -> Task<Result<()>> {
 544        if !self.outgoing_invites.insert((channel_id, user_id)) {
 545            return Task::ready(Err(anyhow!("invite request already in progress")));
 546        }
 547
 548        cx.notify();
 549        let client = self.client.clone();
 550        cx.spawn(move |this, mut cx| async move {
 551            let result = client
 552                .request(proto::InviteChannelMember {
 553                    channel_id,
 554                    user_id,
 555                    role: role.into(),
 556                })
 557                .await;
 558
 559            this.update(&mut cx, |this, cx| {
 560                this.outgoing_invites.remove(&(channel_id, user_id));
 561                cx.notify();
 562            })?;
 563
 564            result?;
 565
 566            Ok(())
 567        })
 568    }
 569
 570    pub fn remove_member(
 571        &mut self,
 572        channel_id: ChannelId,
 573        user_id: u64,
 574        cx: &mut ModelContext<Self>,
 575    ) -> Task<Result<()>> {
 576        if !self.outgoing_invites.insert((channel_id, user_id)) {
 577            return Task::ready(Err(anyhow!("invite request already in progress")));
 578        }
 579
 580        cx.notify();
 581        let client = self.client.clone();
 582        cx.spawn(move |this, mut cx| async move {
 583            let result = client
 584                .request(proto::RemoveChannelMember {
 585                    channel_id,
 586                    user_id,
 587                })
 588                .await;
 589
 590            this.update(&mut cx, |this, cx| {
 591                this.outgoing_invites.remove(&(channel_id, user_id));
 592                cx.notify();
 593            })?;
 594            result?;
 595            Ok(())
 596        })
 597    }
 598
 599    pub fn set_member_role(
 600        &mut self,
 601        channel_id: ChannelId,
 602        user_id: UserId,
 603        role: proto::ChannelRole,
 604        cx: &mut ModelContext<Self>,
 605    ) -> Task<Result<()>> {
 606        if !self.outgoing_invites.insert((channel_id, user_id)) {
 607            return Task::ready(Err(anyhow!("member request already in progress")));
 608        }
 609
 610        cx.notify();
 611        let client = self.client.clone();
 612        cx.spawn(move |this, mut cx| async move {
 613            let result = client
 614                .request(proto::SetChannelMemberRole {
 615                    channel_id,
 616                    user_id,
 617                    role: role.into(),
 618                })
 619                .await;
 620
 621            this.update(&mut cx, |this, cx| {
 622                this.outgoing_invites.remove(&(channel_id, user_id));
 623                cx.notify();
 624            })?;
 625
 626            result?;
 627            Ok(())
 628        })
 629    }
 630
 631    pub fn rename(
 632        &mut self,
 633        channel_id: ChannelId,
 634        new_name: &str,
 635        cx: &mut ModelContext<Self>,
 636    ) -> Task<Result<()>> {
 637        let client = self.client.clone();
 638        let name = new_name.to_string();
 639        cx.spawn(move |this, mut cx| async move {
 640            let channel = client
 641                .request(proto::RenameChannel { channel_id, name })
 642                .await?
 643                .channel
 644                .ok_or_else(|| anyhow!("missing channel in response"))?;
 645            this.update(&mut cx, |this, cx| {
 646                let task = this.update_channels(
 647                    proto::UpdateChannels {
 648                        channels: vec![channel],
 649                        ..Default::default()
 650                    },
 651                    cx,
 652                );
 653                assert!(task.is_none());
 654
 655                // This event is emitted because the collab panel wants to clear the pending edit state
 656                // before this frame is rendered. But we can't guarantee that the collab panel's future
 657                // will resolve before this flush_effects finishes. Synchronously emitting this event
 658                // ensures that the collab panel will observe this creation before the frame complete
 659                cx.emit(ChannelEvent::ChannelRenamed(channel_id))
 660            })?;
 661            Ok(())
 662        })
 663    }
 664
 665    pub fn respond_to_channel_invite(
 666        &mut self,
 667        channel_id: ChannelId,
 668        accept: bool,
 669        cx: &mut ModelContext<Self>,
 670    ) -> Task<Result<()>> {
 671        let client = self.client.clone();
 672        cx.background_executor().spawn(async move {
 673            client
 674                .request(proto::RespondToChannelInvite { channel_id, accept })
 675                .await?;
 676            Ok(())
 677        })
 678    }
 679
 680    pub fn get_channel_member_details(
 681        &self,
 682        channel_id: ChannelId,
 683        cx: &mut ModelContext<Self>,
 684    ) -> Task<Result<Vec<ChannelMembership>>> {
 685        let client = self.client.clone();
 686        let user_store = self.user_store.downgrade();
 687        cx.spawn(move |_, mut cx| async move {
 688            let response = client
 689                .request(proto::GetChannelMembers { channel_id })
 690                .await?;
 691
 692            let user_ids = response.members.iter().map(|m| m.user_id).collect();
 693            let user_store = user_store
 694                .upgrade()
 695                .ok_or_else(|| anyhow!("user store dropped"))?;
 696            let users = user_store
 697                .update(&mut cx, |user_store, cx| user_store.get_users(user_ids, cx))?
 698                .await?;
 699
 700            Ok(users
 701                .into_iter()
 702                .zip(response.members)
 703                .filter_map(|(user, member)| {
 704                    Some(ChannelMembership {
 705                        user,
 706                        role: member.role(),
 707                        kind: member.kind(),
 708                    })
 709                })
 710                .collect())
 711        })
 712    }
 713
 714    pub fn remove_channel(&self, channel_id: ChannelId) -> impl Future<Output = Result<()>> {
 715        let client = self.client.clone();
 716        async move {
 717            client.request(proto::DeleteChannel { channel_id }).await?;
 718            Ok(())
 719        }
 720    }
 721
 722    pub fn has_pending_channel_invite_response(&self, _: &Arc<Channel>) -> bool {
 723        false
 724    }
 725
 726    pub fn has_pending_channel_invite(&self, channel_id: ChannelId, user_id: UserId) -> bool {
 727        self.outgoing_invites.contains(&(channel_id, user_id))
 728    }
 729
 730    async fn handle_update_channels(
 731        this: Model<Self>,
 732        message: TypedEnvelope<proto::UpdateChannels>,
 733        _: Arc<Client>,
 734        mut cx: AsyncAppContext,
 735    ) -> Result<()> {
 736        this.update(&mut cx, |this, _| {
 737            this.update_channels_tx
 738                .unbounded_send(message.payload)
 739                .unwrap();
 740        })?;
 741        Ok(())
 742    }
 743
 744    fn handle_connect(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
 745        self.channel_index.clear();
 746        self.channel_invitations.clear();
 747        self.channel_participants.clear();
 748        self.channel_index.clear();
 749        self.outgoing_invites.clear();
 750        self.disconnect_channel_buffers_task.take();
 751
 752        for chat in self.opened_chats.values() {
 753            if let OpenedModelHandle::Open(chat) = chat {
 754                if let Some(chat) = chat.upgrade() {
 755                    chat.update(cx, |chat, cx| {
 756                        chat.rejoin(cx);
 757                    });
 758                }
 759            }
 760        }
 761
 762        let mut buffer_versions = Vec::new();
 763        for buffer in self.opened_buffers.values() {
 764            if let OpenedModelHandle::Open(buffer) = buffer {
 765                if let Some(buffer) = buffer.upgrade() {
 766                    let channel_buffer = buffer.read(cx);
 767                    let buffer = channel_buffer.buffer().read(cx);
 768                    buffer_versions.push(proto::ChannelBufferVersion {
 769                        channel_id: channel_buffer.channel_id,
 770                        epoch: channel_buffer.epoch(),
 771                        version: language::proto::serialize_version(&buffer.version()),
 772                    });
 773                }
 774            }
 775        }
 776
 777        if buffer_versions.is_empty() {
 778            return Task::ready(Ok(()));
 779        }
 780
 781        let response = self.client.request(proto::RejoinChannelBuffers {
 782            buffers: buffer_versions,
 783        });
 784
 785        cx.spawn(|this, mut cx| async move {
 786            let mut response = response.await?;
 787
 788            this.update(&mut cx, |this, cx| {
 789                this.opened_buffers.retain(|_, buffer| match buffer {
 790                    OpenedModelHandle::Open(channel_buffer) => {
 791                        let Some(channel_buffer) = channel_buffer.upgrade() else {
 792                            return false;
 793                        };
 794
 795                        channel_buffer.update(cx, |channel_buffer, cx| {
 796                            let channel_id = channel_buffer.channel_id;
 797                            if let Some(remote_buffer) = response
 798                                .buffers
 799                                .iter_mut()
 800                                .find(|buffer| buffer.channel_id == channel_id)
 801                            {
 802                                let channel_id = channel_buffer.channel_id;
 803                                let remote_version =
 804                                    language::proto::deserialize_version(&remote_buffer.version);
 805
 806                                channel_buffer.replace_collaborators(
 807                                    mem::take(&mut remote_buffer.collaborators),
 808                                    cx,
 809                                );
 810
 811                                let operations = channel_buffer
 812                                    .buffer()
 813                                    .update(cx, |buffer, cx| {
 814                                        let outgoing_operations =
 815                                            buffer.serialize_ops(Some(remote_version), cx);
 816                                        let incoming_operations =
 817                                            mem::take(&mut remote_buffer.operations)
 818                                                .into_iter()
 819                                                .map(language::proto::deserialize_operation)
 820                                                .collect::<Result<Vec<_>>>()?;
 821                                        buffer.apply_ops(incoming_operations, cx)?;
 822                                        anyhow::Ok(outgoing_operations)
 823                                    })
 824                                    .log_err();
 825
 826                                if let Some(operations) = operations {
 827                                    let client = this.client.clone();
 828                                    cx.background_executor()
 829                                        .spawn(async move {
 830                                            let operations = operations.await;
 831                                            for chunk in
 832                                                language::proto::split_operations(operations)
 833                                            {
 834                                                client
 835                                                    .send(proto::UpdateChannelBuffer {
 836                                                        channel_id,
 837                                                        operations: chunk,
 838                                                    })
 839                                                    .ok();
 840                                            }
 841                                        })
 842                                        .detach();
 843                                    return true;
 844                                }
 845                            }
 846
 847                            channel_buffer.disconnect(cx);
 848                            false
 849                        })
 850                    }
 851                    OpenedModelHandle::Loading(_) => true,
 852                });
 853            })
 854            .ok();
 855            anyhow::Ok(())
 856        })
 857    }
 858
 859    fn handle_disconnect(&mut self, wait_for_reconnect: bool, cx: &mut ModelContext<Self>) {
 860        cx.notify();
 861
 862        self.disconnect_channel_buffers_task.get_or_insert_with(|| {
 863            cx.spawn(move |this, mut cx| async move {
 864                if wait_for_reconnect {
 865                    cx.background_executor().timer(RECONNECT_TIMEOUT).await;
 866                }
 867
 868                if let Some(this) = this.upgrade() {
 869                    this.update(&mut cx, |this, cx| {
 870                        for (_, buffer) in this.opened_buffers.drain() {
 871                            if let OpenedModelHandle::Open(buffer) = buffer {
 872                                if let Some(buffer) = buffer.upgrade() {
 873                                    buffer.update(cx, |buffer, cx| buffer.disconnect(cx));
 874                                }
 875                            }
 876                        }
 877                    })
 878                    .ok();
 879                }
 880            })
 881        });
 882    }
 883
 884    pub(crate) fn update_channels(
 885        &mut self,
 886        payload: proto::UpdateChannels,
 887        cx: &mut ModelContext<ChannelStore>,
 888    ) -> Option<Task<Result<()>>> {
 889        if !payload.remove_channel_invitations.is_empty() {
 890            self.channel_invitations
 891                .retain(|channel| !payload.remove_channel_invitations.contains(&channel.id));
 892        }
 893        for channel in payload.channel_invitations {
 894            match self
 895                .channel_invitations
 896                .binary_search_by_key(&channel.id, |c| c.id)
 897            {
 898                Ok(ix) => Arc::make_mut(&mut self.channel_invitations[ix]).name = channel.name,
 899                Err(ix) => self.channel_invitations.insert(
 900                    ix,
 901                    Arc::new(Channel {
 902                        id: channel.id,
 903                        visibility: channel.visibility(),
 904                        role: channel.role(),
 905                        name: channel.name,
 906                        unseen_note_version: None,
 907                        unseen_message_id: None,
 908                        parent_path: channel.parent_path,
 909                    }),
 910                ),
 911            }
 912        }
 913
 914        let channels_changed = !payload.channels.is_empty()
 915            || !payload.delete_channels.is_empty()
 916            || !payload.unseen_channel_messages.is_empty()
 917            || !payload.unseen_channel_buffer_changes.is_empty();
 918
 919        if channels_changed {
 920            if !payload.delete_channels.is_empty() {
 921                self.channel_index.delete_channels(&payload.delete_channels);
 922                self.channel_participants
 923                    .retain(|channel_id, _| !&payload.delete_channels.contains(channel_id));
 924
 925                for channel_id in &payload.delete_channels {
 926                    let channel_id = *channel_id;
 927                    if payload
 928                        .channels
 929                        .iter()
 930                        .any(|channel| channel.id == channel_id)
 931                    {
 932                        continue;
 933                    }
 934                    if let Some(OpenedModelHandle::Open(buffer)) =
 935                        self.opened_buffers.remove(&channel_id)
 936                    {
 937                        if let Some(buffer) = buffer.upgrade() {
 938                            buffer.update(cx, ChannelBuffer::disconnect);
 939                        }
 940                    }
 941                }
 942            }
 943
 944            let mut index = self.channel_index.bulk_insert();
 945            for channel in payload.channels {
 946                let id = channel.id;
 947                let channel_changed = index.insert(channel);
 948
 949                if channel_changed {
 950                    if let Some(OpenedModelHandle::Open(buffer)) = self.opened_buffers.get(&id) {
 951                        if let Some(buffer) = buffer.upgrade() {
 952                            buffer.update(cx, ChannelBuffer::channel_changed);
 953                        }
 954                    }
 955                }
 956            }
 957
 958            for unseen_buffer_change in payload.unseen_channel_buffer_changes {
 959                let version = language::proto::deserialize_version(&unseen_buffer_change.version);
 960                index.note_changed(
 961                    unseen_buffer_change.channel_id,
 962                    unseen_buffer_change.epoch,
 963                    &version,
 964                );
 965            }
 966
 967            for unseen_channel_message in payload.unseen_channel_messages {
 968                index.new_messages(
 969                    unseen_channel_message.channel_id,
 970                    unseen_channel_message.message_id,
 971                );
 972            }
 973        }
 974
 975        cx.notify();
 976        if payload.channel_participants.is_empty() {
 977            return None;
 978        }
 979
 980        let mut all_user_ids = Vec::new();
 981        let channel_participants = payload.channel_participants;
 982        for entry in &channel_participants {
 983            for user_id in entry.participant_user_ids.iter() {
 984                if let Err(ix) = all_user_ids.binary_search(user_id) {
 985                    all_user_ids.insert(ix, *user_id);
 986                }
 987            }
 988        }
 989
 990        let users = self
 991            .user_store
 992            .update(cx, |user_store, cx| user_store.get_users(all_user_ids, cx));
 993        Some(cx.spawn(|this, mut cx| async move {
 994            let users = users.await?;
 995
 996            this.update(&mut cx, |this, cx| {
 997                for entry in &channel_participants {
 998                    let mut participants: Vec<_> = entry
 999                        .participant_user_ids
1000                        .iter()
1001                        .filter_map(|user_id| {
1002                            users
1003                                .binary_search_by_key(&user_id, |user| &user.id)
1004                                .ok()
1005                                .map(|ix| users[ix].clone())
1006                        })
1007                        .collect();
1008
1009                    participants.sort_by_key(|u| u.id);
1010
1011                    this.channel_participants
1012                        .insert(entry.channel_id, participants);
1013                }
1014
1015                cx.notify();
1016            })
1017        }))
1018    }
1019}