channel_store.rs

   1mod channel_index;
   2
   3use crate::{channel_buffer::ChannelBuffer, channel_chat::ChannelChat, ChannelMessage};
   4use anyhow::{anyhow, Result};
   5use channel_index::ChannelIndex;
   6use client::{Client, Subscription, User, UserId, UserStore};
   7use collections::{hash_map, HashMap, HashSet};
   8use futures::{channel::mpsc, future::Shared, Future, FutureExt, StreamExt};
   9use gpui::{
  10    AppContext, AsyncAppContext, Context, EventEmitter, Global, Model, ModelContext, SharedString,
  11    Task, WeakModel,
  12};
  13use language::Capability;
  14use release_channel::RELEASE_CHANNEL;
  15use rpc::{
  16    proto::{self, ChannelRole, ChannelVisibility},
  17    TypedEnvelope,
  18};
  19use std::{mem, sync::Arc, time::Duration};
  20use util::{async_maybe, maybe, ResultExt};
  21
  22pub fn init(client: &Arc<Client>, user_store: Model<UserStore>, cx: &mut AppContext) {
  23    let channel_store =
  24        cx.new_model(|cx| ChannelStore::new(client.clone(), user_store.clone(), cx));
  25    cx.set_global(GlobalChannelStore(channel_store));
  26}
  27
/// A fixed 30-second timeout related to reconnection.
/// NOTE(review): the code that consumes this constant is outside this view —
/// confirm exactly what it bounds before relying on this description.
pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);

/// Identifier of a channel (a plain `u64`).
pub type ChannelId = u64;
  31
/// A version of a channel's notes, expressed as an epoch plus a version clock.
/// Stored in [`ChannelState`] as both the latest and the observed notes version.
#[derive(Debug, Clone, Default)]
struct NotesVersion {
    // NOTE(review): presumably incremented whenever the notes buffer is
    // recreated server-side — confirm against the collab server.
    epoch: u64,
    version: clock::Global,
}
  37
/// Client-side store of channel data: the channel index, invitations,
/// participants, per-channel state, and handles to opened channel buffers
/// and chats.
pub struct ChannelStore {
    /// Index of known channels; queried through `by_id()` and
    /// `ordered_channels()` by the accessor methods on this store.
    pub channel_index: ChannelIndex,
    /// Channels exposed through `channel_invitations()`.
    channel_invitations: Vec<Arc<Channel>>,
    /// Users per channel, exposed through `channel_participants()`.
    channel_participants: HashMap<ChannelId, Vec<Arc<User>>>,
    /// Per-channel state (latest/observed chat and notes versions, role).
    channel_states: HashMap<ChannelId, ChannelState>,

    /// `(channel, user)` pairs with an invite / remove / set-role request
    /// currently in flight; used to reject concurrent duplicate requests.
    outgoing_invites: HashSet<(ChannelId, UserId)>,
    /// Sender for incoming `UpdateChannels` messages; the receiving end is
    /// drained by the `_update_channels` task.
    update_channels_tx: mpsc::UnboundedSender<proto::UpdateChannels>,
    /// Channel buffers that are open or currently being opened.
    opened_buffers: HashMap<ChannelId, OpenedModelHandle<ChannelBuffer>>,
    /// Channel chats that are open or currently being opened.
    opened_chats: HashMap<ChannelId, OpenedModelHandle<ChannelChat>>,
    client: Arc<Client>,
    user_store: Model<UserStore>,
    /// Subscriptions for the two RPC message handlers registered in `new`.
    _rpc_subscriptions: [Subscription; 2],
    /// Task that reacts to client connection-status changes.
    _watch_connection_status: Task<Option<()>>,
    /// Cleared with `take()` when a connection is (re)established.
    /// NOTE(review): where this task is created is outside this view.
    disconnect_channel_buffers_task: Option<Task<()>>,
    /// Task that applies queued `UpdateChannels` messages one at a time.
    _update_channels: Task<()>,
}
  55
/// A single channel as known to the client.
#[derive(Clone, Debug)]
pub struct Channel {
    pub id: ChannelId,
    pub name: SharedString,
    pub visibility: proto::ChannelVisibility,
    /// Ancestor channel ids. Empty for a root channel; otherwise the first
    /// element is treated as the root channel's id (see `root_id`).
    pub parent_path: Vec<u64>,
}
  63
/// Per-channel bookkeeping kept by [`ChannelStore`]: the latest known and the
/// locally observed chat message / notes version, plus the user's role.
/// NOTE(review): the methods that compare these fields are defined outside
/// this view.
#[derive(Default)]
pub struct ChannelState {
    latest_chat_message: Option<u64>,
    latest_notes_versions: Option<NotesVersion>,
    observed_chat_message: Option<u64>,
    observed_notes_versions: Option<NotesVersion>,
    /// `None` until a role is set (see `handle_update_user_channels`).
    role: Option<ChannelRole>,
}
  72
  73impl Channel {
  74    pub fn link(&self) -> String {
  75        RELEASE_CHANNEL.link_prefix().to_owned()
  76            + "channel/"
  77            + &self.slug()
  78            + "-"
  79            + &self.id.to_string()
  80    }
  81
  82    pub fn is_root_channel(&self) -> bool {
  83        self.parent_path.is_empty()
  84    }
  85
  86    pub fn root_id(&self) -> ChannelId {
  87        self.parent_path
  88            .first()
  89            .map(|id| *id as ChannelId)
  90            .unwrap_or(self.id)
  91    }
  92
  93    pub fn slug(&self) -> String {
  94        let slug: String = self
  95            .name
  96            .chars()
  97            .map(|c| if c.is_alphanumeric() { c } else { '-' })
  98            .collect();
  99
 100        slug.trim_matches(|c| c == '-').to_string()
 101    }
 102}
 103
/// A user's relationship to a channel, as built by
/// `ChannelStore::get_channel_member_details`.
pub struct ChannelMembership {
    pub user: Arc<User>,
    /// Whether the user is a member or an invitee.
    pub kind: proto::channel_member::Kind,
    pub role: proto::ChannelRole,
}
 109impl ChannelMembership {
 110    pub fn sort_key(&self) -> MembershipSortKey {
 111        MembershipSortKey {
 112            role_order: match self.role {
 113                proto::ChannelRole::Admin => 0,
 114                proto::ChannelRole::Member => 1,
 115                proto::ChannelRole::Banned => 2,
 116                proto::ChannelRole::Guest => 3,
 117            },
 118            kind_order: match self.kind {
 119                proto::channel_member::Kind::Member => 0,
 120                proto::channel_member::Kind::Invitee => 1,
 121            },
 122            username_order: self.user.github_login.as_str(),
 123        }
 124    }
 125}
 126
/// Sort key for a [`ChannelMembership`]. The derived ordering compares the
/// fields in declaration order: role, then kind, then username.
#[derive(PartialOrd, Ord, PartialEq, Eq)]
pub struct MembershipSortKey<'a> {
    role_order: u8,
    kind_order: u8,
    username_order: &'a str,
}
 133
/// Events emitted by [`ChannelStore`].
pub enum ChannelEvent {
    /// Emitted by `create_channel` after the new channel has been applied locally.
    ChannelCreated(ChannelId),
    /// Emitted by `rename` after the renamed channel has been applied locally.
    ChannelRenamed(ChannelId),
}

impl EventEmitter<ChannelEvent> for ChannelStore {}
 140
/// Tracks a channel resource (buffer or chat) in `ChannelStore`'s maps.
enum OpenedModelHandle<E> {
    /// The resource finished loading; held weakly, so it may already be gone.
    Open(WeakModel<E>),
    /// The resource is still loading; the shared task lets several callers
    /// await the same load.
    Loading(Shared<Task<Result<Model<E>, Arc<anyhow::Error>>>>),
}
 145
/// Newtype wrapper that lets the [`ChannelStore`] model be stored as a global.
struct GlobalChannelStore(Model<ChannelStore>);

impl Global for GlobalChannelStore {}
 149
 150impl ChannelStore {
 151    pub fn global(cx: &AppContext) -> Model<Self> {
 152        cx.global::<GlobalChannelStore>().0.clone()
 153    }
 154
    /// Creates a channel store.
    ///
    /// Registers the two RPC message handlers, spawns a task that reacts to
    /// connection-status changes, and spawns a task that applies queued
    /// `UpdateChannels` messages one after another.
    pub fn new(
        client: Arc<Client>,
        user_store: Model<UserStore>,
        cx: &mut ModelContext<Self>,
    ) -> Self {
        // Kept in the struct so the subscriptions live as long as the store.
        let rpc_subscriptions = [
            client.add_message_handler(cx.weak_model(), Self::handle_update_channels),
            client.add_message_handler(cx.weak_model(), Self::handle_update_user_channels),
        ];

        let mut connection_status = client.status();
        let (update_channels_tx, mut update_channels_rx) = mpsc::unbounded();
        let watch_connection_status = cx.spawn(|this, mut cx| async move {
            while let Some(status) = connection_status.next().await {
                // Stop watching once the store itself has been dropped.
                let this = this.upgrade()?;
                match status {
                    client::Status::Connected { .. } => {
                        // A failed `handle_connect` is logged and ends this task.
                        this.update(&mut cx, |this, cx| this.handle_connect(cx))
                            .ok()?
                            .await
                            .log_err()?;
                    }
                    // NOTE(review): the boolean passed to `handle_disconnect` is
                    // `false` for signed-out / upgrade-required and `true` for every
                    // other non-connected status; its meaning is defined outside
                    // this view.
                    client::Status::SignedOut | client::Status::UpgradeRequired => {
                        this.update(&mut cx, |this, cx| this.handle_disconnect(false, cx))
                            .ok();
                    }
                    _ => {
                        this.update(&mut cx, |this, cx| this.handle_disconnect(true, cx))
                            .ok();
                    }
                }
            }
            Some(())
        });

        Self {
            channel_invitations: Vec::default(),
            channel_index: ChannelIndex::default(),
            channel_participants: Default::default(),
            outgoing_invites: Default::default(),
            opened_buffers: Default::default(),
            opened_chats: Default::default(),
            update_channels_tx,
            client,
            user_store,
            _rpc_subscriptions: rpc_subscriptions,
            _watch_connection_status: watch_connection_status,
            disconnect_channel_buffers_task: None,
            // Drains the queue filled by `handle_update_channels`; each update is
            // applied (and any follow-up task awaited) before the next is taken.
            _update_channels: cx.spawn(|this, mut cx| async move {
                async_maybe!({
                    while let Some(update_channels) = update_channels_rx.next().await {
                        if let Some(this) = this.upgrade() {
                            let update_task = this.update(&mut cx, |this, cx| {
                                this.update_channels(update_channels, cx)
                            })?;
                            if let Some(update_task) = update_task {
                                update_task.await.log_err();
                            }
                        }
                    }
                    anyhow::Ok(())
                })
                .await
                .log_err();
            }),
            channel_states: Default::default(),
        }
    }
 223
 224    pub fn client(&self) -> Arc<Client> {
 225        self.client.clone()
 226    }
 227
 228    /// Returns the number of unique channels in the store
 229    pub fn channel_count(&self) -> usize {
 230        self.channel_index.by_id().len()
 231    }
 232
 233    /// Returns the index of a channel ID in the list of unique channels
 234    pub fn index_of_channel(&self, channel_id: ChannelId) -> Option<usize> {
 235        self.channel_index
 236            .by_id()
 237            .keys()
 238            .position(|id| *id == channel_id)
 239    }
 240
 241    /// Returns an iterator over all unique channels
 242    pub fn channels(&self) -> impl '_ + Iterator<Item = &Arc<Channel>> {
 243        self.channel_index.by_id().values()
 244    }
 245
 246    /// Iterate over all entries in the channel DAG
 247    pub fn ordered_channels(&self) -> impl '_ + Iterator<Item = (usize, &Arc<Channel>)> {
 248        self.channel_index
 249            .ordered_channels()
 250            .iter()
 251            .filter_map(move |id| {
 252                let channel = self.channel_index.by_id().get(id)?;
 253                Some((channel.parent_path.len(), channel))
 254            })
 255    }
 256
 257    pub fn channel_at_index(&self, ix: usize) -> Option<&Arc<Channel>> {
 258        let channel_id = self.channel_index.ordered_channels().get(ix)?;
 259        self.channel_index.by_id().get(channel_id)
 260    }
 261
 262    pub fn channel_at(&self, ix: usize) -> Option<&Arc<Channel>> {
 263        self.channel_index.by_id().values().nth(ix)
 264    }
 265
 266    pub fn has_channel_invitation(&self, channel_id: ChannelId) -> bool {
 267        self.channel_invitations
 268            .iter()
 269            .any(|channel| channel.id == channel_id)
 270    }
 271
 272    pub fn channel_invitations(&self) -> &[Arc<Channel>] {
 273        &self.channel_invitations
 274    }
 275
 276    pub fn channel_for_id(&self, channel_id: ChannelId) -> Option<&Arc<Channel>> {
 277        self.channel_index.by_id().get(&channel_id)
 278    }
 279
 280    pub fn has_open_channel_buffer(&self, channel_id: ChannelId, _cx: &AppContext) -> bool {
 281        if let Some(buffer) = self.opened_buffers.get(&channel_id) {
 282            if let OpenedModelHandle::Open(buffer) = buffer {
 283                return buffer.upgrade().is_some();
 284            }
 285        }
 286        false
 287    }
 288
 289    pub fn open_channel_buffer(
 290        &mut self,
 291        channel_id: ChannelId,
 292        cx: &mut ModelContext<Self>,
 293    ) -> Task<Result<Model<ChannelBuffer>>> {
 294        let client = self.client.clone();
 295        let user_store = self.user_store.clone();
 296        let channel_store = cx.handle();
 297        self.open_channel_resource(
 298            channel_id,
 299            |this| &mut this.opened_buffers,
 300            |channel, cx| ChannelBuffer::new(channel, client, user_store, channel_store, cx),
 301            cx,
 302        )
 303    }
 304
 305    pub fn fetch_channel_messages(
 306        &self,
 307        message_ids: Vec<u64>,
 308        cx: &mut ModelContext<Self>,
 309    ) -> Task<Result<Vec<ChannelMessage>>> {
 310        let request = if message_ids.is_empty() {
 311            None
 312        } else {
 313            Some(
 314                self.client
 315                    .request(proto::GetChannelMessagesById { message_ids }),
 316            )
 317        };
 318        cx.spawn(|this, mut cx| async move {
 319            if let Some(request) = request {
 320                let response = request.await?;
 321                let this = this
 322                    .upgrade()
 323                    .ok_or_else(|| anyhow!("channel store dropped"))?;
 324                let user_store = this.update(&mut cx, |this, _| this.user_store.clone())?;
 325                ChannelMessage::from_proto_vec(response.messages, &user_store, &mut cx).await
 326            } else {
 327                Ok(Vec::new())
 328            }
 329        })
 330    }
 331
 332    pub fn has_channel_buffer_changed(&self, channel_id: ChannelId) -> bool {
 333        self.channel_states
 334            .get(&channel_id)
 335            .is_some_and(|state| state.has_channel_buffer_changed())
 336    }
 337
 338    pub fn has_new_messages(&self, channel_id: ChannelId) -> bool {
 339        self.channel_states
 340            .get(&channel_id)
 341            .is_some_and(|state| state.has_new_messages())
 342    }
 343
 344    pub fn acknowledge_message_id(
 345        &mut self,
 346        channel_id: ChannelId,
 347        message_id: u64,
 348        cx: &mut ModelContext<Self>,
 349    ) {
 350        self.channel_states
 351            .entry(channel_id)
 352            .or_insert_with(|| Default::default())
 353            .acknowledge_message_id(message_id);
 354        cx.notify();
 355    }
 356
 357    pub fn update_latest_message_id(
 358        &mut self,
 359        channel_id: ChannelId,
 360        message_id: u64,
 361        cx: &mut ModelContext<Self>,
 362    ) {
 363        self.channel_states
 364            .entry(channel_id)
 365            .or_insert_with(|| Default::default())
 366            .update_latest_message_id(message_id);
 367        cx.notify();
 368    }
 369
 370    pub fn acknowledge_notes_version(
 371        &mut self,
 372        channel_id: ChannelId,
 373        epoch: u64,
 374        version: &clock::Global,
 375        cx: &mut ModelContext<Self>,
 376    ) {
 377        self.channel_states
 378            .entry(channel_id)
 379            .or_insert_with(|| Default::default())
 380            .acknowledge_notes_version(epoch, version);
 381        cx.notify()
 382    }
 383
 384    pub fn update_latest_notes_version(
 385        &mut self,
 386        channel_id: ChannelId,
 387        epoch: u64,
 388        version: &clock::Global,
 389        cx: &mut ModelContext<Self>,
 390    ) {
 391        self.channel_states
 392            .entry(channel_id)
 393            .or_insert_with(|| Default::default())
 394            .update_latest_notes_version(epoch, version);
 395        cx.notify()
 396    }
 397
 398    pub fn open_channel_chat(
 399        &mut self,
 400        channel_id: ChannelId,
 401        cx: &mut ModelContext<Self>,
 402    ) -> Task<Result<Model<ChannelChat>>> {
 403        let client = self.client.clone();
 404        let user_store = self.user_store.clone();
 405        let this = cx.handle();
 406        self.open_channel_resource(
 407            channel_id,
 408            |this| &mut this.opened_chats,
 409            |channel, cx| ChannelChat::new(channel, this, user_store, client, cx),
 410            cx,
 411        )
 412    }
 413
    /// Asynchronously open a given resource associated with a channel.
    ///
    /// Make sure that the resource is only opened once, even if this method
    /// is called multiple times with the same channel id while the first task
    /// is still running.
    ///
    /// `get_map` selects which map on the store tracks this kind of resource,
    /// and `load` performs the actual opening once the channel has been looked
    /// up. The returned task fails if the channel id is unknown or if `load`
    /// fails; a failed load removes the map entry so a later call can retry.
    fn open_channel_resource<T, F, Fut>(
        &mut self,
        channel_id: ChannelId,
        get_map: fn(&mut Self) -> &mut HashMap<ChannelId, OpenedModelHandle<T>>,
        load: F,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<Model<T>>>
    where
        F: 'static + FnOnce(Arc<Channel>, AsyncAppContext) -> Fut,
        Fut: Future<Output = Result<Model<T>>>,
        T: 'static,
    {
        // Loops at most twice: a stale `Open` entry is removed and the lookup is
        // retried, landing in the `Vacant` branch.
        let task = loop {
            match get_map(self).entry(channel_id) {
                hash_map::Entry::Occupied(e) => match e.get() {
                    OpenedModelHandle::Open(model) => {
                        if let Some(model) = model.upgrade() {
                            // Already open and still alive: hand it back immediately.
                            break Task::ready(Ok(model)).shared();
                        } else {
                            // The model was dropped; forget the entry and reopen.
                            get_map(self).remove(&channel_id);
                            continue;
                        }
                    }
                    OpenedModelHandle::Loading(task) => {
                        // Someone else is already loading it; share their task.
                        break task.clone();
                    }
                },
                hash_map::Entry::Vacant(e) => {
                    let task = cx
                        .spawn(move |this, mut cx| async move {
                            let channel = this.update(&mut cx, |this, _| {
                                this.channel_for_id(channel_id).cloned().ok_or_else(|| {
                                    Arc::new(anyhow!("no channel for id: {}", channel_id))
                                })
                            })??;

                            // The error is wrapped in `Arc` so the shared task's
                            // output can be cloned for every waiter.
                            load(channel, cx).await.map_err(Arc::new)
                        })
                        .shared();

                    e.insert(OpenedModelHandle::Loading(task.clone()));
                    // Detached follow-up: once loading finishes, replace the
                    // `Loading` entry with a weak `Open` handle, or drop the entry
                    // on failure.
                    cx.spawn({
                        let task = task.clone();
                        move |this, mut cx| async move {
                            let result = task.await;
                            this.update(&mut cx, |this, _| match result {
                                Ok(model) => {
                                    get_map(this).insert(
                                        channel_id,
                                        OpenedModelHandle::Open(model.downgrade()),
                                    );
                                }
                                Err(_) => {
                                    get_map(this).remove(&channel_id);
                                }
                            })
                            .ok();
                        }
                    })
                    .detach();
                    break task;
                }
            }
        };
        // Convert the shared `Arc<anyhow::Error>` back into a plain `anyhow::Error`
        // by formatting it.
        cx.background_executor()
            .spawn(async move { task.await.map_err(|error| anyhow!("{}", error)) })
    }
 486
 487    pub fn is_channel_admin(&self, channel_id: ChannelId) -> bool {
 488        self.channel_role(channel_id) == proto::ChannelRole::Admin
 489    }
 490
 491    pub fn is_root_channel(&self, channel_id: ChannelId) -> bool {
 492        self.channel_index
 493            .by_id()
 494            .get(&channel_id)
 495            .map_or(false, |channel| channel.is_root_channel())
 496    }
 497
 498    pub fn is_public_channel(&self, channel_id: ChannelId) -> bool {
 499        self.channel_index
 500            .by_id()
 501            .get(&channel_id)
 502            .map_or(false, |channel| {
 503                channel.visibility == ChannelVisibility::Public
 504            })
 505    }
 506
 507    pub fn channel_capability(&self, channel_id: ChannelId) -> Capability {
 508        match self.channel_role(channel_id) {
 509            ChannelRole::Admin | ChannelRole::Member => Capability::ReadWrite,
 510            _ => Capability::ReadOnly,
 511        }
 512    }
 513
 514    pub fn channel_role(&self, channel_id: ChannelId) -> proto::ChannelRole {
 515        maybe!({
 516            let mut channel = self.channel_for_id(channel_id)?;
 517            if !channel.is_root_channel() {
 518                channel = self.channel_for_id(channel.root_id())?;
 519            }
 520            let root_channel_state = self.channel_states.get(&channel.id);
 521            root_channel_state?.role
 522        })
 523        .unwrap_or(proto::ChannelRole::Guest)
 524    }
 525
 526    pub fn channel_participants(&self, channel_id: ChannelId) -> &[Arc<User>] {
 527        self.channel_participants
 528            .get(&channel_id)
 529            .map_or(&[], |v| v.as_slice())
 530    }
 531
 532    pub fn create_channel(
 533        &self,
 534        name: &str,
 535        parent_id: Option<ChannelId>,
 536        cx: &mut ModelContext<Self>,
 537    ) -> Task<Result<ChannelId>> {
 538        let client = self.client.clone();
 539        let name = name.trim_start_matches("#").to_owned();
 540        cx.spawn(move |this, mut cx| async move {
 541            let response = client
 542                .request(proto::CreateChannel { name, parent_id })
 543                .await?;
 544
 545            let channel = response
 546                .channel
 547                .ok_or_else(|| anyhow!("missing channel in response"))?;
 548            let channel_id = channel.id;
 549
 550            this.update(&mut cx, |this, cx| {
 551                let task = this.update_channels(
 552                    proto::UpdateChannels {
 553                        channels: vec![channel],
 554                        ..Default::default()
 555                    },
 556                    cx,
 557                );
 558                assert!(task.is_none());
 559
 560                // This event is emitted because the collab panel wants to clear the pending edit state
 561                // before this frame is rendered. But we can't guarantee that the collab panel's future
 562                // will resolve before this flush_effects finishes. Synchronously emitting this event
 563                // ensures that the collab panel will observe this creation before the frame completes
 564                cx.emit(ChannelEvent::ChannelCreated(channel_id));
 565            })?;
 566
 567            Ok(channel_id)
 568        })
 569    }
 570
 571    pub fn move_channel(
 572        &mut self,
 573        channel_id: ChannelId,
 574        to: ChannelId,
 575        cx: &mut ModelContext<Self>,
 576    ) -> Task<Result<()>> {
 577        let client = self.client.clone();
 578        cx.spawn(move |_, _| async move {
 579            let _ = client
 580                .request(proto::MoveChannel { channel_id, to })
 581                .await?;
 582
 583            Ok(())
 584        })
 585    }
 586
 587    pub fn set_channel_visibility(
 588        &mut self,
 589        channel_id: ChannelId,
 590        visibility: ChannelVisibility,
 591        cx: &mut ModelContext<Self>,
 592    ) -> Task<Result<()>> {
 593        let client = self.client.clone();
 594        cx.spawn(move |_, _| async move {
 595            let _ = client
 596                .request(proto::SetChannelVisibility {
 597                    channel_id,
 598                    visibility: visibility.into(),
 599                })
 600                .await?;
 601
 602            Ok(())
 603        })
 604    }
 605
 606    pub fn invite_member(
 607        &mut self,
 608        channel_id: ChannelId,
 609        user_id: UserId,
 610        role: proto::ChannelRole,
 611        cx: &mut ModelContext<Self>,
 612    ) -> Task<Result<()>> {
 613        if !self.outgoing_invites.insert((channel_id, user_id)) {
 614            return Task::ready(Err(anyhow!("invite request already in progress")));
 615        }
 616
 617        cx.notify();
 618        let client = self.client.clone();
 619        cx.spawn(move |this, mut cx| async move {
 620            let result = client
 621                .request(proto::InviteChannelMember {
 622                    channel_id,
 623                    user_id,
 624                    role: role.into(),
 625                })
 626                .await;
 627
 628            this.update(&mut cx, |this, cx| {
 629                this.outgoing_invites.remove(&(channel_id, user_id));
 630                cx.notify();
 631            })?;
 632
 633            result?;
 634
 635            Ok(())
 636        })
 637    }
 638
 639    pub fn remove_member(
 640        &mut self,
 641        channel_id: ChannelId,
 642        user_id: u64,
 643        cx: &mut ModelContext<Self>,
 644    ) -> Task<Result<()>> {
 645        if !self.outgoing_invites.insert((channel_id, user_id)) {
 646            return Task::ready(Err(anyhow!("invite request already in progress")));
 647        }
 648
 649        cx.notify();
 650        let client = self.client.clone();
 651        cx.spawn(move |this, mut cx| async move {
 652            let result = client
 653                .request(proto::RemoveChannelMember {
 654                    channel_id,
 655                    user_id,
 656                })
 657                .await;
 658
 659            this.update(&mut cx, |this, cx| {
 660                this.outgoing_invites.remove(&(channel_id, user_id));
 661                cx.notify();
 662            })?;
 663            result?;
 664            Ok(())
 665        })
 666    }
 667
 668    pub fn set_member_role(
 669        &mut self,
 670        channel_id: ChannelId,
 671        user_id: UserId,
 672        role: proto::ChannelRole,
 673        cx: &mut ModelContext<Self>,
 674    ) -> Task<Result<()>> {
 675        if !self.outgoing_invites.insert((channel_id, user_id)) {
 676            return Task::ready(Err(anyhow!("member request already in progress")));
 677        }
 678
 679        cx.notify();
 680        let client = self.client.clone();
 681        cx.spawn(move |this, mut cx| async move {
 682            let result = client
 683                .request(proto::SetChannelMemberRole {
 684                    channel_id,
 685                    user_id,
 686                    role: role.into(),
 687                })
 688                .await;
 689
 690            this.update(&mut cx, |this, cx| {
 691                this.outgoing_invites.remove(&(channel_id, user_id));
 692                cx.notify();
 693            })?;
 694
 695            result?;
 696            Ok(())
 697        })
 698    }
 699
 700    pub fn rename(
 701        &mut self,
 702        channel_id: ChannelId,
 703        new_name: &str,
 704        cx: &mut ModelContext<Self>,
 705    ) -> Task<Result<()>> {
 706        let client = self.client.clone();
 707        let name = new_name.to_string();
 708        cx.spawn(move |this, mut cx| async move {
 709            let channel = client
 710                .request(proto::RenameChannel { channel_id, name })
 711                .await?
 712                .channel
 713                .ok_or_else(|| anyhow!("missing channel in response"))?;
 714            this.update(&mut cx, |this, cx| {
 715                let task = this.update_channels(
 716                    proto::UpdateChannels {
 717                        channels: vec![channel],
 718                        ..Default::default()
 719                    },
 720                    cx,
 721                );
 722                assert!(task.is_none());
 723
 724                // This event is emitted because the collab panel wants to clear the pending edit state
 725                // before this frame is rendered. But we can't guarantee that the collab panel's future
 726                // will resolve before this flush_effects finishes. Synchronously emitting this event
 727                // ensures that the collab panel will observe this creation before the frame complete
 728                cx.emit(ChannelEvent::ChannelRenamed(channel_id))
 729            })?;
 730            Ok(())
 731        })
 732    }
 733
 734    pub fn respond_to_channel_invite(
 735        &mut self,
 736        channel_id: ChannelId,
 737        accept: bool,
 738        cx: &mut ModelContext<Self>,
 739    ) -> Task<Result<()>> {
 740        let client = self.client.clone();
 741        cx.background_executor().spawn(async move {
 742            client
 743                .request(proto::RespondToChannelInvite { channel_id, accept })
 744                .await?;
 745            Ok(())
 746        })
 747    }
 748
 749    pub fn get_channel_member_details(
 750        &self,
 751        channel_id: ChannelId,
 752        cx: &mut ModelContext<Self>,
 753    ) -> Task<Result<Vec<ChannelMembership>>> {
 754        let client = self.client.clone();
 755        let user_store = self.user_store.downgrade();
 756        cx.spawn(move |_, mut cx| async move {
 757            let response = client
 758                .request(proto::GetChannelMembers { channel_id })
 759                .await?;
 760
 761            let user_ids = response.members.iter().map(|m| m.user_id).collect();
 762            let user_store = user_store
 763                .upgrade()
 764                .ok_or_else(|| anyhow!("user store dropped"))?;
 765            let users = user_store
 766                .update(&mut cx, |user_store, cx| user_store.get_users(user_ids, cx))?
 767                .await?;
 768
 769            Ok(users
 770                .into_iter()
 771                .zip(response.members)
 772                .filter_map(|(user, member)| {
 773                    Some(ChannelMembership {
 774                        user,
 775                        role: member.role(),
 776                        kind: member.kind(),
 777                    })
 778                })
 779                .collect())
 780        })
 781    }
 782
 783    pub fn remove_channel(&self, channel_id: ChannelId) -> impl Future<Output = Result<()>> {
 784        let client = self.client.clone();
 785        async move {
 786            client.request(proto::DeleteChannel { channel_id }).await?;
 787            Ok(())
 788        }
 789    }
 790
    /// Always returns `false`: the channel argument is ignored and no pending
    /// invite responses are tracked here.
    pub fn has_pending_channel_invite_response(&self, _: &Arc<Channel>) -> bool {
        false
    }
 794
 795    pub fn has_pending_channel_invite(&self, channel_id: ChannelId, user_id: UserId) -> bool {
 796        self.outgoing_invites.contains(&(channel_id, user_id))
 797    }
 798
 799    async fn handle_update_channels(
 800        this: Model<Self>,
 801        message: TypedEnvelope<proto::UpdateChannels>,
 802        _: Arc<Client>,
 803        mut cx: AsyncAppContext,
 804    ) -> Result<()> {
 805        this.update(&mut cx, |this, _| {
 806            this.update_channels_tx
 807                .unbounded_send(message.payload)
 808                .unwrap();
 809        })?;
 810        Ok(())
 811    }
 812
 813    async fn handle_update_user_channels(
 814        this: Model<Self>,
 815        message: TypedEnvelope<proto::UpdateUserChannels>,
 816        _: Arc<Client>,
 817        mut cx: AsyncAppContext,
 818    ) -> Result<()> {
 819        this.update(&mut cx, |this, cx| {
 820            for buffer_version in message.payload.observed_channel_buffer_version {
 821                let version = language::proto::deserialize_version(&buffer_version.version);
 822                this.acknowledge_notes_version(
 823                    buffer_version.channel_id,
 824                    buffer_version.epoch,
 825                    &version,
 826                    cx,
 827                );
 828            }
 829            for message_id in message.payload.observed_channel_message_id {
 830                this.acknowledge_message_id(message_id.channel_id, message_id.message_id, cx);
 831            }
 832            for membership in message.payload.channel_memberships {
 833                if let Some(role) = ChannelRole::from_i32(membership.role) {
 834                    this.channel_states
 835                        .entry(membership.channel_id)
 836                        .or_insert_with(|| ChannelState::default())
 837                        .set_role(role)
 838                }
 839            }
 840        })
 841    }
 842
 843    fn handle_connect(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
 844        self.channel_index.clear();
 845        self.channel_invitations.clear();
 846        self.channel_participants.clear();
 847        self.channel_index.clear();
 848        self.outgoing_invites.clear();
 849        self.disconnect_channel_buffers_task.take();
 850
 851        for chat in self.opened_chats.values() {
 852            if let OpenedModelHandle::Open(chat) = chat {
 853                if let Some(chat) = chat.upgrade() {
 854                    chat.update(cx, |chat, cx| {
 855                        chat.rejoin(cx);
 856                    });
 857                }
 858            }
 859        }
 860
 861        let mut buffer_versions = Vec::new();
 862        for buffer in self.opened_buffers.values() {
 863            if let OpenedModelHandle::Open(buffer) = buffer {
 864                if let Some(buffer) = buffer.upgrade() {
 865                    let channel_buffer = buffer.read(cx);
 866                    let buffer = channel_buffer.buffer().read(cx);
 867                    buffer_versions.push(proto::ChannelBufferVersion {
 868                        channel_id: channel_buffer.channel_id,
 869                        epoch: channel_buffer.epoch(),
 870                        version: language::proto::serialize_version(&buffer.version()),
 871                    });
 872                }
 873            }
 874        }
 875
 876        if buffer_versions.is_empty() {
 877            return Task::ready(Ok(()));
 878        }
 879
 880        let response = self.client.request(proto::RejoinChannelBuffers {
 881            buffers: buffer_versions,
 882        });
 883
 884        cx.spawn(|this, mut cx| async move {
 885            let mut response = response.await?;
 886
 887            this.update(&mut cx, |this, cx| {
 888                this.opened_buffers.retain(|_, buffer| match buffer {
 889                    OpenedModelHandle::Open(channel_buffer) => {
 890                        let Some(channel_buffer) = channel_buffer.upgrade() else {
 891                            return false;
 892                        };
 893
 894                        channel_buffer.update(cx, |channel_buffer, cx| {
 895                            let channel_id = channel_buffer.channel_id;
 896                            if let Some(remote_buffer) = response
 897                                .buffers
 898                                .iter_mut()
 899                                .find(|buffer| buffer.channel_id == channel_id)
 900                            {
 901                                let channel_id = channel_buffer.channel_id;
 902                                let remote_version =
 903                                    language::proto::deserialize_version(&remote_buffer.version);
 904
 905                                channel_buffer.replace_collaborators(
 906                                    mem::take(&mut remote_buffer.collaborators),
 907                                    cx,
 908                                );
 909
 910                                let operations = channel_buffer
 911                                    .buffer()
 912                                    .update(cx, |buffer, cx| {
 913                                        let outgoing_operations =
 914                                            buffer.serialize_ops(Some(remote_version), cx);
 915                                        let incoming_operations =
 916                                            mem::take(&mut remote_buffer.operations)
 917                                                .into_iter()
 918                                                .map(language::proto::deserialize_operation)
 919                                                .collect::<Result<Vec<_>>>()?;
 920                                        buffer.apply_ops(incoming_operations, cx)?;
 921                                        anyhow::Ok(outgoing_operations)
 922                                    })
 923                                    .log_err();
 924
 925                                if let Some(operations) = operations {
 926                                    let client = this.client.clone();
 927                                    cx.background_executor()
 928                                        .spawn(async move {
 929                                            let operations = operations.await;
 930                                            for chunk in
 931                                                language::proto::split_operations(operations)
 932                                            {
 933                                                client
 934                                                    .send(proto::UpdateChannelBuffer {
 935                                                        channel_id,
 936                                                        operations: chunk,
 937                                                    })
 938                                                    .ok();
 939                                            }
 940                                        })
 941                                        .detach();
 942                                    return true;
 943                                }
 944                            }
 945
 946                            channel_buffer.disconnect(cx);
 947                            false
 948                        })
 949                    }
 950                    OpenedModelHandle::Loading(_) => true,
 951                });
 952            })
 953            .ok();
 954            anyhow::Ok(())
 955        })
 956    }
 957
 958    fn handle_disconnect(&mut self, wait_for_reconnect: bool, cx: &mut ModelContext<Self>) {
 959        cx.notify();
 960
 961        self.disconnect_channel_buffers_task.get_or_insert_with(|| {
 962            cx.spawn(move |this, mut cx| async move {
 963                if wait_for_reconnect {
 964                    cx.background_executor().timer(RECONNECT_TIMEOUT).await;
 965                }
 966
 967                if let Some(this) = this.upgrade() {
 968                    this.update(&mut cx, |this, cx| {
 969                        for (_, buffer) in this.opened_buffers.drain() {
 970                            if let OpenedModelHandle::Open(buffer) = buffer {
 971                                if let Some(buffer) = buffer.upgrade() {
 972                                    buffer.update(cx, |buffer, cx| buffer.disconnect(cx));
 973                                }
 974                            }
 975                        }
 976                    })
 977                    .ok();
 978                }
 979            })
 980        });
 981    }
 982
 983    pub(crate) fn update_channels(
 984        &mut self,
 985        payload: proto::UpdateChannels,
 986        cx: &mut ModelContext<ChannelStore>,
 987    ) -> Option<Task<Result<()>>> {
 988        if !payload.remove_channel_invitations.is_empty() {
 989            self.channel_invitations
 990                .retain(|channel| !payload.remove_channel_invitations.contains(&channel.id));
 991        }
 992        for channel in payload.channel_invitations {
 993            match self
 994                .channel_invitations
 995                .binary_search_by_key(&channel.id, |c| c.id)
 996            {
 997                Ok(ix) => {
 998                    Arc::make_mut(&mut self.channel_invitations[ix]).name = channel.name.into()
 999                }
1000                Err(ix) => self.channel_invitations.insert(
1001                    ix,
1002                    Arc::new(Channel {
1003                        id: channel.id,
1004                        visibility: channel.visibility(),
1005                        name: channel.name.into(),
1006                        parent_path: channel.parent_path,
1007                    }),
1008                ),
1009            }
1010        }
1011
1012        let channels_changed = !payload.channels.is_empty()
1013            || !payload.delete_channels.is_empty()
1014            || !payload.latest_channel_message_ids.is_empty()
1015            || !payload.latest_channel_buffer_versions.is_empty();
1016
1017        if channels_changed {
1018            if !payload.delete_channels.is_empty() {
1019                self.channel_index.delete_channels(&payload.delete_channels);
1020                self.channel_participants
1021                    .retain(|channel_id, _| !&payload.delete_channels.contains(channel_id));
1022
1023                for channel_id in &payload.delete_channels {
1024                    let channel_id = *channel_id;
1025                    if payload
1026                        .channels
1027                        .iter()
1028                        .any(|channel| channel.id == channel_id)
1029                    {
1030                        continue;
1031                    }
1032                    if let Some(OpenedModelHandle::Open(buffer)) =
1033                        self.opened_buffers.remove(&channel_id)
1034                    {
1035                        if let Some(buffer) = buffer.upgrade() {
1036                            buffer.update(cx, ChannelBuffer::disconnect);
1037                        }
1038                    }
1039                }
1040            }
1041
1042            let mut index = self.channel_index.bulk_insert();
1043            for channel in payload.channels {
1044                let id = channel.id;
1045                let channel_changed = index.insert(channel);
1046
1047                if channel_changed {
1048                    if let Some(OpenedModelHandle::Open(buffer)) = self.opened_buffers.get(&id) {
1049                        if let Some(buffer) = buffer.upgrade() {
1050                            buffer.update(cx, ChannelBuffer::channel_changed);
1051                        }
1052                    }
1053                }
1054            }
1055
1056            for latest_buffer_version in payload.latest_channel_buffer_versions {
1057                let version = language::proto::deserialize_version(&latest_buffer_version.version);
1058                self.channel_states
1059                    .entry(latest_buffer_version.channel_id)
1060                    .or_default()
1061                    .update_latest_notes_version(latest_buffer_version.epoch, &version)
1062            }
1063
1064            for latest_channel_message in payload.latest_channel_message_ids {
1065                self.channel_states
1066                    .entry(latest_channel_message.channel_id)
1067                    .or_default()
1068                    .update_latest_message_id(latest_channel_message.message_id);
1069            }
1070        }
1071
1072        cx.notify();
1073        if payload.channel_participants.is_empty() {
1074            return None;
1075        }
1076
1077        let mut all_user_ids = Vec::new();
1078        let channel_participants = payload.channel_participants;
1079        for entry in &channel_participants {
1080            for user_id in entry.participant_user_ids.iter() {
1081                if let Err(ix) = all_user_ids.binary_search(user_id) {
1082                    all_user_ids.insert(ix, *user_id);
1083                }
1084            }
1085        }
1086
1087        let users = self
1088            .user_store
1089            .update(cx, |user_store, cx| user_store.get_users(all_user_ids, cx));
1090        Some(cx.spawn(|this, mut cx| async move {
1091            let users = users.await?;
1092
1093            this.update(&mut cx, |this, cx| {
1094                for entry in &channel_participants {
1095                    let mut participants: Vec<_> = entry
1096                        .participant_user_ids
1097                        .iter()
1098                        .filter_map(|user_id| {
1099                            users
1100                                .binary_search_by_key(&user_id, |user| &user.id)
1101                                .ok()
1102                                .map(|ix| users[ix].clone())
1103                        })
1104                        .collect();
1105
1106                    participants.sort_by_key(|u| u.id);
1107
1108                    this.channel_participants
1109                        .insert(entry.channel_id, participants);
1110                }
1111
1112                cx.notify();
1113            })
1114        }))
1115    }
1116}
1117
1118impl ChannelState {
1119    fn set_role(&mut self, role: ChannelRole) {
1120        self.role = Some(role);
1121    }
1122
1123    fn has_channel_buffer_changed(&self) -> bool {
1124        if let Some(latest_version) = &self.latest_notes_versions {
1125            if let Some(observed_version) = &self.observed_notes_versions {
1126                latest_version.epoch > observed_version.epoch
1127                    || latest_version
1128                        .version
1129                        .changed_since(&observed_version.version)
1130            } else {
1131                true
1132            }
1133        } else {
1134            false
1135        }
1136    }
1137
1138    fn has_new_messages(&self) -> bool {
1139        let latest_message_id = self.latest_chat_message;
1140        let observed_message_id = self.observed_chat_message;
1141
1142        latest_message_id.is_some_and(|latest_message_id| {
1143            latest_message_id > observed_message_id.unwrap_or_default()
1144        })
1145    }
1146
1147    fn acknowledge_message_id(&mut self, message_id: u64) {
1148        let observed = self.observed_chat_message.get_or_insert(message_id);
1149        *observed = (*observed).max(message_id);
1150    }
1151
1152    fn update_latest_message_id(&mut self, message_id: u64) {
1153        self.latest_chat_message =
1154            Some(message_id.max(self.latest_chat_message.unwrap_or_default()));
1155    }
1156
1157    fn acknowledge_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1158        if let Some(existing) = &mut self.observed_notes_versions {
1159            if existing.epoch == epoch {
1160                existing.version.join(version);
1161                return;
1162            }
1163        }
1164        self.observed_notes_versions = Some(NotesVersion {
1165            epoch,
1166            version: version.clone(),
1167        });
1168    }
1169
1170    fn update_latest_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1171        if let Some(existing) = &mut self.latest_notes_versions {
1172            if existing.epoch == epoch {
1173                existing.version.join(version);
1174                return;
1175            }
1176        }
1177        self.latest_notes_versions = Some(NotesVersion {
1178            epoch,
1179            version: version.clone(),
1180        });
1181    }
1182}