channel_store.rs

   1mod channel_index;
   2
   3use crate::{channel_buffer::ChannelBuffer, channel_chat::ChannelChat, ChannelMessage};
   4use anyhow::{anyhow, Result};
   5use channel_index::ChannelIndex;
   6use client::{ChannelId, Client, ClientSettings, Subscription, User, UserId, UserStore};
   7use collections::{hash_map, HashMap, HashSet};
   8use futures::{channel::mpsc, future::Shared, Future, FutureExt, StreamExt};
   9use gpui::{
  10    AppContext, AsyncAppContext, Context, EventEmitter, Global, Model, ModelContext, SharedString,
  11    Task, WeakModel,
  12};
  13use language::Capability;
  14use rpc::{
  15    proto::{self, ChannelRole, ChannelVisibility},
  16    TypedEnvelope,
  17};
  18use settings::Settings;
  19use std::{mem, sync::Arc, time::Duration};
  20use util::{maybe, ResultExt};
  21
  22pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
  23
  24pub fn init(client: &Arc<Client>, user_store: Model<UserStore>, cx: &mut AppContext) {
  25    let channel_store =
  26        cx.new_model(|cx| ChannelStore::new(client.clone(), user_store.clone(), cx));
  27    cx.set_global(GlobalChannelStore(channel_store));
  28}
  29
/// A version of a channel's notes, identified by an epoch plus a version
/// vector. `ChannelState` keeps one of these for the latest known notes and
/// one for the notes the user has observed.
#[derive(Debug, Clone, Default, PartialEq)]
struct NotesVersion {
    // NOTE(review): presumably bumped when the notes buffer is recreated — confirm.
    epoch: u64,
    // Version vector of the notes within `epoch`.
    version: clock::Global,
}
  35
/// Client-side model holding everything known about the user's channels:
/// the channel index, invitations, participants, per-channel state, and the
/// channel buffers and chats that are currently open or being opened.
pub struct ChannelStore {
    /// Index of all known channels (lookup by id and an ordered listing).
    pub channel_index: ChannelIndex,
    // Channels the current user has been invited to.
    channel_invitations: Vec<Arc<Channel>>,
    // Participants per channel, as returned by `channel_participants`.
    channel_participants: HashMap<ChannelId, Vec<Arc<User>>>,
    // Per-channel chat/notes bookkeeping and the user's role.
    channel_states: HashMap<ChannelId, ChannelState>,
    // (channel, user) pairs that currently have an invite / remove / set-role
    // request in flight; used to reject duplicate concurrent requests.
    outgoing_invites: HashSet<(ChannelId, UserId)>,
    // Sending half of the queue that `handle_update_channels` pushes server
    // updates into; the receiving half is drained by `_update_channels`.
    update_channels_tx: mpsc::UnboundedSender<proto::UpdateChannels>,
    // Channel buffers that are open (weak handle) or still loading.
    opened_buffers: HashMap<ChannelId, OpenedModelHandle<ChannelBuffer>>,
    // Channel chats that are open (weak handle) or still loading.
    opened_chats: HashMap<ChannelId, OpenedModelHandle<ChannelChat>>,
    client: Arc<Client>,
    // Set once `initialize` has successfully sent `SubscribeToChannels`.
    did_subscribe: bool,
    user_store: Model<UserStore>,
    // Subscriptions for the two RPC message handlers registered in `new`.
    _rpc_subscriptions: [Subscription; 2],
    // Task that reacts to client connection status changes.
    _watch_connection_status: Task<Option<()>>,
    // NOTE(review): presumably a pending task that disconnects channel
    // buffers after a connection loss; it is taken (dropped) on reconnect.
    disconnect_channel_buffers_task: Option<Task<()>>,
    // Task that applies queued `UpdateChannels` messages one at a time.
    _update_channels: Task<()>,
}
  53
/// A single channel as known to the client.
#[derive(Clone, Debug)]
pub struct Channel {
    /// Unique id of the channel.
    pub id: ChannelId,
    /// Display name of the channel.
    pub name: SharedString,
    /// Visibility of the channel as reported by the server.
    pub visibility: proto::ChannelVisibility,
    /// Ids of the channel's ancestors. Empty for a root channel; otherwise
    /// the first entry is the root channel's id (see `root_id`).
    pub parent_path: Vec<ChannelId>,
}
  61
/// Per-channel bookkeeping for chat messages, notes versions and the
/// current user's role.
#[derive(Default, Debug)]
pub struct ChannelState {
    // Id of the most recent chat message known for the channel.
    // NOTE(review): also overwritten by `ChannelStore::set_acknowledged_message_id`.
    latest_chat_message: Option<u64>,
    // Most recent notes version known for the channel.
    latest_notes_version: NotesVersion,
    // Notes version the user has acknowledged (presumably; confirm against
    // `acknowledge_notes_version`, which is defined outside this view).
    observed_notes_version: NotesVersion,
    // Chat message id the user has acknowledged (presumably; confirm against
    // `acknowledge_message_id`, which is defined outside this view).
    observed_chat_message: Option<u64>,
    // The user's role in this channel, once the server has reported it.
    role: Option<ChannelRole>,
}
  70
  71impl Channel {
  72    pub fn link(&self, cx: &AppContext) -> String {
  73        format!(
  74            "{}/channel/{}-{}",
  75            ClientSettings::get_global(cx).server_url,
  76            Self::slug(&self.name),
  77            self.id
  78        )
  79    }
  80
  81    pub fn notes_link(&self, heading: Option<String>, cx: &AppContext) -> String {
  82        self.link(cx)
  83            + "/notes"
  84            + &heading
  85                .map(|h| format!("#{}", Self::slug(&h)))
  86                .unwrap_or_default()
  87    }
  88
  89    pub fn is_root_channel(&self) -> bool {
  90        self.parent_path.is_empty()
  91    }
  92
  93    pub fn root_id(&self) -> ChannelId {
  94        self.parent_path.first().copied().unwrap_or(self.id)
  95    }
  96
  97    pub fn slug(str: &str) -> String {
  98        let slug: String = str
  99            .chars()
 100            .map(|c| if c.is_alphanumeric() { c } else { '-' })
 101            .collect();
 102
 103        slug.trim_matches(|c| c == '-').to_string()
 104    }
 105}
 106
/// A user's relationship to a channel, as returned by a member search.
#[derive(Debug)]
pub struct ChannelMembership {
    /// The user this membership describes.
    pub user: Arc<User>,
    /// Whether the user is a member or only an invitee.
    pub kind: proto::channel_member::Kind,
    /// The user's role in the channel.
    pub role: proto::ChannelRole,
}
 113impl ChannelMembership {
 114    pub fn sort_key(&self) -> MembershipSortKey {
 115        MembershipSortKey {
 116            role_order: match self.role {
 117                proto::ChannelRole::Admin => 0,
 118                proto::ChannelRole::Member => 1,
 119                proto::ChannelRole::Banned => 2,
 120                proto::ChannelRole::Talker => 3,
 121                proto::ChannelRole::Guest => 4,
 122            },
 123            kind_order: match self.kind {
 124                proto::channel_member::Kind::Member => 0,
 125                proto::channel_member::Kind::Invitee => 1,
 126            },
 127            username_order: self.user.github_login.as_str(),
 128        }
 129    }
 130}
 131
/// Sort key produced by `ChannelMembership::sort_key`.
///
/// The derived ordering compares fields in declaration order, so the field
/// order below is significant: role first, then kind, then username.
#[derive(PartialOrd, Ord, PartialEq, Eq)]
pub struct MembershipSortKey<'a> {
    // Rank of the member's role; lower sorts first.
    role_order: u8,
    // Rank of the membership kind; lower sorts first.
    kind_order: u8,
    // GitHub login, used as the final tie-breaker.
    username_order: &'a str,
}
 138
/// Events emitted by `ChannelStore`.
pub enum ChannelEvent {
    /// Emitted by `create_channel` once the new channel has been applied locally.
    ChannelCreated(ChannelId),
    /// Emitted by `rename` once the renamed channel has been applied locally.
    ChannelRenamed(ChannelId),
}

// Lets observers subscribe to `ChannelEvent`s emitted by the store.
impl EventEmitter<ChannelEvent> for ChannelStore {}
 145
/// State of a per-channel resource (buffer or chat) tracked by the store.
enum OpenedModelHandle<E> {
    /// The resource finished loading; only a weak handle is kept, so the
    /// store does not keep the model alive by itself.
    Open(WeakModel<E>),
    /// The resource is still loading; the shared task lets several callers
    /// await the same load.
    Loading(Shared<Task<Result<Model<E>, Arc<anyhow::Error>>>>),
}
 150
/// Newtype wrapper under which the `ChannelStore` model is stored as a
/// global (set in `init`, read in `ChannelStore::global`).
struct GlobalChannelStore(Model<ChannelStore>);

// Marks the wrapper as usable with `set_global` / `global`.
impl Global for GlobalChannelStore {}
 154
 155impl ChannelStore {
 156    pub fn global(cx: &AppContext) -> Model<Self> {
 157        cx.global::<GlobalChannelStore>().0.clone()
 158    }
 159
 160    pub fn new(
 161        client: Arc<Client>,
 162        user_store: Model<UserStore>,
 163        cx: &mut ModelContext<Self>,
 164    ) -> Self {
 165        let rpc_subscriptions = [
 166            client.add_message_handler(cx.weak_model(), Self::handle_update_channels),
 167            client.add_message_handler(cx.weak_model(), Self::handle_update_user_channels),
 168        ];
 169
 170        let mut connection_status = client.status();
 171        let (update_channels_tx, mut update_channels_rx) = mpsc::unbounded();
 172        let watch_connection_status = cx.spawn(|this, mut cx| async move {
 173            while let Some(status) = connection_status.next().await {
 174                let this = this.upgrade()?;
 175                match status {
 176                    client::Status::Connected { .. } => {
 177                        this.update(&mut cx, |this, cx| this.handle_connect(cx))
 178                            .ok()?
 179                            .await
 180                            .log_err()?;
 181                    }
 182                    client::Status::SignedOut | client::Status::UpgradeRequired => {
 183                        this.update(&mut cx, |this, cx| this.handle_disconnect(false, cx))
 184                            .ok();
 185                    }
 186                    _ => {
 187                        this.update(&mut cx, |this, cx| this.handle_disconnect(true, cx))
 188                            .ok();
 189                    }
 190                }
 191            }
 192            Some(())
 193        });
 194
 195        Self {
 196            channel_invitations: Vec::default(),
 197            channel_index: ChannelIndex::default(),
 198            channel_participants: Default::default(),
 199            outgoing_invites: Default::default(),
 200            opened_buffers: Default::default(),
 201            opened_chats: Default::default(),
 202            update_channels_tx,
 203            client,
 204            user_store,
 205            _rpc_subscriptions: rpc_subscriptions,
 206            _watch_connection_status: watch_connection_status,
 207            disconnect_channel_buffers_task: None,
 208            _update_channels: cx.spawn(|this, mut cx| async move {
 209                maybe!(async move {
 210                    while let Some(update_channels) = update_channels_rx.next().await {
 211                        if let Some(this) = this.upgrade() {
 212                            let update_task = this.update(&mut cx, |this, cx| {
 213                                this.update_channels(update_channels, cx)
 214                            })?;
 215                            if let Some(update_task) = update_task {
 216                                update_task.await.log_err();
 217                            }
 218                        }
 219                    }
 220                    anyhow::Ok(())
 221                })
 222                .await
 223                .log_err();
 224            }),
 225            channel_states: Default::default(),
 226            did_subscribe: false,
 227        }
 228    }
 229
 230    pub fn initialize(&mut self) {
 231        if !self.did_subscribe
 232            && self
 233                .client
 234                .send(proto::SubscribeToChannels {})
 235                .log_err()
 236                .is_some()
 237        {
 238            self.did_subscribe = true;
 239        }
 240    }
 241
 242    pub fn client(&self) -> Arc<Client> {
 243        self.client.clone()
 244    }
 245
 246    /// Returns the number of unique channels in the store
 247    pub fn channel_count(&self) -> usize {
 248        self.channel_index.by_id().len()
 249    }
 250
 251    /// Returns the index of a channel ID in the list of unique channels
 252    pub fn index_of_channel(&self, channel_id: ChannelId) -> Option<usize> {
 253        self.channel_index
 254            .by_id()
 255            .keys()
 256            .position(|id| *id == channel_id)
 257    }
 258
 259    /// Returns an iterator over all unique channels
 260    pub fn channels(&self) -> impl '_ + Iterator<Item = &Arc<Channel>> {
 261        self.channel_index.by_id().values()
 262    }
 263
 264    /// Iterate over all entries in the channel DAG
 265    pub fn ordered_channels(&self) -> impl '_ + Iterator<Item = (usize, &Arc<Channel>)> {
 266        self.channel_index
 267            .ordered_channels()
 268            .iter()
 269            .filter_map(move |id| {
 270                let channel = self.channel_index.by_id().get(id)?;
 271                Some((channel.parent_path.len(), channel))
 272            })
 273    }
 274
 275    pub fn channel_at_index(&self, ix: usize) -> Option<&Arc<Channel>> {
 276        let channel_id = self.channel_index.ordered_channels().get(ix)?;
 277        self.channel_index.by_id().get(channel_id)
 278    }
 279
 280    pub fn channel_at(&self, ix: usize) -> Option<&Arc<Channel>> {
 281        self.channel_index.by_id().values().nth(ix)
 282    }
 283
 284    pub fn has_channel_invitation(&self, channel_id: ChannelId) -> bool {
 285        self.channel_invitations
 286            .iter()
 287            .any(|channel| channel.id == channel_id)
 288    }
 289
 290    pub fn channel_invitations(&self) -> &[Arc<Channel>] {
 291        &self.channel_invitations
 292    }
 293
 294    pub fn channel_for_id(&self, channel_id: ChannelId) -> Option<&Arc<Channel>> {
 295        self.channel_index.by_id().get(&channel_id)
 296    }
 297
 298    pub fn has_open_channel_buffer(&self, channel_id: ChannelId, _cx: &AppContext) -> bool {
 299        if let Some(buffer) = self.opened_buffers.get(&channel_id) {
 300            if let OpenedModelHandle::Open(buffer) = buffer {
 301                return buffer.upgrade().is_some();
 302            }
 303        }
 304        false
 305    }
 306
 307    pub fn open_channel_buffer(
 308        &mut self,
 309        channel_id: ChannelId,
 310        cx: &mut ModelContext<Self>,
 311    ) -> Task<Result<Model<ChannelBuffer>>> {
 312        let client = self.client.clone();
 313        let user_store = self.user_store.clone();
 314        let channel_store = cx.handle();
 315        self.open_channel_resource(
 316            channel_id,
 317            |this| &mut this.opened_buffers,
 318            |channel, cx| ChannelBuffer::new(channel, client, user_store, channel_store, cx),
 319            cx,
 320        )
 321    }
 322
 323    pub fn fetch_channel_messages(
 324        &self,
 325        message_ids: Vec<u64>,
 326        cx: &mut ModelContext<Self>,
 327    ) -> Task<Result<Vec<ChannelMessage>>> {
 328        let request = if message_ids.is_empty() {
 329            None
 330        } else {
 331            Some(
 332                self.client
 333                    .request(proto::GetChannelMessagesById { message_ids }),
 334            )
 335        };
 336        cx.spawn(|this, mut cx| async move {
 337            if let Some(request) = request {
 338                let response = request.await?;
 339                let this = this
 340                    .upgrade()
 341                    .ok_or_else(|| anyhow!("channel store dropped"))?;
 342                let user_store = this.update(&mut cx, |this, _| this.user_store.clone())?;
 343                ChannelMessage::from_proto_vec(response.messages, &user_store, &mut cx).await
 344            } else {
 345                Ok(Vec::new())
 346            }
 347        })
 348    }
 349
 350    pub fn has_channel_buffer_changed(&self, channel_id: ChannelId) -> bool {
 351        self.channel_states
 352            .get(&channel_id)
 353            .is_some_and(|state| state.has_channel_buffer_changed())
 354    }
 355
 356    pub fn has_new_messages(&self, channel_id: ChannelId) -> bool {
 357        self.channel_states
 358            .get(&channel_id)
 359            .is_some_and(|state| state.has_new_messages())
 360    }
 361
 362    pub fn set_acknowledged_message_id(&mut self, channel_id: ChannelId, message_id: Option<u64>) {
 363        if let Some(state) = self.channel_states.get_mut(&channel_id) {
 364            state.latest_chat_message = message_id;
 365        }
 366    }
 367
 368    pub fn last_acknowledge_message_id(&self, channel_id: ChannelId) -> Option<u64> {
 369        self.channel_states.get(&channel_id).and_then(|state| {
 370            if let Some(last_message_id) = state.latest_chat_message {
 371                if state
 372                    .last_acknowledged_message_id()
 373                    .is_some_and(|id| id < last_message_id)
 374                {
 375                    return state.last_acknowledged_message_id();
 376                }
 377            }
 378
 379            None
 380        })
 381    }
 382
 383    pub fn acknowledge_message_id(
 384        &mut self,
 385        channel_id: ChannelId,
 386        message_id: u64,
 387        cx: &mut ModelContext<Self>,
 388    ) {
 389        self.channel_states
 390            .entry(channel_id)
 391            .or_default()
 392            .acknowledge_message_id(message_id);
 393        cx.notify();
 394    }
 395
 396    pub fn update_latest_message_id(
 397        &mut self,
 398        channel_id: ChannelId,
 399        message_id: u64,
 400        cx: &mut ModelContext<Self>,
 401    ) {
 402        self.channel_states
 403            .entry(channel_id)
 404            .or_default()
 405            .update_latest_message_id(message_id);
 406        cx.notify();
 407    }
 408
 409    pub fn acknowledge_notes_version(
 410        &mut self,
 411        channel_id: ChannelId,
 412        epoch: u64,
 413        version: &clock::Global,
 414        cx: &mut ModelContext<Self>,
 415    ) {
 416        self.channel_states
 417            .entry(channel_id)
 418            .or_default()
 419            .acknowledge_notes_version(epoch, version);
 420        cx.notify()
 421    }
 422
 423    pub fn update_latest_notes_version(
 424        &mut self,
 425        channel_id: ChannelId,
 426        epoch: u64,
 427        version: &clock::Global,
 428        cx: &mut ModelContext<Self>,
 429    ) {
 430        self.channel_states
 431            .entry(channel_id)
 432            .or_default()
 433            .update_latest_notes_version(epoch, version);
 434        cx.notify()
 435    }
 436
 437    pub fn open_channel_chat(
 438        &mut self,
 439        channel_id: ChannelId,
 440        cx: &mut ModelContext<Self>,
 441    ) -> Task<Result<Model<ChannelChat>>> {
 442        let client = self.client.clone();
 443        let user_store = self.user_store.clone();
 444        let this = cx.handle();
 445        self.open_channel_resource(
 446            channel_id,
 447            |this| &mut this.opened_chats,
 448            |channel, cx| ChannelChat::new(channel, this, user_store, client, cx),
 449            cx,
 450        )
 451    }
 452
    /// Asynchronously open a given resource associated with a channel.
    ///
    /// Make sure that the resource is only opened once, even if this method
    /// is called multiple times with the same channel id while the first task
    /// is still running.
    ///
    /// * `get_map` selects which map on the store tracks this kind of
    ///   resource (`opened_buffers` or `opened_chats`).
    /// * `load` performs the actual open for the resolved channel; it is only
    ///   invoked when no live or loading entry exists for `channel_id`.
    ///
    /// The returned task fails if the channel id is unknown at load time or
    /// if `load` fails.
    fn open_channel_resource<T, F, Fut>(
        &mut self,
        channel_id: ChannelId,
        get_map: fn(&mut Self) -> &mut HashMap<ChannelId, OpenedModelHandle<T>>,
        load: F,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<Model<T>>>
    where
        F: 'static + FnOnce(Arc<Channel>, AsyncAppContext) -> Fut,
        Fut: Future<Output = Result<Model<T>>>,
        T: 'static,
    {
        // The loop runs at most twice: a stale `Open` entry is removed and
        // the lookup is retried, which then takes the `Vacant` path.
        let task = loop {
            match get_map(self).entry(channel_id) {
                hash_map::Entry::Occupied(e) => match e.get() {
                    OpenedModelHandle::Open(model) => {
                        if let Some(model) = model.upgrade() {
                            // Already open and still alive: hand it back immediately.
                            break Task::ready(Ok(model)).shared();
                        } else {
                            // The model was dropped; forget the stale weak handle.
                            get_map(self).remove(&channel_id);
                            continue;
                        }
                    }
                    OpenedModelHandle::Loading(task) => {
                        // A load is in flight: share its result instead of starting another.
                        break task.clone();
                    }
                },
                hash_map::Entry::Vacant(e) => {
                    let task = cx
                        .spawn(move |this, mut cx| async move {
                            // Resolve the channel when the task runs, not when it is created.
                            let channel = this.update(&mut cx, |this, _| {
                                this.channel_for_id(channel_id).cloned().ok_or_else(|| {
                                    Arc::new(anyhow!("no channel for id: {}", channel_id))
                                })
                            })??;

                            // The error is wrapped in `Arc` so the shared task's output is cloneable.
                            load(channel, cx).await.map_err(Arc::new)
                        })
                        .shared();

                    e.insert(OpenedModelHandle::Loading(task.clone()));
                    // Detached follow-up: once loading finishes, replace the
                    // `Loading` entry with a weak `Open` handle, or drop the
                    // entry on failure so a later call can retry.
                    cx.spawn({
                        let task = task.clone();
                        move |this, mut cx| async move {
                            let result = task.await;
                            this.update(&mut cx, |this, _| match result {
                                Ok(model) => {
                                    get_map(this).insert(
                                        channel_id,
                                        OpenedModelHandle::Open(model.downgrade()),
                                    );
                                }
                                Err(_) => {
                                    get_map(this).remove(&channel_id);
                                }
                            })
                            .ok();
                        }
                    })
                    .detach();
                    break task;
                }
            }
        };
        // Convert the shared `Arc<anyhow::Error>` back into a plain
        // `anyhow::Error` by formatting its message.
        cx.background_executor()
            .spawn(async move { task.await.map_err(|error| anyhow!("{}", error)) })
    }
 525
 526    pub fn is_channel_admin(&self, channel_id: ChannelId) -> bool {
 527        self.channel_role(channel_id) == proto::ChannelRole::Admin
 528    }
 529
 530    pub fn is_root_channel(&self, channel_id: ChannelId) -> bool {
 531        self.channel_index
 532            .by_id()
 533            .get(&channel_id)
 534            .map_or(false, |channel| channel.is_root_channel())
 535    }
 536
 537    pub fn is_public_channel(&self, channel_id: ChannelId) -> bool {
 538        self.channel_index
 539            .by_id()
 540            .get(&channel_id)
 541            .map_or(false, |channel| {
 542                channel.visibility == ChannelVisibility::Public
 543            })
 544    }
 545
 546    pub fn channel_capability(&self, channel_id: ChannelId) -> Capability {
 547        match self.channel_role(channel_id) {
 548            ChannelRole::Admin | ChannelRole::Member => Capability::ReadWrite,
 549            _ => Capability::ReadOnly,
 550        }
 551    }
 552
 553    pub fn channel_role(&self, channel_id: ChannelId) -> proto::ChannelRole {
 554        maybe!({
 555            let mut channel = self.channel_for_id(channel_id)?;
 556            if !channel.is_root_channel() {
 557                channel = self.channel_for_id(channel.root_id())?;
 558            }
 559            let root_channel_state = self.channel_states.get(&channel.id);
 560            root_channel_state?.role
 561        })
 562        .unwrap_or(proto::ChannelRole::Guest)
 563    }
 564
 565    pub fn channel_participants(&self, channel_id: ChannelId) -> &[Arc<User>] {
 566        self.channel_participants
 567            .get(&channel_id)
 568            .map_or(&[], |v| v.as_slice())
 569    }
 570
 571    pub fn create_channel(
 572        &self,
 573        name: &str,
 574        parent_id: Option<ChannelId>,
 575        cx: &mut ModelContext<Self>,
 576    ) -> Task<Result<ChannelId>> {
 577        let client = self.client.clone();
 578        let name = name.trim_start_matches('#').to_owned();
 579        cx.spawn(move |this, mut cx| async move {
 580            let response = client
 581                .request(proto::CreateChannel {
 582                    name,
 583                    parent_id: parent_id.map(|cid| cid.0),
 584                })
 585                .await?;
 586
 587            let channel = response
 588                .channel
 589                .ok_or_else(|| anyhow!("missing channel in response"))?;
 590            let channel_id = ChannelId(channel.id);
 591
 592            this.update(&mut cx, |this, cx| {
 593                let task = this.update_channels(
 594                    proto::UpdateChannels {
 595                        channels: vec![channel],
 596                        ..Default::default()
 597                    },
 598                    cx,
 599                );
 600                assert!(task.is_none());
 601
 602                // This event is emitted because the collab panel wants to clear the pending edit state
 603                // before this frame is rendered. But we can't guarantee that the collab panel's future
 604                // will resolve before this flush_effects finishes. Synchronously emitting this event
 605                // ensures that the collab panel will observe this creation before the frame completes
 606                cx.emit(ChannelEvent::ChannelCreated(channel_id));
 607            })?;
 608
 609            Ok(channel_id)
 610        })
 611    }
 612
 613    pub fn move_channel(
 614        &mut self,
 615        channel_id: ChannelId,
 616        to: ChannelId,
 617        cx: &mut ModelContext<Self>,
 618    ) -> Task<Result<()>> {
 619        let client = self.client.clone();
 620        cx.spawn(move |_, _| async move {
 621            let _ = client
 622                .request(proto::MoveChannel {
 623                    channel_id: channel_id.0,
 624                    to: to.0,
 625                })
 626                .await?;
 627
 628            Ok(())
 629        })
 630    }
 631
 632    pub fn set_channel_visibility(
 633        &mut self,
 634        channel_id: ChannelId,
 635        visibility: ChannelVisibility,
 636        cx: &mut ModelContext<Self>,
 637    ) -> Task<Result<()>> {
 638        let client = self.client.clone();
 639        cx.spawn(move |_, _| async move {
 640            let _ = client
 641                .request(proto::SetChannelVisibility {
 642                    channel_id: channel_id.0,
 643                    visibility: visibility.into(),
 644                })
 645                .await?;
 646
 647            Ok(())
 648        })
 649    }
 650
 651    pub fn invite_member(
 652        &mut self,
 653        channel_id: ChannelId,
 654        user_id: UserId,
 655        role: proto::ChannelRole,
 656        cx: &mut ModelContext<Self>,
 657    ) -> Task<Result<()>> {
 658        if !self.outgoing_invites.insert((channel_id, user_id)) {
 659            return Task::ready(Err(anyhow!("invite request already in progress")));
 660        }
 661
 662        cx.notify();
 663        let client = self.client.clone();
 664        cx.spawn(move |this, mut cx| async move {
 665            let result = client
 666                .request(proto::InviteChannelMember {
 667                    channel_id: channel_id.0,
 668                    user_id,
 669                    role: role.into(),
 670                })
 671                .await;
 672
 673            this.update(&mut cx, |this, cx| {
 674                this.outgoing_invites.remove(&(channel_id, user_id));
 675                cx.notify();
 676            })?;
 677
 678            result?;
 679
 680            Ok(())
 681        })
 682    }
 683
 684    pub fn remove_member(
 685        &mut self,
 686        channel_id: ChannelId,
 687        user_id: u64,
 688        cx: &mut ModelContext<Self>,
 689    ) -> Task<Result<()>> {
 690        if !self.outgoing_invites.insert((channel_id, user_id)) {
 691            return Task::ready(Err(anyhow!("invite request already in progress")));
 692        }
 693
 694        cx.notify();
 695        let client = self.client.clone();
 696        cx.spawn(move |this, mut cx| async move {
 697            let result = client
 698                .request(proto::RemoveChannelMember {
 699                    channel_id: channel_id.0,
 700                    user_id,
 701                })
 702                .await;
 703
 704            this.update(&mut cx, |this, cx| {
 705                this.outgoing_invites.remove(&(channel_id, user_id));
 706                cx.notify();
 707            })?;
 708            result?;
 709            Ok(())
 710        })
 711    }
 712
 713    pub fn set_member_role(
 714        &mut self,
 715        channel_id: ChannelId,
 716        user_id: UserId,
 717        role: proto::ChannelRole,
 718        cx: &mut ModelContext<Self>,
 719    ) -> Task<Result<()>> {
 720        if !self.outgoing_invites.insert((channel_id, user_id)) {
 721            return Task::ready(Err(anyhow!("member request already in progress")));
 722        }
 723
 724        cx.notify();
 725        let client = self.client.clone();
 726        cx.spawn(move |this, mut cx| async move {
 727            let result = client
 728                .request(proto::SetChannelMemberRole {
 729                    channel_id: channel_id.0,
 730                    user_id,
 731                    role: role.into(),
 732                })
 733                .await;
 734
 735            this.update(&mut cx, |this, cx| {
 736                this.outgoing_invites.remove(&(channel_id, user_id));
 737                cx.notify();
 738            })?;
 739
 740            result?;
 741            Ok(())
 742        })
 743    }
 744
 745    pub fn rename(
 746        &mut self,
 747        channel_id: ChannelId,
 748        new_name: &str,
 749        cx: &mut ModelContext<Self>,
 750    ) -> Task<Result<()>> {
 751        let client = self.client.clone();
 752        let name = new_name.to_string();
 753        cx.spawn(move |this, mut cx| async move {
 754            let channel = client
 755                .request(proto::RenameChannel {
 756                    channel_id: channel_id.0,
 757                    name,
 758                })
 759                .await?
 760                .channel
 761                .ok_or_else(|| anyhow!("missing channel in response"))?;
 762            this.update(&mut cx, |this, cx| {
 763                let task = this.update_channels(
 764                    proto::UpdateChannels {
 765                        channels: vec![channel],
 766                        ..Default::default()
 767                    },
 768                    cx,
 769                );
 770                assert!(task.is_none());
 771
 772                // This event is emitted because the collab panel wants to clear the pending edit state
 773                // before this frame is rendered. But we can't guarantee that the collab panel's future
 774                // will resolve before this flush_effects finishes. Synchronously emitting this event
 775                // ensures that the collab panel will observe this creation before the frame complete
 776                cx.emit(ChannelEvent::ChannelRenamed(channel_id))
 777            })?;
 778            Ok(())
 779        })
 780    }
 781
 782    pub fn respond_to_channel_invite(
 783        &mut self,
 784        channel_id: ChannelId,
 785        accept: bool,
 786        cx: &mut ModelContext<Self>,
 787    ) -> Task<Result<()>> {
 788        let client = self.client.clone();
 789        cx.background_executor().spawn(async move {
 790            client
 791                .request(proto::RespondToChannelInvite {
 792                    channel_id: channel_id.0,
 793                    accept,
 794                })
 795                .await?;
 796            Ok(())
 797        })
 798    }
 799    pub fn fuzzy_search_members(
 800        &self,
 801        channel_id: ChannelId,
 802        query: String,
 803        limit: u16,
 804        cx: &mut ModelContext<Self>,
 805    ) -> Task<Result<Vec<ChannelMembership>>> {
 806        let client = self.client.clone();
 807        let user_store = self.user_store.downgrade();
 808        cx.spawn(move |_, mut cx| async move {
 809            let response = client
 810                .request(proto::GetChannelMembers {
 811                    channel_id: channel_id.0,
 812                    query,
 813                    limit: limit as u64,
 814                })
 815                .await?;
 816            user_store.update(&mut cx, |user_store, _| {
 817                user_store.insert(response.users);
 818                response
 819                    .members
 820                    .into_iter()
 821                    .filter_map(|member| {
 822                        Some(ChannelMembership {
 823                            user: user_store.get_cached_user(member.user_id)?,
 824                            role: member.role(),
 825                            kind: member.kind(),
 826                        })
 827                    })
 828                    .collect()
 829            })
 830        })
 831    }
 832
 833    pub fn remove_channel(&self, channel_id: ChannelId) -> impl Future<Output = Result<()>> {
 834        let client = self.client.clone();
 835        async move {
 836            client
 837                .request(proto::DeleteChannel {
 838                    channel_id: channel_id.0,
 839                })
 840                .await?;
 841            Ok(())
 842        }
 843    }
 844
 845    pub fn has_pending_channel_invite_response(&self, _: &Arc<Channel>) -> bool {
 846        false
 847    }
 848
 849    pub fn has_pending_channel_invite(&self, channel_id: ChannelId, user_id: UserId) -> bool {
 850        self.outgoing_invites.contains(&(channel_id, user_id))
 851    }
 852
 853    async fn handle_update_channels(
 854        this: Model<Self>,
 855        message: TypedEnvelope<proto::UpdateChannels>,
 856        mut cx: AsyncAppContext,
 857    ) -> Result<()> {
 858        this.update(&mut cx, |this, _| {
 859            this.update_channels_tx
 860                .unbounded_send(message.payload)
 861                .unwrap();
 862        })?;
 863        Ok(())
 864    }
 865
 866    async fn handle_update_user_channels(
 867        this: Model<Self>,
 868        message: TypedEnvelope<proto::UpdateUserChannels>,
 869        mut cx: AsyncAppContext,
 870    ) -> Result<()> {
 871        this.update(&mut cx, |this, cx| {
 872            for buffer_version in message.payload.observed_channel_buffer_version {
 873                let version = language::proto::deserialize_version(&buffer_version.version);
 874                this.acknowledge_notes_version(
 875                    ChannelId(buffer_version.channel_id),
 876                    buffer_version.epoch,
 877                    &version,
 878                    cx,
 879                );
 880            }
 881            for message_id in message.payload.observed_channel_message_id {
 882                this.acknowledge_message_id(
 883                    ChannelId(message_id.channel_id),
 884                    message_id.message_id,
 885                    cx,
 886                );
 887            }
 888            for membership in message.payload.channel_memberships {
 889                if let Some(role) = ChannelRole::from_i32(membership.role) {
 890                    this.channel_states
 891                        .entry(ChannelId(membership.channel_id))
 892                        .or_default()
 893                        .set_role(role)
 894                }
 895            }
 896        })
 897    }
 898
 899    fn handle_connect(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
 900        self.channel_index.clear();
 901        self.channel_invitations.clear();
 902        self.channel_participants.clear();
 903        self.channel_index.clear();
 904        self.outgoing_invites.clear();
 905        self.disconnect_channel_buffers_task.take();
 906
 907        for chat in self.opened_chats.values() {
 908            if let OpenedModelHandle::Open(chat) = chat {
 909                if let Some(chat) = chat.upgrade() {
 910                    chat.update(cx, |chat, cx| {
 911                        chat.rejoin(cx);
 912                    });
 913                }
 914            }
 915        }
 916
 917        let mut buffer_versions = Vec::new();
 918        for buffer in self.opened_buffers.values() {
 919            if let OpenedModelHandle::Open(buffer) = buffer {
 920                if let Some(buffer) = buffer.upgrade() {
 921                    let channel_buffer = buffer.read(cx);
 922                    let buffer = channel_buffer.buffer().read(cx);
 923                    buffer_versions.push(proto::ChannelBufferVersion {
 924                        channel_id: channel_buffer.channel_id.0,
 925                        epoch: channel_buffer.epoch(),
 926                        version: language::proto::serialize_version(&buffer.version()),
 927                    });
 928                }
 929            }
 930        }
 931
 932        if buffer_versions.is_empty() {
 933            return Task::ready(Ok(()));
 934        }
 935
 936        let response = self.client.request(proto::RejoinChannelBuffers {
 937            buffers: buffer_versions,
 938        });
 939
 940        cx.spawn(|this, mut cx| async move {
 941            let mut response = response.await?;
 942
 943            this.update(&mut cx, |this, cx| {
 944                this.opened_buffers.retain(|_, buffer| match buffer {
 945                    OpenedModelHandle::Open(channel_buffer) => {
 946                        let Some(channel_buffer) = channel_buffer.upgrade() else {
 947                            return false;
 948                        };
 949
 950                        channel_buffer.update(cx, |channel_buffer, cx| {
 951                            let channel_id = channel_buffer.channel_id;
 952                            if let Some(remote_buffer) = response
 953                                .buffers
 954                                .iter_mut()
 955                                .find(|buffer| buffer.channel_id == channel_id.0)
 956                            {
 957                                let channel_id = channel_buffer.channel_id;
 958                                let remote_version =
 959                                    language::proto::deserialize_version(&remote_buffer.version);
 960
 961                                channel_buffer.replace_collaborators(
 962                                    mem::take(&mut remote_buffer.collaborators),
 963                                    cx,
 964                                );
 965
 966                                let operations = channel_buffer
 967                                    .buffer()
 968                                    .update(cx, |buffer, cx| {
 969                                        let outgoing_operations =
 970                                            buffer.serialize_ops(Some(remote_version), cx);
 971                                        let incoming_operations =
 972                                            mem::take(&mut remote_buffer.operations)
 973                                                .into_iter()
 974                                                .map(language::proto::deserialize_operation)
 975                                                .collect::<Result<Vec<_>>>()?;
 976                                        buffer.apply_ops(incoming_operations, cx);
 977                                        anyhow::Ok(outgoing_operations)
 978                                    })
 979                                    .log_err();
 980
 981                                if let Some(operations) = operations {
 982                                    let client = this.client.clone();
 983                                    cx.background_executor()
 984                                        .spawn(async move {
 985                                            let operations = operations.await;
 986                                            for chunk in
 987                                                language::proto::split_operations(operations)
 988                                            {
 989                                                client
 990                                                    .send(proto::UpdateChannelBuffer {
 991                                                        channel_id: channel_id.0,
 992                                                        operations: chunk,
 993                                                    })
 994                                                    .ok();
 995                                            }
 996                                        })
 997                                        .detach();
 998                                    return true;
 999                                }
1000                            }
1001
1002                            channel_buffer.disconnect(cx);
1003                            false
1004                        })
1005                    }
1006                    OpenedModelHandle::Loading(_) => true,
1007                });
1008            })
1009            .ok();
1010            anyhow::Ok(())
1011        })
1012    }
1013
1014    fn handle_disconnect(&mut self, wait_for_reconnect: bool, cx: &mut ModelContext<Self>) {
1015        cx.notify();
1016        self.did_subscribe = false;
1017        self.disconnect_channel_buffers_task.get_or_insert_with(|| {
1018            cx.spawn(move |this, mut cx| async move {
1019                if wait_for_reconnect {
1020                    cx.background_executor().timer(RECONNECT_TIMEOUT).await;
1021                }
1022
1023                if let Some(this) = this.upgrade() {
1024                    this.update(&mut cx, |this, cx| {
1025                        for (_, buffer) in this.opened_buffers.drain() {
1026                            if let OpenedModelHandle::Open(buffer) = buffer {
1027                                if let Some(buffer) = buffer.upgrade() {
1028                                    buffer.update(cx, |buffer, cx| buffer.disconnect(cx));
1029                                }
1030                            }
1031                        }
1032                    })
1033                    .ok();
1034                }
1035            })
1036        });
1037    }
1038
1039    pub(crate) fn update_channels(
1040        &mut self,
1041        payload: proto::UpdateChannels,
1042        cx: &mut ModelContext<ChannelStore>,
1043    ) -> Option<Task<Result<()>>> {
1044        if !payload.remove_channel_invitations.is_empty() {
1045            self.channel_invitations
1046                .retain(|channel| !payload.remove_channel_invitations.contains(&channel.id.0));
1047        }
1048        for channel in payload.channel_invitations {
1049            match self
1050                .channel_invitations
1051                .binary_search_by_key(&channel.id, |c| c.id.0)
1052            {
1053                Ok(ix) => {
1054                    Arc::make_mut(&mut self.channel_invitations[ix]).name = channel.name.into()
1055                }
1056                Err(ix) => self.channel_invitations.insert(
1057                    ix,
1058                    Arc::new(Channel {
1059                        id: ChannelId(channel.id),
1060                        visibility: channel.visibility(),
1061                        name: channel.name.into(),
1062                        parent_path: channel.parent_path.into_iter().map(ChannelId).collect(),
1063                    }),
1064                ),
1065            }
1066        }
1067
1068        let channels_changed = !payload.channels.is_empty()
1069            || !payload.delete_channels.is_empty()
1070            || !payload.latest_channel_message_ids.is_empty()
1071            || !payload.latest_channel_buffer_versions.is_empty();
1072
1073        if channels_changed {
1074            if !payload.delete_channels.is_empty() {
1075                let delete_channels: Vec<ChannelId> =
1076                    payload.delete_channels.into_iter().map(ChannelId).collect();
1077                self.channel_index.delete_channels(&delete_channels);
1078                self.channel_participants
1079                    .retain(|channel_id, _| !delete_channels.contains(channel_id));
1080
1081                for channel_id in &delete_channels {
1082                    let channel_id = *channel_id;
1083                    if payload
1084                        .channels
1085                        .iter()
1086                        .any(|channel| channel.id == channel_id.0)
1087                    {
1088                        continue;
1089                    }
1090                    if let Some(OpenedModelHandle::Open(buffer)) =
1091                        self.opened_buffers.remove(&channel_id)
1092                    {
1093                        if let Some(buffer) = buffer.upgrade() {
1094                            buffer.update(cx, ChannelBuffer::disconnect);
1095                        }
1096                    }
1097                }
1098            }
1099
1100            let mut index = self.channel_index.bulk_insert();
1101            for channel in payload.channels {
1102                let id = ChannelId(channel.id);
1103                let channel_changed = index.insert(channel);
1104
1105                if channel_changed {
1106                    if let Some(OpenedModelHandle::Open(buffer)) = self.opened_buffers.get(&id) {
1107                        if let Some(buffer) = buffer.upgrade() {
1108                            buffer.update(cx, ChannelBuffer::channel_changed);
1109                        }
1110                    }
1111                }
1112            }
1113
1114            for latest_buffer_version in payload.latest_channel_buffer_versions {
1115                let version = language::proto::deserialize_version(&latest_buffer_version.version);
1116                self.channel_states
1117                    .entry(ChannelId(latest_buffer_version.channel_id))
1118                    .or_default()
1119                    .update_latest_notes_version(latest_buffer_version.epoch, &version)
1120            }
1121
1122            for latest_channel_message in payload.latest_channel_message_ids {
1123                self.channel_states
1124                    .entry(ChannelId(latest_channel_message.channel_id))
1125                    .or_default()
1126                    .update_latest_message_id(latest_channel_message.message_id);
1127            }
1128        }
1129
1130        cx.notify();
1131        if payload.channel_participants.is_empty() {
1132            return None;
1133        }
1134
1135        let mut all_user_ids = Vec::new();
1136        let channel_participants = payload.channel_participants;
1137        for entry in &channel_participants {
1138            for user_id in entry.participant_user_ids.iter() {
1139                if let Err(ix) = all_user_ids.binary_search(user_id) {
1140                    all_user_ids.insert(ix, *user_id);
1141                }
1142            }
1143        }
1144
1145        let users = self
1146            .user_store
1147            .update(cx, |user_store, cx| user_store.get_users(all_user_ids, cx));
1148        Some(cx.spawn(|this, mut cx| async move {
1149            let users = users.await?;
1150
1151            this.update(&mut cx, |this, cx| {
1152                for entry in &channel_participants {
1153                    let mut participants: Vec<_> = entry
1154                        .participant_user_ids
1155                        .iter()
1156                        .filter_map(|user_id| {
1157                            users
1158                                .binary_search_by_key(&user_id, |user| &user.id)
1159                                .ok()
1160                                .map(|ix| users[ix].clone())
1161                        })
1162                        .collect();
1163
1164                    participants.sort_by_key(|u| u.id);
1165
1166                    this.channel_participants
1167                        .insert(ChannelId(entry.channel_id), participants);
1168                }
1169
1170                cx.notify();
1171            })
1172        }))
1173    }
1174}
1175
1176impl ChannelState {
1177    fn set_role(&mut self, role: ChannelRole) {
1178        self.role = Some(role);
1179    }
1180
1181    fn has_channel_buffer_changed(&self) -> bool {
1182        self.latest_notes_version.epoch > self.observed_notes_version.epoch
1183            || (self.latest_notes_version.epoch == self.observed_notes_version.epoch
1184                && self
1185                    .latest_notes_version
1186                    .version
1187                    .changed_since(&self.observed_notes_version.version))
1188    }
1189
1190    fn has_new_messages(&self) -> bool {
1191        let latest_message_id = self.latest_chat_message;
1192        let observed_message_id = self.observed_chat_message;
1193
1194        latest_message_id.is_some_and(|latest_message_id| {
1195            latest_message_id > observed_message_id.unwrap_or_default()
1196        })
1197    }
1198
1199    fn last_acknowledged_message_id(&self) -> Option<u64> {
1200        self.observed_chat_message
1201    }
1202
1203    fn acknowledge_message_id(&mut self, message_id: u64) {
1204        let observed = self.observed_chat_message.get_or_insert(message_id);
1205        *observed = (*observed).max(message_id);
1206    }
1207
1208    fn update_latest_message_id(&mut self, message_id: u64) {
1209        self.latest_chat_message =
1210            Some(message_id.max(self.latest_chat_message.unwrap_or_default()));
1211    }
1212
1213    fn acknowledge_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1214        if self.observed_notes_version.epoch == epoch {
1215            self.observed_notes_version.version.join(version);
1216        } else {
1217            self.observed_notes_version = NotesVersion {
1218                epoch,
1219                version: version.clone(),
1220            };
1221        }
1222    }
1223
1224    fn update_latest_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1225        if self.latest_notes_version.epoch == epoch {
1226            self.latest_notes_version.version.join(version);
1227        } else {
1228            self.latest_notes_version = NotesVersion {
1229                epoch,
1230                version: version.clone(),
1231            };
1232        }
1233    }
1234}