channel_store.rs

   1mod channel_index;
   2
   3use crate::{channel_buffer::ChannelBuffer, channel_chat::ChannelChat, ChannelMessage};
   4use anyhow::{anyhow, Result};
   5use channel_index::ChannelIndex;
   6use client::{ChannelId, Client, ClientSettings, Subscription, User, UserId, UserStore};
   7use collections::{hash_map, HashMap, HashSet};
   8use futures::{channel::mpsc, future::Shared, Future, FutureExt, StreamExt};
   9use gpui::{
  10    App, AppContext as _, AsyncApp, Context, Entity, EventEmitter, Global, SharedString, Task,
  11    WeakEntity,
  12};
  13use language::Capability;
  14use rpc::{
  15    proto::{self, ChannelRole, ChannelVisibility},
  16    TypedEnvelope,
  17};
  18use settings::Settings;
  19use std::{mem, sync::Arc, time::Duration};
  20use util::{maybe, ResultExt};
  21
  22pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
  23
  24pub fn init(client: &Arc<Client>, user_store: Entity<UserStore>, cx: &mut App) {
  25    let channel_store = cx.new(|cx| ChannelStore::new(client.clone(), user_store.clone(), cx));
  26    cx.set_global(GlobalChannelStore(channel_store));
  27}
  28
  29#[derive(Debug, Clone, Default, PartialEq)]
  30struct NotesVersion {
  31    epoch: u64,
  32    version: clock::Global,
  33}
  34
  35pub struct ChannelStore {
  36    pub channel_index: ChannelIndex,
  37    channel_invitations: Vec<Arc<Channel>>,
  38    channel_participants: HashMap<ChannelId, Vec<Arc<User>>>,
  39    channel_states: HashMap<ChannelId, ChannelState>,
  40    outgoing_invites: HashSet<(ChannelId, UserId)>,
  41    update_channels_tx: mpsc::UnboundedSender<proto::UpdateChannels>,
  42    opened_buffers: HashMap<ChannelId, OpenEntityHandle<ChannelBuffer>>,
  43    opened_chats: HashMap<ChannelId, OpenEntityHandle<ChannelChat>>,
  44    client: Arc<Client>,
  45    did_subscribe: bool,
  46    user_store: Entity<UserStore>,
  47    _rpc_subscriptions: [Subscription; 2],
  48    _watch_connection_status: Task<Option<()>>,
  49    disconnect_channel_buffers_task: Option<Task<()>>,
  50    _update_channels: Task<()>,
  51}
  52
  53#[derive(Clone, Debug)]
  54pub struct Channel {
  55    pub id: ChannelId,
  56    pub name: SharedString,
  57    pub visibility: proto::ChannelVisibility,
  58    pub parent_path: Vec<ChannelId>,
  59}
  60
  61#[derive(Default, Debug)]
  62pub struct ChannelState {
  63    latest_chat_message: Option<u64>,
  64    latest_notes_version: NotesVersion,
  65    observed_notes_version: NotesVersion,
  66    observed_chat_message: Option<u64>,
  67    role: Option<ChannelRole>,
  68}
  69
  70impl Channel {
  71    pub fn link(&self, cx: &App) -> String {
  72        format!(
  73            "{}/channel/{}-{}",
  74            ClientSettings::get_global(cx).server_url,
  75            Self::slug(&self.name),
  76            self.id
  77        )
  78    }
  79
  80    pub fn notes_link(&self, heading: Option<String>, cx: &App) -> String {
  81        self.link(cx)
  82            + "/notes"
  83            + &heading
  84                .map(|h| format!("#{}", Self::slug(&h)))
  85                .unwrap_or_default()
  86    }
  87
  88    pub fn is_root_channel(&self) -> bool {
  89        self.parent_path.is_empty()
  90    }
  91
  92    pub fn root_id(&self) -> ChannelId {
  93        self.parent_path.first().copied().unwrap_or(self.id)
  94    }
  95
  96    pub fn slug(str: &str) -> String {
  97        let slug: String = str
  98            .chars()
  99            .map(|c| if c.is_alphanumeric() { c } else { '-' })
 100            .collect();
 101
 102        slug.trim_matches(|c| c == '-').to_string()
 103    }
 104}
 105
 106#[derive(Debug)]
 107pub struct ChannelMembership {
 108    pub user: Arc<User>,
 109    pub kind: proto::channel_member::Kind,
 110    pub role: proto::ChannelRole,
 111}
 112impl ChannelMembership {
 113    pub fn sort_key(&self) -> MembershipSortKey {
 114        MembershipSortKey {
 115            role_order: match self.role {
 116                proto::ChannelRole::Admin => 0,
 117                proto::ChannelRole::Member => 1,
 118                proto::ChannelRole::Banned => 2,
 119                proto::ChannelRole::Talker => 3,
 120                proto::ChannelRole::Guest => 4,
 121            },
 122            kind_order: match self.kind {
 123                proto::channel_member::Kind::Member => 0,
 124                proto::channel_member::Kind::Invitee => 1,
 125            },
 126            username_order: self.user.github_login.as_str(),
 127        }
 128    }
 129}
 130
 131#[derive(PartialOrd, Ord, PartialEq, Eq)]
 132pub struct MembershipSortKey<'a> {
 133    role_order: u8,
 134    kind_order: u8,
 135    username_order: &'a str,
 136}
 137
 138pub enum ChannelEvent {
 139    ChannelCreated(ChannelId),
 140    ChannelRenamed(ChannelId),
 141}
 142
 143impl EventEmitter<ChannelEvent> for ChannelStore {}
 144
 145enum OpenEntityHandle<E> {
 146    Open(WeakEntity<E>),
 147    Loading(Shared<Task<Result<Entity<E>, Arc<anyhow::Error>>>>),
 148}
 149
 150struct GlobalChannelStore(Entity<ChannelStore>);
 151
 152impl Global for GlobalChannelStore {}
 153
 154impl ChannelStore {
 155    pub fn global(cx: &App) -> Entity<Self> {
 156        cx.global::<GlobalChannelStore>().0.clone()
 157    }
 158
 159    pub fn new(client: Arc<Client>, user_store: Entity<UserStore>, cx: &mut Context<Self>) -> Self {
 160        let rpc_subscriptions = [
 161            client.add_message_handler(cx.weak_entity(), Self::handle_update_channels),
 162            client.add_message_handler(cx.weak_entity(), Self::handle_update_user_channels),
 163        ];
 164
 165        let mut connection_status = client.status();
 166        let (update_channels_tx, mut update_channels_rx) = mpsc::unbounded();
 167        let watch_connection_status = cx.spawn(|this, mut cx| async move {
 168            while let Some(status) = connection_status.next().await {
 169                let this = this.upgrade()?;
 170                match status {
 171                    client::Status::Connected { .. } => {
 172                        this.update(&mut cx, |this, cx| this.handle_connect(cx))
 173                            .ok()?
 174                            .await
 175                            .log_err()?;
 176                    }
 177                    client::Status::SignedOut | client::Status::UpgradeRequired => {
 178                        this.update(&mut cx, |this, cx| this.handle_disconnect(false, cx))
 179                            .ok();
 180                    }
 181                    _ => {
 182                        this.update(&mut cx, |this, cx| this.handle_disconnect(true, cx))
 183                            .ok();
 184                    }
 185                }
 186            }
 187            Some(())
 188        });
 189
 190        Self {
 191            channel_invitations: Vec::default(),
 192            channel_index: ChannelIndex::default(),
 193            channel_participants: Default::default(),
 194            outgoing_invites: Default::default(),
 195            opened_buffers: Default::default(),
 196            opened_chats: Default::default(),
 197            update_channels_tx,
 198            client,
 199            user_store,
 200            _rpc_subscriptions: rpc_subscriptions,
 201            _watch_connection_status: watch_connection_status,
 202            disconnect_channel_buffers_task: None,
 203            _update_channels: cx.spawn(|this, mut cx| async move {
 204                maybe!(async move {
 205                    while let Some(update_channels) = update_channels_rx.next().await {
 206                        if let Some(this) = this.upgrade() {
 207                            let update_task = this.update(&mut cx, |this, cx| {
 208                                this.update_channels(update_channels, cx)
 209                            })?;
 210                            if let Some(update_task) = update_task {
 211                                update_task.await.log_err();
 212                            }
 213                        }
 214                    }
 215                    anyhow::Ok(())
 216                })
 217                .await
 218                .log_err();
 219            }),
 220            channel_states: Default::default(),
 221            did_subscribe: false,
 222        }
 223    }
 224
 225    pub fn initialize(&mut self) {
 226        if !self.did_subscribe
 227            && self
 228                .client
 229                .send(proto::SubscribeToChannels {})
 230                .log_err()
 231                .is_some()
 232        {
 233            self.did_subscribe = true;
 234        }
 235    }
 236
 237    pub fn client(&self) -> Arc<Client> {
 238        self.client.clone()
 239    }
 240
 241    /// Returns the number of unique channels in the store
 242    pub fn channel_count(&self) -> usize {
 243        self.channel_index.by_id().len()
 244    }
 245
 246    /// Returns the index of a channel ID in the list of unique channels
 247    pub fn index_of_channel(&self, channel_id: ChannelId) -> Option<usize> {
 248        self.channel_index
 249            .by_id()
 250            .keys()
 251            .position(|id| *id == channel_id)
 252    }
 253
 254    /// Returns an iterator over all unique channels
 255    pub fn channels(&self) -> impl '_ + Iterator<Item = &Arc<Channel>> {
 256        self.channel_index.by_id().values()
 257    }
 258
 259    /// Iterate over all entries in the channel DAG
 260    pub fn ordered_channels(&self) -> impl '_ + Iterator<Item = (usize, &Arc<Channel>)> {
 261        self.channel_index
 262            .ordered_channels()
 263            .iter()
 264            .filter_map(move |id| {
 265                let channel = self.channel_index.by_id().get(id)?;
 266                Some((channel.parent_path.len(), channel))
 267            })
 268    }
 269
 270    pub fn channel_at_index(&self, ix: usize) -> Option<&Arc<Channel>> {
 271        let channel_id = self.channel_index.ordered_channels().get(ix)?;
 272        self.channel_index.by_id().get(channel_id)
 273    }
 274
 275    pub fn channel_at(&self, ix: usize) -> Option<&Arc<Channel>> {
 276        self.channel_index.by_id().values().nth(ix)
 277    }
 278
 279    pub fn has_channel_invitation(&self, channel_id: ChannelId) -> bool {
 280        self.channel_invitations
 281            .iter()
 282            .any(|channel| channel.id == channel_id)
 283    }
 284
 285    pub fn channel_invitations(&self) -> &[Arc<Channel>] {
 286        &self.channel_invitations
 287    }
 288
 289    pub fn channel_for_id(&self, channel_id: ChannelId) -> Option<&Arc<Channel>> {
 290        self.channel_index.by_id().get(&channel_id)
 291    }
 292
 293    pub fn has_open_channel_buffer(&self, channel_id: ChannelId, _cx: &App) -> bool {
 294        if let Some(buffer) = self.opened_buffers.get(&channel_id) {
 295            if let OpenEntityHandle::Open(buffer) = buffer {
 296                return buffer.upgrade().is_some();
 297            }
 298        }
 299        false
 300    }
 301
 302    pub fn open_channel_buffer(
 303        &mut self,
 304        channel_id: ChannelId,
 305        cx: &mut Context<Self>,
 306    ) -> Task<Result<Entity<ChannelBuffer>>> {
 307        let client = self.client.clone();
 308        let user_store = self.user_store.clone();
 309        let channel_store = cx.entity();
 310        self.open_channel_resource(
 311            channel_id,
 312            |this| &mut this.opened_buffers,
 313            |channel, cx| ChannelBuffer::new(channel, client, user_store, channel_store, cx),
 314            cx,
 315        )
 316    }
 317
 318    pub fn fetch_channel_messages(
 319        &self,
 320        message_ids: Vec<u64>,
 321        cx: &mut Context<Self>,
 322    ) -> Task<Result<Vec<ChannelMessage>>> {
 323        let request = if message_ids.is_empty() {
 324            None
 325        } else {
 326            Some(
 327                self.client
 328                    .request(proto::GetChannelMessagesById { message_ids }),
 329            )
 330        };
 331        cx.spawn(|this, mut cx| async move {
 332            if let Some(request) = request {
 333                let response = request.await?;
 334                let this = this
 335                    .upgrade()
 336                    .ok_or_else(|| anyhow!("channel store dropped"))?;
 337                let user_store = this.update(&mut cx, |this, _| this.user_store.clone())?;
 338                ChannelMessage::from_proto_vec(response.messages, &user_store, &mut cx).await
 339            } else {
 340                Ok(Vec::new())
 341            }
 342        })
 343    }
 344
 345    pub fn has_channel_buffer_changed(&self, channel_id: ChannelId) -> bool {
 346        self.channel_states
 347            .get(&channel_id)
 348            .is_some_and(|state| state.has_channel_buffer_changed())
 349    }
 350
 351    pub fn has_new_messages(&self, channel_id: ChannelId) -> bool {
 352        self.channel_states
 353            .get(&channel_id)
 354            .is_some_and(|state| state.has_new_messages())
 355    }
 356
 357    pub fn set_acknowledged_message_id(&mut self, channel_id: ChannelId, message_id: Option<u64>) {
 358        if let Some(state) = self.channel_states.get_mut(&channel_id) {
 359            state.latest_chat_message = message_id;
 360        }
 361    }
 362
 363    pub fn last_acknowledge_message_id(&self, channel_id: ChannelId) -> Option<u64> {
 364        self.channel_states.get(&channel_id).and_then(|state| {
 365            if let Some(last_message_id) = state.latest_chat_message {
 366                if state
 367                    .last_acknowledged_message_id()
 368                    .is_some_and(|id| id < last_message_id)
 369                {
 370                    return state.last_acknowledged_message_id();
 371                }
 372            }
 373
 374            None
 375        })
 376    }
 377
 378    pub fn acknowledge_message_id(
 379        &mut self,
 380        channel_id: ChannelId,
 381        message_id: u64,
 382        cx: &mut Context<Self>,
 383    ) {
 384        self.channel_states
 385            .entry(channel_id)
 386            .or_default()
 387            .acknowledge_message_id(message_id);
 388        cx.notify();
 389    }
 390
 391    pub fn update_latest_message_id(
 392        &mut self,
 393        channel_id: ChannelId,
 394        message_id: u64,
 395        cx: &mut Context<Self>,
 396    ) {
 397        self.channel_states
 398            .entry(channel_id)
 399            .or_default()
 400            .update_latest_message_id(message_id);
 401        cx.notify();
 402    }
 403
 404    pub fn acknowledge_notes_version(
 405        &mut self,
 406        channel_id: ChannelId,
 407        epoch: u64,
 408        version: &clock::Global,
 409        cx: &mut Context<Self>,
 410    ) {
 411        self.channel_states
 412            .entry(channel_id)
 413            .or_default()
 414            .acknowledge_notes_version(epoch, version);
 415        cx.notify()
 416    }
 417
 418    pub fn update_latest_notes_version(
 419        &mut self,
 420        channel_id: ChannelId,
 421        epoch: u64,
 422        version: &clock::Global,
 423        cx: &mut Context<Self>,
 424    ) {
 425        self.channel_states
 426            .entry(channel_id)
 427            .or_default()
 428            .update_latest_notes_version(epoch, version);
 429        cx.notify()
 430    }
 431
 432    pub fn open_channel_chat(
 433        &mut self,
 434        channel_id: ChannelId,
 435        cx: &mut Context<Self>,
 436    ) -> Task<Result<Entity<ChannelChat>>> {
 437        let client = self.client.clone();
 438        let user_store = self.user_store.clone();
 439        let this = cx.entity();
 440        self.open_channel_resource(
 441            channel_id,
 442            |this| &mut this.opened_chats,
 443            |channel, cx| ChannelChat::new(channel, this, user_store, client, cx),
 444            cx,
 445        )
 446    }
 447
 448    /// Asynchronously open a given resource associated with a channel.
 449    ///
 450    /// Make sure that the resource is only opened once, even if this method
 451    /// is called multiple times with the same channel id while the first task
 452    /// is still running.
 453    fn open_channel_resource<T, F, Fut>(
 454        &mut self,
 455        channel_id: ChannelId,
 456        get_map: fn(&mut Self) -> &mut HashMap<ChannelId, OpenEntityHandle<T>>,
 457        load: F,
 458        cx: &mut Context<Self>,
 459    ) -> Task<Result<Entity<T>>>
 460    where
 461        F: 'static + FnOnce(Arc<Channel>, AsyncApp) -> Fut,
 462        Fut: Future<Output = Result<Entity<T>>>,
 463        T: 'static,
 464    {
 465        let task = loop {
 466            match get_map(self).entry(channel_id) {
 467                hash_map::Entry::Occupied(e) => match e.get() {
 468                    OpenEntityHandle::Open(entity) => {
 469                        if let Some(entity) = entity.upgrade() {
 470                            break Task::ready(Ok(entity)).shared();
 471                        } else {
 472                            get_map(self).remove(&channel_id);
 473                            continue;
 474                        }
 475                    }
 476                    OpenEntityHandle::Loading(task) => {
 477                        break task.clone();
 478                    }
 479                },
 480                hash_map::Entry::Vacant(e) => {
 481                    let task = cx
 482                        .spawn(move |this, mut cx| async move {
 483                            let channel = this.update(&mut cx, |this, _| {
 484                                this.channel_for_id(channel_id).cloned().ok_or_else(|| {
 485                                    Arc::new(anyhow!("no channel for id: {}", channel_id))
 486                                })
 487                            })??;
 488
 489                            load(channel, cx).await.map_err(Arc::new)
 490                        })
 491                        .shared();
 492
 493                    e.insert(OpenEntityHandle::Loading(task.clone()));
 494                    cx.spawn({
 495                        let task = task.clone();
 496                        move |this, mut cx| async move {
 497                            let result = task.await;
 498                            this.update(&mut cx, |this, _| match result {
 499                                Ok(model) => {
 500                                    get_map(this).insert(
 501                                        channel_id,
 502                                        OpenEntityHandle::Open(model.downgrade()),
 503                                    );
 504                                }
 505                                Err(_) => {
 506                                    get_map(this).remove(&channel_id);
 507                                }
 508                            })
 509                            .ok();
 510                        }
 511                    })
 512                    .detach();
 513                    break task;
 514                }
 515            }
 516        };
 517        cx.background_executor()
 518            .spawn(async move { task.await.map_err(|error| anyhow!("{}", error)) })
 519    }
 520
 521    pub fn is_channel_admin(&self, channel_id: ChannelId) -> bool {
 522        self.channel_role(channel_id) == proto::ChannelRole::Admin
 523    }
 524
 525    pub fn is_root_channel(&self, channel_id: ChannelId) -> bool {
 526        self.channel_index
 527            .by_id()
 528            .get(&channel_id)
 529            .map_or(false, |channel| channel.is_root_channel())
 530    }
 531
 532    pub fn is_public_channel(&self, channel_id: ChannelId) -> bool {
 533        self.channel_index
 534            .by_id()
 535            .get(&channel_id)
 536            .map_or(false, |channel| {
 537                channel.visibility == ChannelVisibility::Public
 538            })
 539    }
 540
 541    pub fn channel_capability(&self, channel_id: ChannelId) -> Capability {
 542        match self.channel_role(channel_id) {
 543            ChannelRole::Admin | ChannelRole::Member => Capability::ReadWrite,
 544            _ => Capability::ReadOnly,
 545        }
 546    }
 547
 548    pub fn channel_role(&self, channel_id: ChannelId) -> proto::ChannelRole {
 549        maybe!({
 550            let mut channel = self.channel_for_id(channel_id)?;
 551            if !channel.is_root_channel() {
 552                channel = self.channel_for_id(channel.root_id())?;
 553            }
 554            let root_channel_state = self.channel_states.get(&channel.id);
 555            root_channel_state?.role
 556        })
 557        .unwrap_or(proto::ChannelRole::Guest)
 558    }
 559
 560    pub fn channel_participants(&self, channel_id: ChannelId) -> &[Arc<User>] {
 561        self.channel_participants
 562            .get(&channel_id)
 563            .map_or(&[], |v| v.as_slice())
 564    }
 565
 566    pub fn create_channel(
 567        &self,
 568        name: &str,
 569        parent_id: Option<ChannelId>,
 570        cx: &mut Context<Self>,
 571    ) -> Task<Result<ChannelId>> {
 572        let client = self.client.clone();
 573        let name = name.trim_start_matches('#').to_owned();
 574        cx.spawn(move |this, mut cx| async move {
 575            let response = client
 576                .request(proto::CreateChannel {
 577                    name,
 578                    parent_id: parent_id.map(|cid| cid.0),
 579                })
 580                .await?;
 581
 582            let channel = response
 583                .channel
 584                .ok_or_else(|| anyhow!("missing channel in response"))?;
 585            let channel_id = ChannelId(channel.id);
 586
 587            this.update(&mut cx, |this, cx| {
 588                let task = this.update_channels(
 589                    proto::UpdateChannels {
 590                        channels: vec![channel],
 591                        ..Default::default()
 592                    },
 593                    cx,
 594                );
 595                assert!(task.is_none());
 596
 597                // This event is emitted because the collab panel wants to clear the pending edit state
 598                // before this frame is rendered. But we can't guarantee that the collab panel's future
 599                // will resolve before this flush_effects finishes. Synchronously emitting this event
 600                // ensures that the collab panel will observe this creation before the frame completes
 601                cx.emit(ChannelEvent::ChannelCreated(channel_id));
 602            })?;
 603
 604            Ok(channel_id)
 605        })
 606    }
 607
 608    pub fn move_channel(
 609        &mut self,
 610        channel_id: ChannelId,
 611        to: ChannelId,
 612        cx: &mut Context<Self>,
 613    ) -> Task<Result<()>> {
 614        let client = self.client.clone();
 615        cx.spawn(move |_, _| async move {
 616            let _ = client
 617                .request(proto::MoveChannel {
 618                    channel_id: channel_id.0,
 619                    to: to.0,
 620                })
 621                .await?;
 622
 623            Ok(())
 624        })
 625    }
 626
 627    pub fn set_channel_visibility(
 628        &mut self,
 629        channel_id: ChannelId,
 630        visibility: ChannelVisibility,
 631        cx: &mut Context<Self>,
 632    ) -> Task<Result<()>> {
 633        let client = self.client.clone();
 634        cx.spawn(move |_, _| async move {
 635            let _ = client
 636                .request(proto::SetChannelVisibility {
 637                    channel_id: channel_id.0,
 638                    visibility: visibility.into(),
 639                })
 640                .await?;
 641
 642            Ok(())
 643        })
 644    }
 645
 646    pub fn invite_member(
 647        &mut self,
 648        channel_id: ChannelId,
 649        user_id: UserId,
 650        role: proto::ChannelRole,
 651        cx: &mut Context<Self>,
 652    ) -> Task<Result<()>> {
 653        if !self.outgoing_invites.insert((channel_id, user_id)) {
 654            return Task::ready(Err(anyhow!("invite request already in progress")));
 655        }
 656
 657        cx.notify();
 658        let client = self.client.clone();
 659        cx.spawn(move |this, mut cx| async move {
 660            let result = client
 661                .request(proto::InviteChannelMember {
 662                    channel_id: channel_id.0,
 663                    user_id,
 664                    role: role.into(),
 665                })
 666                .await;
 667
 668            this.update(&mut cx, |this, cx| {
 669                this.outgoing_invites.remove(&(channel_id, user_id));
 670                cx.notify();
 671            })?;
 672
 673            result?;
 674
 675            Ok(())
 676        })
 677    }
 678
 679    pub fn remove_member(
 680        &mut self,
 681        channel_id: ChannelId,
 682        user_id: u64,
 683        cx: &mut Context<Self>,
 684    ) -> Task<Result<()>> {
 685        if !self.outgoing_invites.insert((channel_id, user_id)) {
 686            return Task::ready(Err(anyhow!("invite request already in progress")));
 687        }
 688
 689        cx.notify();
 690        let client = self.client.clone();
 691        cx.spawn(move |this, mut cx| async move {
 692            let result = client
 693                .request(proto::RemoveChannelMember {
 694                    channel_id: channel_id.0,
 695                    user_id,
 696                })
 697                .await;
 698
 699            this.update(&mut cx, |this, cx| {
 700                this.outgoing_invites.remove(&(channel_id, user_id));
 701                cx.notify();
 702            })?;
 703            result?;
 704            Ok(())
 705        })
 706    }
 707
 708    pub fn set_member_role(
 709        &mut self,
 710        channel_id: ChannelId,
 711        user_id: UserId,
 712        role: proto::ChannelRole,
 713        cx: &mut Context<Self>,
 714    ) -> Task<Result<()>> {
 715        if !self.outgoing_invites.insert((channel_id, user_id)) {
 716            return Task::ready(Err(anyhow!("member request already in progress")));
 717        }
 718
 719        cx.notify();
 720        let client = self.client.clone();
 721        cx.spawn(move |this, mut cx| async move {
 722            let result = client
 723                .request(proto::SetChannelMemberRole {
 724                    channel_id: channel_id.0,
 725                    user_id,
 726                    role: role.into(),
 727                })
 728                .await;
 729
 730            this.update(&mut cx, |this, cx| {
 731                this.outgoing_invites.remove(&(channel_id, user_id));
 732                cx.notify();
 733            })?;
 734
 735            result?;
 736            Ok(())
 737        })
 738    }
 739
 740    pub fn rename(
 741        &mut self,
 742        channel_id: ChannelId,
 743        new_name: &str,
 744        cx: &mut Context<Self>,
 745    ) -> Task<Result<()>> {
 746        let client = self.client.clone();
 747        let name = new_name.to_string();
 748        cx.spawn(move |this, mut cx| async move {
 749            let channel = client
 750                .request(proto::RenameChannel {
 751                    channel_id: channel_id.0,
 752                    name,
 753                })
 754                .await?
 755                .channel
 756                .ok_or_else(|| anyhow!("missing channel in response"))?;
 757            this.update(&mut cx, |this, cx| {
 758                let task = this.update_channels(
 759                    proto::UpdateChannels {
 760                        channels: vec![channel],
 761                        ..Default::default()
 762                    },
 763                    cx,
 764                );
 765                assert!(task.is_none());
 766
 767                // This event is emitted because the collab panel wants to clear the pending edit state
 768                // before this frame is rendered. But we can't guarantee that the collab panel's future
 769                // will resolve before this flush_effects finishes. Synchronously emitting this event
 770                // ensures that the collab panel will observe this creation before the frame complete
 771                cx.emit(ChannelEvent::ChannelRenamed(channel_id))
 772            })?;
 773            Ok(())
 774        })
 775    }
 776
 777    pub fn respond_to_channel_invite(
 778        &mut self,
 779        channel_id: ChannelId,
 780        accept: bool,
 781        cx: &mut Context<Self>,
 782    ) -> Task<Result<()>> {
 783        let client = self.client.clone();
 784        cx.background_executor().spawn(async move {
 785            client
 786                .request(proto::RespondToChannelInvite {
 787                    channel_id: channel_id.0,
 788                    accept,
 789                })
 790                .await?;
 791            Ok(())
 792        })
 793    }
 794    pub fn fuzzy_search_members(
 795        &self,
 796        channel_id: ChannelId,
 797        query: String,
 798        limit: u16,
 799        cx: &mut Context<Self>,
 800    ) -> Task<Result<Vec<ChannelMembership>>> {
 801        let client = self.client.clone();
 802        let user_store = self.user_store.downgrade();
 803        cx.spawn(move |_, mut cx| async move {
 804            let response = client
 805                .request(proto::GetChannelMembers {
 806                    channel_id: channel_id.0,
 807                    query,
 808                    limit: limit as u64,
 809                })
 810                .await?;
 811            user_store.update(&mut cx, |user_store, _| {
 812                user_store.insert(response.users);
 813                response
 814                    .members
 815                    .into_iter()
 816                    .filter_map(|member| {
 817                        Some(ChannelMembership {
 818                            user: user_store.get_cached_user(member.user_id)?,
 819                            role: member.role(),
 820                            kind: member.kind(),
 821                        })
 822                    })
 823                    .collect()
 824            })
 825        })
 826    }
 827
 828    pub fn remove_channel(&self, channel_id: ChannelId) -> impl Future<Output = Result<()>> {
 829        let client = self.client.clone();
 830        async move {
 831            client
 832                .request(proto::DeleteChannel {
 833                    channel_id: channel_id.0,
 834                })
 835                .await?;
 836            Ok(())
 837        }
 838    }
 839
 840    pub fn has_pending_channel_invite_response(&self, _: &Arc<Channel>) -> bool {
 841        false
 842    }
 843
 844    pub fn has_pending_channel_invite(&self, channel_id: ChannelId, user_id: UserId) -> bool {
 845        self.outgoing_invites.contains(&(channel_id, user_id))
 846    }
 847
 848    async fn handle_update_channels(
 849        this: Entity<Self>,
 850        message: TypedEnvelope<proto::UpdateChannels>,
 851        mut cx: AsyncApp,
 852    ) -> Result<()> {
 853        this.update(&mut cx, |this, _| {
 854            this.update_channels_tx
 855                .unbounded_send(message.payload)
 856                .unwrap();
 857        })?;
 858        Ok(())
 859    }
 860
 861    async fn handle_update_user_channels(
 862        this: Entity<Self>,
 863        message: TypedEnvelope<proto::UpdateUserChannels>,
 864        mut cx: AsyncApp,
 865    ) -> Result<()> {
 866        this.update(&mut cx, |this, cx| {
 867            for buffer_version in message.payload.observed_channel_buffer_version {
 868                let version = language::proto::deserialize_version(&buffer_version.version);
 869                this.acknowledge_notes_version(
 870                    ChannelId(buffer_version.channel_id),
 871                    buffer_version.epoch,
 872                    &version,
 873                    cx,
 874                );
 875            }
 876            for message_id in message.payload.observed_channel_message_id {
 877                this.acknowledge_message_id(
 878                    ChannelId(message_id.channel_id),
 879                    message_id.message_id,
 880                    cx,
 881                );
 882            }
 883            for membership in message.payload.channel_memberships {
 884                if let Some(role) = ChannelRole::from_i32(membership.role) {
 885                    this.channel_states
 886                        .entry(ChannelId(membership.channel_id))
 887                        .or_default()
 888                        .set_role(role)
 889                }
 890            }
 891        })
 892    }
 893
 894    fn handle_connect(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
 895        self.channel_index.clear();
 896        self.channel_invitations.clear();
 897        self.channel_participants.clear();
 898        self.channel_index.clear();
 899        self.outgoing_invites.clear();
 900        self.disconnect_channel_buffers_task.take();
 901
 902        for chat in self.opened_chats.values() {
 903            if let OpenEntityHandle::Open(chat) = chat {
 904                if let Some(chat) = chat.upgrade() {
 905                    chat.update(cx, |chat, cx| {
 906                        chat.rejoin(cx);
 907                    });
 908                }
 909            }
 910        }
 911
 912        let mut buffer_versions = Vec::new();
 913        for buffer in self.opened_buffers.values() {
 914            if let OpenEntityHandle::Open(buffer) = buffer {
 915                if let Some(buffer) = buffer.upgrade() {
 916                    let channel_buffer = buffer.read(cx);
 917                    let buffer = channel_buffer.buffer().read(cx);
 918                    buffer_versions.push(proto::ChannelBufferVersion {
 919                        channel_id: channel_buffer.channel_id.0,
 920                        epoch: channel_buffer.epoch(),
 921                        version: language::proto::serialize_version(&buffer.version()),
 922                    });
 923                }
 924            }
 925        }
 926
 927        if buffer_versions.is_empty() {
 928            return Task::ready(Ok(()));
 929        }
 930
 931        let response = self.client.request(proto::RejoinChannelBuffers {
 932            buffers: buffer_versions,
 933        });
 934
 935        cx.spawn(|this, mut cx| async move {
 936            let mut response = response.await?;
 937
 938            this.update(&mut cx, |this, cx| {
 939                this.opened_buffers.retain(|_, buffer| match buffer {
 940                    OpenEntityHandle::Open(channel_buffer) => {
 941                        let Some(channel_buffer) = channel_buffer.upgrade() else {
 942                            return false;
 943                        };
 944
 945                        channel_buffer.update(cx, |channel_buffer, cx| {
 946                            let channel_id = channel_buffer.channel_id;
 947                            if let Some(remote_buffer) = response
 948                                .buffers
 949                                .iter_mut()
 950                                .find(|buffer| buffer.channel_id == channel_id.0)
 951                            {
 952                                let channel_id = channel_buffer.channel_id;
 953                                let remote_version =
 954                                    language::proto::deserialize_version(&remote_buffer.version);
 955
 956                                channel_buffer.replace_collaborators(
 957                                    mem::take(&mut remote_buffer.collaborators),
 958                                    cx,
 959                                );
 960
 961                                let operations = channel_buffer
 962                                    .buffer()
 963                                    .update(cx, |buffer, cx| {
 964                                        let outgoing_operations =
 965                                            buffer.serialize_ops(Some(remote_version), cx);
 966                                        let incoming_operations =
 967                                            mem::take(&mut remote_buffer.operations)
 968                                                .into_iter()
 969                                                .map(language::proto::deserialize_operation)
 970                                                .collect::<Result<Vec<_>>>()?;
 971                                        buffer.apply_ops(incoming_operations, cx);
 972                                        anyhow::Ok(outgoing_operations)
 973                                    })
 974                                    .log_err();
 975
 976                                if let Some(operations) = operations {
 977                                    let client = this.client.clone();
 978                                    cx.background_executor()
 979                                        .spawn(async move {
 980                                            let operations = operations.await;
 981                                            for chunk in
 982                                                language::proto::split_operations(operations)
 983                                            {
 984                                                client
 985                                                    .send(proto::UpdateChannelBuffer {
 986                                                        channel_id: channel_id.0,
 987                                                        operations: chunk,
 988                                                    })
 989                                                    .ok();
 990                                            }
 991                                        })
 992                                        .detach();
 993                                    return true;
 994                                }
 995                            }
 996
 997                            channel_buffer.disconnect(cx);
 998                            false
 999                        })
1000                    }
1001                    OpenEntityHandle::Loading(_) => true,
1002                });
1003            })
1004            .ok();
1005            anyhow::Ok(())
1006        })
1007    }
1008
1009    fn handle_disconnect(&mut self, wait_for_reconnect: bool, cx: &mut Context<Self>) {
1010        cx.notify();
1011        self.did_subscribe = false;
1012        self.disconnect_channel_buffers_task.get_or_insert_with(|| {
1013            cx.spawn(move |this, mut cx| async move {
1014                if wait_for_reconnect {
1015                    cx.background_executor().timer(RECONNECT_TIMEOUT).await;
1016                }
1017
1018                if let Some(this) = this.upgrade() {
1019                    this.update(&mut cx, |this, cx| {
1020                        for (_, buffer) in this.opened_buffers.drain() {
1021                            if let OpenEntityHandle::Open(buffer) = buffer {
1022                                if let Some(buffer) = buffer.upgrade() {
1023                                    buffer.update(cx, |buffer, cx| buffer.disconnect(cx));
1024                                }
1025                            }
1026                        }
1027                    })
1028                    .ok();
1029                }
1030            })
1031        });
1032    }
1033
    /// Applies an `UpdateChannels` payload to the store.
    ///
    /// Synchronously, in order:
    /// 1. Removes and then upserts channel invitations.
    /// 2. If the payload touches channels (inserted/deleted channels, latest message
    ///    ids or latest buffer versions): deletes channels, inserts/updates channels
    ///    in the index, and records the latest notes versions and message ids.
    /// 3. Notifies observers.
    ///
    /// Returns `None` when the payload carries no channel participants. Otherwise
    /// returns a task that resolves the participant users through the user store,
    /// stores them per channel in `channel_participants`, and notifies again.
    pub(crate) fn update_channels(
        &mut self,
        payload: proto::UpdateChannels,
        cx: &mut Context<ChannelStore>,
    ) -> Option<Task<Result<()>>> {
        if !payload.remove_channel_invitations.is_empty() {
            self.channel_invitations
                .retain(|channel| !payload.remove_channel_invitations.contains(&channel.id.0));
        }
        for channel in payload.channel_invitations {
            // Inserting at the index returned by the failed search keeps
            // `channel_invitations` ordered by channel id, as the search requires.
            match self
                .channel_invitations
                .binary_search_by_key(&channel.id, |c| c.id.0)
            {
                // Known invitation: only its name is refreshed.
                Ok(ix) => {
                    Arc::make_mut(&mut self.channel_invitations[ix]).name = channel.name.into()
                }
                Err(ix) => self.channel_invitations.insert(
                    ix,
                    Arc::new(Channel {
                        id: ChannelId(channel.id),
                        visibility: channel.visibility(),
                        name: channel.name.into(),
                        parent_path: channel.parent_path.into_iter().map(ChannelId).collect(),
                    }),
                ),
            }
        }

        let channels_changed = !payload.channels.is_empty()
            || !payload.delete_channels.is_empty()
            || !payload.latest_channel_message_ids.is_empty()
            || !payload.latest_channel_buffer_versions.is_empty();

        if channels_changed {
            if !payload.delete_channels.is_empty() {
                let delete_channels: Vec<ChannelId> =
                    payload.delete_channels.into_iter().map(ChannelId).collect();
                self.channel_index.delete_channels(&delete_channels);
                self.channel_participants
                    .retain(|channel_id, _| !delete_channels.contains(channel_id));

                for channel_id in &delete_channels {
                    let channel_id = *channel_id;
                    // A channel that is both deleted and listed in `payload.channels`
                    // keeps its open buffer.
                    if payload
                        .channels
                        .iter()
                        .any(|channel| channel.id == channel_id.0)
                    {
                        continue;
                    }
                    if let Some(OpenEntityHandle::Open(buffer)) =
                        self.opened_buffers.remove(&channel_id)
                    {
                        if let Some(buffer) = buffer.upgrade() {
                            buffer.update(cx, ChannelBuffer::disconnect);
                        }
                    }
                }
            }

            let mut index = self.channel_index.bulk_insert();
            for channel in payload.channels {
                let id = ChannelId(channel.id);
                let channel_changed = index.insert(channel);

                // Let an open buffer for this channel know the channel changed.
                if channel_changed {
                    if let Some(OpenEntityHandle::Open(buffer)) = self.opened_buffers.get(&id) {
                        if let Some(buffer) = buffer.upgrade() {
                            buffer.update(cx, ChannelBuffer::channel_changed);
                        }
                    }
                }
            }

            for latest_buffer_version in payload.latest_channel_buffer_versions {
                let version = language::proto::deserialize_version(&latest_buffer_version.version);
                self.channel_states
                    .entry(ChannelId(latest_buffer_version.channel_id))
                    .or_default()
                    .update_latest_notes_version(latest_buffer_version.epoch, &version)
            }

            for latest_channel_message in payload.latest_channel_message_ids {
                self.channel_states
                    .entry(ChannelId(latest_channel_message.channel_id))
                    .or_default()
                    .update_latest_message_id(latest_channel_message.message_id);
            }
        }

        cx.notify();
        if payload.channel_participants.is_empty() {
            return None;
        }

        // Build a sorted, de-duplicated list of every participant user id.
        let mut all_user_ids = Vec::new();
        let channel_participants = payload.channel_participants;
        for entry in &channel_participants {
            for user_id in entry.participant_user_ids.iter() {
                if let Err(ix) = all_user_ids.binary_search(user_id) {
                    all_user_ids.insert(ix, *user_id);
                }
            }
        }

        let users = self
            .user_store
            .update(cx, |user_store, cx| user_store.get_users(all_user_ids, cx));
        Some(cx.spawn(|this, mut cx| async move {
            let users = users.await?;

            this.update(&mut cx, |this, cx| {
                for entry in &channel_participants {
                    // NOTE(review): the binary search presumes `get_users` returns users
                    // ordered by id (e.g. in the order of the sorted ids passed in) —
                    // confirm against `UserStore::get_users`. Ids that are not found
                    // are silently dropped.
                    let mut participants: Vec<_> = entry
                        .participant_user_ids
                        .iter()
                        .filter_map(|user_id| {
                            users
                                .binary_search_by_key(&user_id, |user| &user.id)
                                .ok()
                                .map(|ix| users[ix].clone())
                        })
                        .collect();

                    participants.sort_by_key(|u| u.id);

                    this.channel_participants
                        .insert(ChannelId(entry.channel_id), participants);
                }

                cx.notify();
            })
        }))
    }
1169}
1170
1171impl ChannelState {
1172    fn set_role(&mut self, role: ChannelRole) {
1173        self.role = Some(role);
1174    }
1175
1176    fn has_channel_buffer_changed(&self) -> bool {
1177        self.latest_notes_version.epoch > self.observed_notes_version.epoch
1178            || (self.latest_notes_version.epoch == self.observed_notes_version.epoch
1179                && self
1180                    .latest_notes_version
1181                    .version
1182                    .changed_since(&self.observed_notes_version.version))
1183    }
1184
1185    fn has_new_messages(&self) -> bool {
1186        let latest_message_id = self.latest_chat_message;
1187        let observed_message_id = self.observed_chat_message;
1188
1189        latest_message_id.is_some_and(|latest_message_id| {
1190            latest_message_id > observed_message_id.unwrap_or_default()
1191        })
1192    }
1193
1194    fn last_acknowledged_message_id(&self) -> Option<u64> {
1195        self.observed_chat_message
1196    }
1197
1198    fn acknowledge_message_id(&mut self, message_id: u64) {
1199        let observed = self.observed_chat_message.get_or_insert(message_id);
1200        *observed = (*observed).max(message_id);
1201    }
1202
1203    fn update_latest_message_id(&mut self, message_id: u64) {
1204        self.latest_chat_message =
1205            Some(message_id.max(self.latest_chat_message.unwrap_or_default()));
1206    }
1207
1208    fn acknowledge_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1209        if self.observed_notes_version.epoch == epoch {
1210            self.observed_notes_version.version.join(version);
1211        } else {
1212            self.observed_notes_version = NotesVersion {
1213                epoch,
1214                version: version.clone(),
1215            };
1216        }
1217    }
1218
1219    fn update_latest_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1220        if self.latest_notes_version.epoch == epoch {
1221            self.latest_notes_version.version.join(version);
1222        } else {
1223            self.latest_notes_version = NotesVersion {
1224                epoch,
1225                version: version.clone(),
1226            };
1227        }
1228    }
1229}