channel_store.rs

   1mod channel_index;
   2
   3use crate::{channel_buffer::ChannelBuffer, channel_chat::ChannelChat, ChannelMessage};
   4use anyhow::{anyhow, Result};
   5use channel_index::ChannelIndex;
   6use client::{Client, Subscription, User, UserId, UserStore};
   7use collections::{hash_map, HashMap, HashSet};
   8use futures::{channel::mpsc, future::Shared, Future, FutureExt, StreamExt};
   9use gpui::{
  10    AppContext, AsyncAppContext, Context, EventEmitter, Global, Model, ModelContext, SharedString,
  11    Task, WeakModel,
  12};
  13use language::Capability;
  14use release_channel::RELEASE_CHANNEL;
  15use rpc::{
  16    proto::{self, ChannelRole, ChannelVisibility},
  17    TypedEnvelope,
  18};
  19use std::{mem, sync::Arc, time::Duration};
  20use util::{async_maybe, maybe, ResultExt};
  21
  22pub fn init(client: &Arc<Client>, user_store: Model<UserStore>, cx: &mut AppContext) {
  23    let channel_store =
  24        cx.new_model(|cx| ChannelStore::new(client.clone(), user_store.clone(), cx));
  25    cx.set_global(GlobalChannelStore(channel_store));
  26}
  27
/// A 30 second timeout related to reconnecting.
// NOTE(review): the code that consumes this constant is outside the visible
// part of the file — confirm exactly what is abandoned once it elapses.
pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);

/// Numeric identifier of a channel.
pub type ChannelId = u64;
  31
/// A version of a channel's notes, expressed as an epoch plus a version clock.
#[derive(Debug, Clone, Default)]
struct NotesVersion {
    // NOTE(review): presumably the buffer epoch reported by the server — confirm.
    epoch: u64,
    // Version clock of the notes buffer within `epoch`.
    version: clock::Global,
}
  37
/// Client-side model holding the channels known to the current user, together
/// with the per-channel state, open notes buffers and open chats.
pub struct ChannelStore {
    /// Index of all known channels, queryable by id and in display order.
    pub channel_index: ChannelIndex,
    // Channels returned by `channel_invitations()`.
    channel_invitations: Vec<Arc<Channel>>,
    // Users listed per channel, returned by `channel_participants()`.
    channel_participants: HashMap<ChannelId, Vec<Arc<User>>>,
    // Per-channel role and latest/observed chat and notes markers.
    channel_states: HashMap<ChannelId, ChannelState>,

    // (channel, user) pairs that currently have a membership request in flight
    // (invite, removal or role change).
    outgoing_invites: HashSet<(ChannelId, UserId)>,
    // Sending half of the queue drained by the `_update_channels` task.
    update_channels_tx: mpsc::UnboundedSender<proto::UpdateChannels>,
    // Notes buffers that are open or currently being opened, by channel.
    opened_buffers: HashMap<ChannelId, OpenedModelHandle<ChannelBuffer>>,
    // Chats that are open or currently being opened, by channel.
    opened_chats: HashMap<ChannelId, OpenedModelHandle<ChannelChat>>,
    client: Arc<Client>,
    user_store: Model<UserStore>,
    // Handles for the two RPC message handlers registered in `new`.
    _rpc_subscriptions: [Subscription; 2],
    // Task reacting to client connection status changes.
    _watch_connection_status: Task<Option<()>>,
    // Taken (and thereby dropped) in `handle_connect`.
    // NOTE(review): where it is set is outside the visible part of the file.
    disconnect_channel_buffers_task: Option<Task<()>>,
    // Task applying queued `UpdateChannels` messages one at a time.
    _update_channels: Task<()>,
}
  55
/// A single channel as known to the client.
#[derive(Clone, Debug)]
pub struct Channel {
    /// Identifier of this channel.
    pub id: ChannelId,
    /// Display name of the channel.
    pub name: SharedString,
    /// Visibility of the channel (e.g. public).
    pub visibility: proto::ChannelVisibility,
    /// Ids of the ancestor channels; empty for a root channel, and the first
    /// entry is the root (see `is_root_channel` and `root_id`).
    pub parent_path: Vec<u64>,
}
  63
/// Mutable, per-channel state tracked by the [`ChannelStore`]: the latest and
/// observed chat message ids, the latest and observed notes versions, and the
/// current user's role.
#[derive(Default)]
pub struct ChannelState {
    // Id of the most recent chat message known for the channel, if any.
    latest_chat_message: Option<u64>,
    // Most recent notes version known for the channel, if any.
    latest_notes_versions: Option<NotesVersion>,
    // Id of the chat message most recently acknowledged, if any.
    observed_chat_message: Option<u64>,
    // Notes version most recently acknowledged, if any.
    observed_notes_versions: Option<NotesVersion>,
    // Role reported for the current user via `UpdateUserChannels` memberships.
    role: Option<ChannelRole>,
}
  72
  73impl Channel {
  74    pub fn link(&self) -> String {
  75        RELEASE_CHANNEL.link_prefix().to_owned()
  76            + "channel/"
  77            + &Self::slug(&self.name)
  78            + "-"
  79            + &self.id.to_string()
  80    }
  81
  82    pub fn notes_link(&self, heading: Option<String>) -> String {
  83        self.link()
  84            + "/notes"
  85            + &heading
  86                .map(|h| format!("#{}", Self::slug(&h)))
  87                .unwrap_or_default()
  88    }
  89
  90    pub fn is_root_channel(&self) -> bool {
  91        self.parent_path.is_empty()
  92    }
  93
  94    pub fn root_id(&self) -> ChannelId {
  95        self.parent_path
  96            .first()
  97            .map(|id| *id as ChannelId)
  98            .unwrap_or(self.id)
  99    }
 100
 101    pub fn slug(str: &str) -> String {
 102        let slug: String = str
 103            .chars()
 104            .map(|c| if c.is_alphanumeric() { c } else { '-' })
 105            .collect();
 106
 107        slug.trim_matches(|c| c == '-').to_string()
 108    }
 109}
 110
/// A user's membership in a channel, as returned by
/// `ChannelStore::get_channel_member_details`.
pub struct ChannelMembership {
    /// The user this membership belongs to.
    pub user: Arc<User>,
    /// Whether the user is a full member or an invitee.
    pub kind: proto::channel_member::Kind,
    /// The user's role in the channel.
    pub role: proto::ChannelRole,
}
 116impl ChannelMembership {
 117    pub fn sort_key(&self) -> MembershipSortKey {
 118        MembershipSortKey {
 119            role_order: match self.role {
 120                proto::ChannelRole::Admin => 0,
 121                proto::ChannelRole::Member => 1,
 122                proto::ChannelRole::Banned => 2,
 123                proto::ChannelRole::Guest => 3,
 124            },
 125            kind_order: match self.kind {
 126                proto::channel_member::Kind::Member => 0,
 127                proto::channel_member::Kind::Invitee => 1,
 128            },
 129            username_order: self.user.github_login.as_str(),
 130        }
 131    }
 132}
 133
/// Sort key for a [`ChannelMembership`].
///
/// The ordering is derived, so fields are compared in declaration order:
/// role first, then kind, then username. Do not reorder the fields.
#[derive(PartialOrd, Ord, PartialEq, Eq)]
pub struct MembershipSortKey<'a> {
    // Rank of the role; lower sorts first.
    role_order: u8,
    // Rank of the membership kind; lower sorts first.
    kind_order: u8,
    // GitHub login borrowed from the membership's user; final tie-breaker.
    username_order: &'a str,
}
 140
/// Events emitted by the [`ChannelStore`].
pub enum ChannelEvent {
    /// Emitted by `create_channel` once the newly created channel has been
    /// applied to the store.
    ChannelCreated(ChannelId),
    /// Emitted by `rename` once the renamed channel has been applied to the
    /// store.
    ChannelRenamed(ChannelId),
}

// Lets the store emit `ChannelEvent`s through `cx.emit`.
impl EventEmitter<ChannelEvent> for ChannelStore {}
 147
/// Entry for a per-channel resource (notes buffer or chat) that has been
/// requested through the store.
enum OpenedModelHandle<E> {
    /// The resource finished loading; only a weak handle is kept, so the
    /// model may already have been dropped.
    Open(WeakModel<E>),
    /// The resource is still loading; the shared task lets multiple callers
    /// wait on the same load.
    Loading(Shared<Task<Result<Model<E>, Arc<anyhow::Error>>>>),
}
 152
/// Wrapper used to store the [`ChannelStore`] model as an application global
/// (set in `init`, read in `ChannelStore::global`).
struct GlobalChannelStore(Model<ChannelStore>);

impl Global for GlobalChannelStore {}
 156
 157impl ChannelStore {
 158    pub fn global(cx: &AppContext) -> Model<Self> {
 159        cx.global::<GlobalChannelStore>().0.clone()
 160    }
 161
    /// Builds a channel store backed by `client` and `user_store`.
    ///
    /// Besides initializing empty state, this registers the two RPC message
    /// handlers and spawns two long-lived tasks: one that reacts to client
    /// connection status changes, and one that applies queued
    /// `UpdateChannels` messages in order.
    pub fn new(
        client: Arc<Client>,
        user_store: Model<UserStore>,
        cx: &mut ModelContext<Self>,
    ) -> Self {
        // The returned subscriptions are stored on the struct.
        let rpc_subscriptions = [
            client.add_message_handler(cx.weak_model(), Self::handle_update_channels),
            client.add_message_handler(cx.weak_model(), Self::handle_update_user_channels),
        ];

        let mut connection_status = client.status();
        let (update_channels_tx, mut update_channels_rx) = mpsc::unbounded();
        let watch_connection_status = cx.spawn(|this, mut cx| async move {
            while let Some(status) = connection_status.next().await {
                // Stop watching once the store itself has been dropped.
                let this = this.upgrade()?;
                match status {
                    client::Status::Connected { .. } => {
                        // NOTE(review): the trailing `?` after `log_err()` appears to
                        // end this watcher task when `handle_connect` fails — confirm
                        // that is intended.
                        this.update(&mut cx, |this, cx| this.handle_connect(cx))
                            .ok()?
                            .await
                            .log_err()?;
                    }
                    // NOTE(review): the boolean passed to `handle_disconnect` is
                    // `false` for these terminal statuses and `true` otherwise;
                    // its meaning is defined outside the visible part of the file.
                    client::Status::SignedOut | client::Status::UpgradeRequired => {
                        this.update(&mut cx, |this, cx| this.handle_disconnect(false, cx))
                            .ok();
                    }
                    _ => {
                        this.update(&mut cx, |this, cx| this.handle_disconnect(true, cx))
                            .ok();
                    }
                }
            }
            Some(())
        });

        Self {
            channel_invitations: Vec::default(),
            channel_index: ChannelIndex::default(),
            channel_participants: Default::default(),
            outgoing_invites: Default::default(),
            opened_buffers: Default::default(),
            opened_chats: Default::default(),
            update_channels_tx,
            client,
            user_store,
            _rpc_subscriptions: rpc_subscriptions,
            _watch_connection_status: watch_connection_status,
            disconnect_channel_buffers_task: None,
            // Drains the queue fed by `handle_update_channels`, awaiting each
            // update's follow-up task (if any) before taking the next message.
            _update_channels: cx.spawn(|this, mut cx| async move {
                async_maybe!({
                    while let Some(update_channels) = update_channels_rx.next().await {
                        if let Some(this) = this.upgrade() {
                            let update_task = this.update(&mut cx, |this, cx| {
                                this.update_channels(update_channels, cx)
                            })?;
                            if let Some(update_task) = update_task {
                                update_task.await.log_err();
                            }
                        }
                    }
                    anyhow::Ok(())
                })
                .await
                .log_err();
            }),
            channel_states: Default::default(),
        }
    }
 230
 231    pub fn client(&self) -> Arc<Client> {
 232        self.client.clone()
 233    }
 234
 235    /// Returns the number of unique channels in the store
 236    pub fn channel_count(&self) -> usize {
 237        self.channel_index.by_id().len()
 238    }
 239
 240    /// Returns the index of a channel ID in the list of unique channels
 241    pub fn index_of_channel(&self, channel_id: ChannelId) -> Option<usize> {
 242        self.channel_index
 243            .by_id()
 244            .keys()
 245            .position(|id| *id == channel_id)
 246    }
 247
 248    /// Returns an iterator over all unique channels
 249    pub fn channels(&self) -> impl '_ + Iterator<Item = &Arc<Channel>> {
 250        self.channel_index.by_id().values()
 251    }
 252
 253    /// Iterate over all entries in the channel DAG
 254    pub fn ordered_channels(&self) -> impl '_ + Iterator<Item = (usize, &Arc<Channel>)> {
 255        self.channel_index
 256            .ordered_channels()
 257            .iter()
 258            .filter_map(move |id| {
 259                let channel = self.channel_index.by_id().get(id)?;
 260                Some((channel.parent_path.len(), channel))
 261            })
 262    }
 263
 264    pub fn channel_at_index(&self, ix: usize) -> Option<&Arc<Channel>> {
 265        let channel_id = self.channel_index.ordered_channels().get(ix)?;
 266        self.channel_index.by_id().get(channel_id)
 267    }
 268
 269    pub fn channel_at(&self, ix: usize) -> Option<&Arc<Channel>> {
 270        self.channel_index.by_id().values().nth(ix)
 271    }
 272
 273    pub fn has_channel_invitation(&self, channel_id: ChannelId) -> bool {
 274        self.channel_invitations
 275            .iter()
 276            .any(|channel| channel.id == channel_id)
 277    }
 278
 279    pub fn channel_invitations(&self) -> &[Arc<Channel>] {
 280        &self.channel_invitations
 281    }
 282
 283    pub fn channel_for_id(&self, channel_id: ChannelId) -> Option<&Arc<Channel>> {
 284        self.channel_index.by_id().get(&channel_id)
 285    }
 286
 287    pub fn has_open_channel_buffer(&self, channel_id: ChannelId, _cx: &AppContext) -> bool {
 288        if let Some(buffer) = self.opened_buffers.get(&channel_id) {
 289            if let OpenedModelHandle::Open(buffer) = buffer {
 290                return buffer.upgrade().is_some();
 291            }
 292        }
 293        false
 294    }
 295
 296    pub fn open_channel_buffer(
 297        &mut self,
 298        channel_id: ChannelId,
 299        cx: &mut ModelContext<Self>,
 300    ) -> Task<Result<Model<ChannelBuffer>>> {
 301        let client = self.client.clone();
 302        let user_store = self.user_store.clone();
 303        let channel_store = cx.handle();
 304        self.open_channel_resource(
 305            channel_id,
 306            |this| &mut this.opened_buffers,
 307            |channel, cx| ChannelBuffer::new(channel, client, user_store, channel_store, cx),
 308            cx,
 309        )
 310    }
 311
 312    pub fn fetch_channel_messages(
 313        &self,
 314        message_ids: Vec<u64>,
 315        cx: &mut ModelContext<Self>,
 316    ) -> Task<Result<Vec<ChannelMessage>>> {
 317        let request = if message_ids.is_empty() {
 318            None
 319        } else {
 320            Some(
 321                self.client
 322                    .request(proto::GetChannelMessagesById { message_ids }),
 323            )
 324        };
 325        cx.spawn(|this, mut cx| async move {
 326            if let Some(request) = request {
 327                let response = request.await?;
 328                let this = this
 329                    .upgrade()
 330                    .ok_or_else(|| anyhow!("channel store dropped"))?;
 331                let user_store = this.update(&mut cx, |this, _| this.user_store.clone())?;
 332                ChannelMessage::from_proto_vec(response.messages, &user_store, &mut cx).await
 333            } else {
 334                Ok(Vec::new())
 335            }
 336        })
 337    }
 338
 339    pub fn has_channel_buffer_changed(&self, channel_id: ChannelId) -> bool {
 340        self.channel_states
 341            .get(&channel_id)
 342            .is_some_and(|state| state.has_channel_buffer_changed())
 343    }
 344
 345    pub fn has_new_messages(&self, channel_id: ChannelId) -> bool {
 346        self.channel_states
 347            .get(&channel_id)
 348            .is_some_and(|state| state.has_new_messages())
 349    }
 350
 351    pub fn acknowledge_message_id(
 352        &mut self,
 353        channel_id: ChannelId,
 354        message_id: u64,
 355        cx: &mut ModelContext<Self>,
 356    ) {
 357        self.channel_states
 358            .entry(channel_id)
 359            .or_insert_with(|| Default::default())
 360            .acknowledge_message_id(message_id);
 361        cx.notify();
 362    }
 363
 364    pub fn update_latest_message_id(
 365        &mut self,
 366        channel_id: ChannelId,
 367        message_id: u64,
 368        cx: &mut ModelContext<Self>,
 369    ) {
 370        self.channel_states
 371            .entry(channel_id)
 372            .or_insert_with(|| Default::default())
 373            .update_latest_message_id(message_id);
 374        cx.notify();
 375    }
 376
 377    pub fn acknowledge_notes_version(
 378        &mut self,
 379        channel_id: ChannelId,
 380        epoch: u64,
 381        version: &clock::Global,
 382        cx: &mut ModelContext<Self>,
 383    ) {
 384        self.channel_states
 385            .entry(channel_id)
 386            .or_insert_with(|| Default::default())
 387            .acknowledge_notes_version(epoch, version);
 388        cx.notify()
 389    }
 390
 391    pub fn update_latest_notes_version(
 392        &mut self,
 393        channel_id: ChannelId,
 394        epoch: u64,
 395        version: &clock::Global,
 396        cx: &mut ModelContext<Self>,
 397    ) {
 398        self.channel_states
 399            .entry(channel_id)
 400            .or_insert_with(|| Default::default())
 401            .update_latest_notes_version(epoch, version);
 402        cx.notify()
 403    }
 404
 405    pub fn open_channel_chat(
 406        &mut self,
 407        channel_id: ChannelId,
 408        cx: &mut ModelContext<Self>,
 409    ) -> Task<Result<Model<ChannelChat>>> {
 410        let client = self.client.clone();
 411        let user_store = self.user_store.clone();
 412        let this = cx.handle();
 413        self.open_channel_resource(
 414            channel_id,
 415            |this| &mut this.opened_chats,
 416            |channel, cx| ChannelChat::new(channel, this, user_store, client, cx),
 417            cx,
 418        )
 419    }
 420
    /// Asynchronously open a given resource associated with a channel.
    ///
    /// Make sure that the resource is only opened once, even if this method
    /// is called multiple times with the same channel id while the first task
    /// is still running.
    ///
    /// `get_map` selects which map on the store tracks this kind of resource,
    /// and `load` performs the actual load once the channel has been looked
    /// up. The returned task fails if the channel id is unknown or if `load`
    /// fails.
    fn open_channel_resource<T, F, Fut>(
        &mut self,
        channel_id: ChannelId,
        get_map: fn(&mut Self) -> &mut HashMap<ChannelId, OpenedModelHandle<T>>,
        load: F,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<Model<T>>>
    where
        F: 'static + FnOnce(Arc<Channel>, AsyncAppContext) -> Fut,
        Fut: Future<Output = Result<Model<T>>>,
        T: 'static,
    {
        // Loops at most twice: a stale `Open` entry is removed and the lookup
        // is retried, which then takes the `Vacant` branch.
        let task = loop {
            match get_map(self).entry(channel_id) {
                hash_map::Entry::Occupied(e) => match e.get() {
                    OpenedModelHandle::Open(model) => {
                        if let Some(model) = model.upgrade() {
                            // Already open and alive: resolve immediately.
                            break Task::ready(Ok(model)).shared();
                        } else {
                            // The model was dropped; forget the entry and reload.
                            get_map(self).remove(&channel_id);
                            continue;
                        }
                    }
                    OpenedModelHandle::Loading(task) => {
                        // A load is already in flight: share it.
                        break task.clone();
                    }
                },
                hash_map::Entry::Vacant(e) => {
                    let task = cx
                        .spawn(move |this, mut cx| async move {
                            let channel = this.update(&mut cx, |this, _| {
                                this.channel_for_id(channel_id).cloned().ok_or_else(|| {
                                    Arc::new(anyhow!("no channel for id: {}", channel_id))
                                })
                            })??;

                            load(channel, cx).await.map_err(Arc::new)
                        })
                        .shared();

                    e.insert(OpenedModelHandle::Loading(task.clone()));
                    // Once the load settles, replace the `Loading` entry with a
                    // weak `Open` handle, or remove it so a later call can retry.
                    cx.spawn({
                        let task = task.clone();
                        move |this, mut cx| async move {
                            let result = task.await;
                            this.update(&mut cx, |this, _| match result {
                                Ok(model) => {
                                    get_map(this).insert(
                                        channel_id,
                                        OpenedModelHandle::Open(model.downgrade()),
                                    );
                                }
                                Err(_) => {
                                    get_map(this).remove(&channel_id);
                                }
                            })
                            .ok();
                        }
                    })
                    .detach();
                    break task;
                }
            }
        };
        // The shared task yields `Arc<anyhow::Error>`; convert it into a plain
        // `anyhow::Error` carrying the same message for the caller.
        cx.background_executor()
            .spawn(async move { task.await.map_err(|error| anyhow!("{}", error)) })
    }
 493
 494    pub fn is_channel_admin(&self, channel_id: ChannelId) -> bool {
 495        self.channel_role(channel_id) == proto::ChannelRole::Admin
 496    }
 497
 498    pub fn is_root_channel(&self, channel_id: ChannelId) -> bool {
 499        self.channel_index
 500            .by_id()
 501            .get(&channel_id)
 502            .map_or(false, |channel| channel.is_root_channel())
 503    }
 504
 505    pub fn is_public_channel(&self, channel_id: ChannelId) -> bool {
 506        self.channel_index
 507            .by_id()
 508            .get(&channel_id)
 509            .map_or(false, |channel| {
 510                channel.visibility == ChannelVisibility::Public
 511            })
 512    }
 513
 514    pub fn channel_capability(&self, channel_id: ChannelId) -> Capability {
 515        match self.channel_role(channel_id) {
 516            ChannelRole::Admin | ChannelRole::Member => Capability::ReadWrite,
 517            _ => Capability::ReadOnly,
 518        }
 519    }
 520
 521    pub fn channel_role(&self, channel_id: ChannelId) -> proto::ChannelRole {
 522        maybe!({
 523            let mut channel = self.channel_for_id(channel_id)?;
 524            if !channel.is_root_channel() {
 525                channel = self.channel_for_id(channel.root_id())?;
 526            }
 527            let root_channel_state = self.channel_states.get(&channel.id);
 528            root_channel_state?.role
 529        })
 530        .unwrap_or(proto::ChannelRole::Guest)
 531    }
 532
 533    pub fn channel_participants(&self, channel_id: ChannelId) -> &[Arc<User>] {
 534        self.channel_participants
 535            .get(&channel_id)
 536            .map_or(&[], |v| v.as_slice())
 537    }
 538
 539    pub fn create_channel(
 540        &self,
 541        name: &str,
 542        parent_id: Option<ChannelId>,
 543        cx: &mut ModelContext<Self>,
 544    ) -> Task<Result<ChannelId>> {
 545        let client = self.client.clone();
 546        let name = name.trim_start_matches("#").to_owned();
 547        cx.spawn(move |this, mut cx| async move {
 548            let response = client
 549                .request(proto::CreateChannel { name, parent_id })
 550                .await?;
 551
 552            let channel = response
 553                .channel
 554                .ok_or_else(|| anyhow!("missing channel in response"))?;
 555            let channel_id = channel.id;
 556
 557            this.update(&mut cx, |this, cx| {
 558                let task = this.update_channels(
 559                    proto::UpdateChannels {
 560                        channels: vec![channel],
 561                        ..Default::default()
 562                    },
 563                    cx,
 564                );
 565                assert!(task.is_none());
 566
 567                // This event is emitted because the collab panel wants to clear the pending edit state
 568                // before this frame is rendered. But we can't guarantee that the collab panel's future
 569                // will resolve before this flush_effects finishes. Synchronously emitting this event
 570                // ensures that the collab panel will observe this creation before the frame completes
 571                cx.emit(ChannelEvent::ChannelCreated(channel_id));
 572            })?;
 573
 574            Ok(channel_id)
 575        })
 576    }
 577
 578    pub fn move_channel(
 579        &mut self,
 580        channel_id: ChannelId,
 581        to: ChannelId,
 582        cx: &mut ModelContext<Self>,
 583    ) -> Task<Result<()>> {
 584        let client = self.client.clone();
 585        cx.spawn(move |_, _| async move {
 586            let _ = client
 587                .request(proto::MoveChannel { channel_id, to })
 588                .await?;
 589
 590            Ok(())
 591        })
 592    }
 593
 594    pub fn set_channel_visibility(
 595        &mut self,
 596        channel_id: ChannelId,
 597        visibility: ChannelVisibility,
 598        cx: &mut ModelContext<Self>,
 599    ) -> Task<Result<()>> {
 600        let client = self.client.clone();
 601        cx.spawn(move |_, _| async move {
 602            let _ = client
 603                .request(proto::SetChannelVisibility {
 604                    channel_id,
 605                    visibility: visibility.into(),
 606                })
 607                .await?;
 608
 609            Ok(())
 610        })
 611    }
 612
 613    pub fn invite_member(
 614        &mut self,
 615        channel_id: ChannelId,
 616        user_id: UserId,
 617        role: proto::ChannelRole,
 618        cx: &mut ModelContext<Self>,
 619    ) -> Task<Result<()>> {
 620        if !self.outgoing_invites.insert((channel_id, user_id)) {
 621            return Task::ready(Err(anyhow!("invite request already in progress")));
 622        }
 623
 624        cx.notify();
 625        let client = self.client.clone();
 626        cx.spawn(move |this, mut cx| async move {
 627            let result = client
 628                .request(proto::InviteChannelMember {
 629                    channel_id,
 630                    user_id,
 631                    role: role.into(),
 632                })
 633                .await;
 634
 635            this.update(&mut cx, |this, cx| {
 636                this.outgoing_invites.remove(&(channel_id, user_id));
 637                cx.notify();
 638            })?;
 639
 640            result?;
 641
 642            Ok(())
 643        })
 644    }
 645
 646    pub fn remove_member(
 647        &mut self,
 648        channel_id: ChannelId,
 649        user_id: u64,
 650        cx: &mut ModelContext<Self>,
 651    ) -> Task<Result<()>> {
 652        if !self.outgoing_invites.insert((channel_id, user_id)) {
 653            return Task::ready(Err(anyhow!("invite request already in progress")));
 654        }
 655
 656        cx.notify();
 657        let client = self.client.clone();
 658        cx.spawn(move |this, mut cx| async move {
 659            let result = client
 660                .request(proto::RemoveChannelMember {
 661                    channel_id,
 662                    user_id,
 663                })
 664                .await;
 665
 666            this.update(&mut cx, |this, cx| {
 667                this.outgoing_invites.remove(&(channel_id, user_id));
 668                cx.notify();
 669            })?;
 670            result?;
 671            Ok(())
 672        })
 673    }
 674
 675    pub fn set_member_role(
 676        &mut self,
 677        channel_id: ChannelId,
 678        user_id: UserId,
 679        role: proto::ChannelRole,
 680        cx: &mut ModelContext<Self>,
 681    ) -> Task<Result<()>> {
 682        if !self.outgoing_invites.insert((channel_id, user_id)) {
 683            return Task::ready(Err(anyhow!("member request already in progress")));
 684        }
 685
 686        cx.notify();
 687        let client = self.client.clone();
 688        cx.spawn(move |this, mut cx| async move {
 689            let result = client
 690                .request(proto::SetChannelMemberRole {
 691                    channel_id,
 692                    user_id,
 693                    role: role.into(),
 694                })
 695                .await;
 696
 697            this.update(&mut cx, |this, cx| {
 698                this.outgoing_invites.remove(&(channel_id, user_id));
 699                cx.notify();
 700            })?;
 701
 702            result?;
 703            Ok(())
 704        })
 705    }
 706
 707    pub fn rename(
 708        &mut self,
 709        channel_id: ChannelId,
 710        new_name: &str,
 711        cx: &mut ModelContext<Self>,
 712    ) -> Task<Result<()>> {
 713        let client = self.client.clone();
 714        let name = new_name.to_string();
 715        cx.spawn(move |this, mut cx| async move {
 716            let channel = client
 717                .request(proto::RenameChannel { channel_id, name })
 718                .await?
 719                .channel
 720                .ok_or_else(|| anyhow!("missing channel in response"))?;
 721            this.update(&mut cx, |this, cx| {
 722                let task = this.update_channels(
 723                    proto::UpdateChannels {
 724                        channels: vec![channel],
 725                        ..Default::default()
 726                    },
 727                    cx,
 728                );
 729                assert!(task.is_none());
 730
 731                // This event is emitted because the collab panel wants to clear the pending edit state
 732                // before this frame is rendered. But we can't guarantee that the collab panel's future
 733                // will resolve before this flush_effects finishes. Synchronously emitting this event
 734                // ensures that the collab panel will observe this creation before the frame complete
 735                cx.emit(ChannelEvent::ChannelRenamed(channel_id))
 736            })?;
 737            Ok(())
 738        })
 739    }
 740
 741    pub fn respond_to_channel_invite(
 742        &mut self,
 743        channel_id: ChannelId,
 744        accept: bool,
 745        cx: &mut ModelContext<Self>,
 746    ) -> Task<Result<()>> {
 747        let client = self.client.clone();
 748        cx.background_executor().spawn(async move {
 749            client
 750                .request(proto::RespondToChannelInvite { channel_id, accept })
 751                .await?;
 752            Ok(())
 753        })
 754    }
 755
 756    pub fn get_channel_member_details(
 757        &self,
 758        channel_id: ChannelId,
 759        cx: &mut ModelContext<Self>,
 760    ) -> Task<Result<Vec<ChannelMembership>>> {
 761        let client = self.client.clone();
 762        let user_store = self.user_store.downgrade();
 763        cx.spawn(move |_, mut cx| async move {
 764            let response = client
 765                .request(proto::GetChannelMembers { channel_id })
 766                .await?;
 767
 768            let user_ids = response.members.iter().map(|m| m.user_id).collect();
 769            let user_store = user_store
 770                .upgrade()
 771                .ok_or_else(|| anyhow!("user store dropped"))?;
 772            let users = user_store
 773                .update(&mut cx, |user_store, cx| user_store.get_users(user_ids, cx))?
 774                .await?;
 775
 776            Ok(users
 777                .into_iter()
 778                .zip(response.members)
 779                .filter_map(|(user, member)| {
 780                    Some(ChannelMembership {
 781                        user,
 782                        role: member.role(),
 783                        kind: member.kind(),
 784                    })
 785                })
 786                .collect())
 787        })
 788    }
 789
 790    pub fn remove_channel(&self, channel_id: ChannelId) -> impl Future<Output = Result<()>> {
 791        let client = self.client.clone();
 792        async move {
 793            client.request(proto::DeleteChannel { channel_id }).await?;
 794            Ok(())
 795        }
 796    }
 797
    /// Whether a response to the invitation for the given channel is still in
    /// flight. Pending responses are not tracked, so this is always `false`
    /// and the channel argument is ignored.
    pub fn has_pending_channel_invite_response(&self, _: &Arc<Channel>) -> bool {
        false
    }
 801
 802    pub fn has_pending_channel_invite(&self, channel_id: ChannelId, user_id: UserId) -> bool {
 803        self.outgoing_invites.contains(&(channel_id, user_id))
 804    }
 805
 806    async fn handle_update_channels(
 807        this: Model<Self>,
 808        message: TypedEnvelope<proto::UpdateChannels>,
 809        _: Arc<Client>,
 810        mut cx: AsyncAppContext,
 811    ) -> Result<()> {
 812        this.update(&mut cx, |this, _| {
 813            this.update_channels_tx
 814                .unbounded_send(message.payload)
 815                .unwrap();
 816        })?;
 817        Ok(())
 818    }
 819
 820    async fn handle_update_user_channels(
 821        this: Model<Self>,
 822        message: TypedEnvelope<proto::UpdateUserChannels>,
 823        _: Arc<Client>,
 824        mut cx: AsyncAppContext,
 825    ) -> Result<()> {
 826        this.update(&mut cx, |this, cx| {
 827            for buffer_version in message.payload.observed_channel_buffer_version {
 828                let version = language::proto::deserialize_version(&buffer_version.version);
 829                this.acknowledge_notes_version(
 830                    buffer_version.channel_id,
 831                    buffer_version.epoch,
 832                    &version,
 833                    cx,
 834                );
 835            }
 836            for message_id in message.payload.observed_channel_message_id {
 837                this.acknowledge_message_id(message_id.channel_id, message_id.message_id, cx);
 838            }
 839            for membership in message.payload.channel_memberships {
 840                if let Some(role) = ChannelRole::from_i32(membership.role) {
 841                    this.channel_states
 842                        .entry(membership.channel_id)
 843                        .or_insert_with(|| ChannelState::default())
 844                        .set_role(role)
 845                }
 846            }
 847        })
 848    }
 849
 850    fn handle_connect(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
 851        self.channel_index.clear();
 852        self.channel_invitations.clear();
 853        self.channel_participants.clear();
 854        self.channel_index.clear();
 855        self.outgoing_invites.clear();
 856        self.disconnect_channel_buffers_task.take();
 857
 858        for chat in self.opened_chats.values() {
 859            if let OpenedModelHandle::Open(chat) = chat {
 860                if let Some(chat) = chat.upgrade() {
 861                    chat.update(cx, |chat, cx| {
 862                        chat.rejoin(cx);
 863                    });
 864                }
 865            }
 866        }
 867
 868        let mut buffer_versions = Vec::new();
 869        for buffer in self.opened_buffers.values() {
 870            if let OpenedModelHandle::Open(buffer) = buffer {
 871                if let Some(buffer) = buffer.upgrade() {
 872                    let channel_buffer = buffer.read(cx);
 873                    let buffer = channel_buffer.buffer().read(cx);
 874                    buffer_versions.push(proto::ChannelBufferVersion {
 875                        channel_id: channel_buffer.channel_id,
 876                        epoch: channel_buffer.epoch(),
 877                        version: language::proto::serialize_version(&buffer.version()),
 878                    });
 879                }
 880            }
 881        }
 882
 883        if buffer_versions.is_empty() {
 884            return Task::ready(Ok(()));
 885        }
 886
 887        let response = self.client.request(proto::RejoinChannelBuffers {
 888            buffers: buffer_versions,
 889        });
 890
 891        cx.spawn(|this, mut cx| async move {
 892            let mut response = response.await?;
 893
 894            this.update(&mut cx, |this, cx| {
 895                this.opened_buffers.retain(|_, buffer| match buffer {
 896                    OpenedModelHandle::Open(channel_buffer) => {
 897                        let Some(channel_buffer) = channel_buffer.upgrade() else {
 898                            return false;
 899                        };
 900
 901                        channel_buffer.update(cx, |channel_buffer, cx| {
 902                            let channel_id = channel_buffer.channel_id;
 903                            if let Some(remote_buffer) = response
 904                                .buffers
 905                                .iter_mut()
 906                                .find(|buffer| buffer.channel_id == channel_id)
 907                            {
 908                                let channel_id = channel_buffer.channel_id;
 909                                let remote_version =
 910                                    language::proto::deserialize_version(&remote_buffer.version);
 911
 912                                channel_buffer.replace_collaborators(
 913                                    mem::take(&mut remote_buffer.collaborators),
 914                                    cx,
 915                                );
 916
 917                                let operations = channel_buffer
 918                                    .buffer()
 919                                    .update(cx, |buffer, cx| {
 920                                        let outgoing_operations =
 921                                            buffer.serialize_ops(Some(remote_version), cx);
 922                                        let incoming_operations =
 923                                            mem::take(&mut remote_buffer.operations)
 924                                                .into_iter()
 925                                                .map(language::proto::deserialize_operation)
 926                                                .collect::<Result<Vec<_>>>()?;
 927                                        buffer.apply_ops(incoming_operations, cx)?;
 928                                        anyhow::Ok(outgoing_operations)
 929                                    })
 930                                    .log_err();
 931
 932                                if let Some(operations) = operations {
 933                                    let client = this.client.clone();
 934                                    cx.background_executor()
 935                                        .spawn(async move {
 936                                            let operations = operations.await;
 937                                            for chunk in
 938                                                language::proto::split_operations(operations)
 939                                            {
 940                                                client
 941                                                    .send(proto::UpdateChannelBuffer {
 942                                                        channel_id,
 943                                                        operations: chunk,
 944                                                    })
 945                                                    .ok();
 946                                            }
 947                                        })
 948                                        .detach();
 949                                    return true;
 950                                }
 951                            }
 952
 953                            channel_buffer.disconnect(cx);
 954                            false
 955                        })
 956                    }
 957                    OpenedModelHandle::Loading(_) => true,
 958                });
 959            })
 960            .ok();
 961            anyhow::Ok(())
 962        })
 963    }
 964
 965    fn handle_disconnect(&mut self, wait_for_reconnect: bool, cx: &mut ModelContext<Self>) {
 966        cx.notify();
 967
 968        self.disconnect_channel_buffers_task.get_or_insert_with(|| {
 969            cx.spawn(move |this, mut cx| async move {
 970                if wait_for_reconnect {
 971                    cx.background_executor().timer(RECONNECT_TIMEOUT).await;
 972                }
 973
 974                if let Some(this) = this.upgrade() {
 975                    this.update(&mut cx, |this, cx| {
 976                        for (_, buffer) in this.opened_buffers.drain() {
 977                            if let OpenedModelHandle::Open(buffer) = buffer {
 978                                if let Some(buffer) = buffer.upgrade() {
 979                                    buffer.update(cx, |buffer, cx| buffer.disconnect(cx));
 980                                }
 981                            }
 982                        }
 983                    })
 984                    .ok();
 985                }
 986            })
 987        });
 988    }
 989
 990    pub(crate) fn update_channels(
 991        &mut self,
 992        payload: proto::UpdateChannels,
 993        cx: &mut ModelContext<ChannelStore>,
 994    ) -> Option<Task<Result<()>>> {
 995        if !payload.remove_channel_invitations.is_empty() {
 996            self.channel_invitations
 997                .retain(|channel| !payload.remove_channel_invitations.contains(&channel.id));
 998        }
 999        for channel in payload.channel_invitations {
1000            match self
1001                .channel_invitations
1002                .binary_search_by_key(&channel.id, |c| c.id)
1003            {
1004                Ok(ix) => {
1005                    Arc::make_mut(&mut self.channel_invitations[ix]).name = channel.name.into()
1006                }
1007                Err(ix) => self.channel_invitations.insert(
1008                    ix,
1009                    Arc::new(Channel {
1010                        id: channel.id,
1011                        visibility: channel.visibility(),
1012                        name: channel.name.into(),
1013                        parent_path: channel.parent_path,
1014                    }),
1015                ),
1016            }
1017        }
1018
1019        let channels_changed = !payload.channels.is_empty()
1020            || !payload.delete_channels.is_empty()
1021            || !payload.latest_channel_message_ids.is_empty()
1022            || !payload.latest_channel_buffer_versions.is_empty();
1023
1024        if channels_changed {
1025            if !payload.delete_channels.is_empty() {
1026                self.channel_index.delete_channels(&payload.delete_channels);
1027                self.channel_participants
1028                    .retain(|channel_id, _| !&payload.delete_channels.contains(channel_id));
1029
1030                for channel_id in &payload.delete_channels {
1031                    let channel_id = *channel_id;
1032                    if payload
1033                        .channels
1034                        .iter()
1035                        .any(|channel| channel.id == channel_id)
1036                    {
1037                        continue;
1038                    }
1039                    if let Some(OpenedModelHandle::Open(buffer)) =
1040                        self.opened_buffers.remove(&channel_id)
1041                    {
1042                        if let Some(buffer) = buffer.upgrade() {
1043                            buffer.update(cx, ChannelBuffer::disconnect);
1044                        }
1045                    }
1046                }
1047            }
1048
1049            let mut index = self.channel_index.bulk_insert();
1050            for channel in payload.channels {
1051                let id = channel.id;
1052                let channel_changed = index.insert(channel);
1053
1054                if channel_changed {
1055                    if let Some(OpenedModelHandle::Open(buffer)) = self.opened_buffers.get(&id) {
1056                        if let Some(buffer) = buffer.upgrade() {
1057                            buffer.update(cx, ChannelBuffer::channel_changed);
1058                        }
1059                    }
1060                }
1061            }
1062
1063            for latest_buffer_version in payload.latest_channel_buffer_versions {
1064                let version = language::proto::deserialize_version(&latest_buffer_version.version);
1065                self.channel_states
1066                    .entry(latest_buffer_version.channel_id)
1067                    .or_default()
1068                    .update_latest_notes_version(latest_buffer_version.epoch, &version)
1069            }
1070
1071            for latest_channel_message in payload.latest_channel_message_ids {
1072                self.channel_states
1073                    .entry(latest_channel_message.channel_id)
1074                    .or_default()
1075                    .update_latest_message_id(latest_channel_message.message_id);
1076            }
1077        }
1078
1079        cx.notify();
1080        if payload.channel_participants.is_empty() {
1081            return None;
1082        }
1083
1084        let mut all_user_ids = Vec::new();
1085        let channel_participants = payload.channel_participants;
1086        for entry in &channel_participants {
1087            for user_id in entry.participant_user_ids.iter() {
1088                if let Err(ix) = all_user_ids.binary_search(user_id) {
1089                    all_user_ids.insert(ix, *user_id);
1090                }
1091            }
1092        }
1093
1094        let users = self
1095            .user_store
1096            .update(cx, |user_store, cx| user_store.get_users(all_user_ids, cx));
1097        Some(cx.spawn(|this, mut cx| async move {
1098            let users = users.await?;
1099
1100            this.update(&mut cx, |this, cx| {
1101                for entry in &channel_participants {
1102                    let mut participants: Vec<_> = entry
1103                        .participant_user_ids
1104                        .iter()
1105                        .filter_map(|user_id| {
1106                            users
1107                                .binary_search_by_key(&user_id, |user| &user.id)
1108                                .ok()
1109                                .map(|ix| users[ix].clone())
1110                        })
1111                        .collect();
1112
1113                    participants.sort_by_key(|u| u.id);
1114
1115                    this.channel_participants
1116                        .insert(entry.channel_id, participants);
1117                }
1118
1119                cx.notify();
1120            })
1121        }))
1122    }
1123}
1124
1125impl ChannelState {
1126    fn set_role(&mut self, role: ChannelRole) {
1127        self.role = Some(role);
1128    }
1129
1130    fn has_channel_buffer_changed(&self) -> bool {
1131        if let Some(latest_version) = &self.latest_notes_versions {
1132            if let Some(observed_version) = &self.observed_notes_versions {
1133                latest_version.epoch > observed_version.epoch
1134                    || latest_version
1135                        .version
1136                        .changed_since(&observed_version.version)
1137            } else {
1138                true
1139            }
1140        } else {
1141            false
1142        }
1143    }
1144
1145    fn has_new_messages(&self) -> bool {
1146        let latest_message_id = self.latest_chat_message;
1147        let observed_message_id = self.observed_chat_message;
1148
1149        latest_message_id.is_some_and(|latest_message_id| {
1150            latest_message_id > observed_message_id.unwrap_or_default()
1151        })
1152    }
1153
1154    fn acknowledge_message_id(&mut self, message_id: u64) {
1155        let observed = self.observed_chat_message.get_or_insert(message_id);
1156        *observed = (*observed).max(message_id);
1157    }
1158
1159    fn update_latest_message_id(&mut self, message_id: u64) {
1160        self.latest_chat_message =
1161            Some(message_id.max(self.latest_chat_message.unwrap_or_default()));
1162    }
1163
1164    fn acknowledge_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1165        if let Some(existing) = &mut self.observed_notes_versions {
1166            if existing.epoch == epoch {
1167                existing.version.join(version);
1168                return;
1169            }
1170        }
1171        self.observed_notes_versions = Some(NotesVersion {
1172            epoch,
1173            version: version.clone(),
1174        });
1175    }
1176
1177    fn update_latest_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1178        if let Some(existing) = &mut self.latest_notes_versions {
1179            if existing.epoch == epoch {
1180                existing.version.join(version);
1181                return;
1182            }
1183        }
1184        self.latest_notes_versions = Some(NotesVersion {
1185            epoch,
1186            version: version.clone(),
1187        });
1188    }
1189}