channel_store.rs

   1mod channel_index;
   2
   3use crate::{channel_buffer::ChannelBuffer, channel_chat::ChannelChat, ChannelMessage};
   4use anyhow::{anyhow, Result};
   5use channel_index::ChannelIndex;
   6use client::{ChannelId, Client, ClientSettings, ProjectId, Subscription, User, UserId, UserStore};
   7use collections::{hash_map, HashMap, HashSet};
   8use futures::{channel::mpsc, future::Shared, Future, FutureExt, StreamExt};
   9use gpui::{
  10    AppContext, AsyncAppContext, Context, EventEmitter, Global, Model, ModelContext, SharedString,
  11    Task, WeakModel,
  12};
  13use language::Capability;
  14use rpc::{
  15    proto::{self, ChannelRole, ChannelVisibility},
  16    TypedEnvelope,
  17};
  18use settings::Settings;
  19use std::{mem, sync::Arc, time::Duration};
  20use util::{maybe, ResultExt};
  21
  /// A thirty-second timeout associated with reconnecting to the server.
  /// NOTE(review): the code that consumes this constant lies outside the visible part of the
  /// file — confirm exactly what is torn down when it elapses.
  22pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
  23
  24pub fn init(client: &Arc<Client>, user_store: Model<UserStore>, cx: &mut AppContext) {
  25    let channel_store =
  26        cx.new_model(|cx| ChannelStore::new(client.clone(), user_store.clone(), cx));
  27    cx.set_global(GlobalChannelStore(channel_store));
  28}
  29
  /// An `(epoch, version)` pair describing a state of a channel's notes buffer.
  ///
  /// `ChannelState` keeps two of these per channel: the latest known version and the version
  /// the user has observed.
  30#[derive(Debug, Clone, Default, PartialEq)]
  31struct NotesVersion {
        /// Epoch reported alongside the version.
        /// NOTE(review): presumably bumped whenever the version history restarts — confirm.
  32    epoch: u64,
        /// Version vector of the notes buffer within `epoch`.
  33    version: clock::Global,
  34}
  35
  /// Client-side record of a hosted project, converted from `proto::HostedProject`.
  36#[derive(Debug, Clone)]
  37pub struct HostedProject {
        /// Identifier of the project.
  38    project_id: ProjectId,
        /// Identifier of the channel named in the proto message.
  39    channel_id: ChannelId,
        /// Display name; used by `ChannelStore::projects_for_id` for listing and sorting.
  40    name: SharedString,
        /// Visibility reported by the server. Stored but not read in the visible code
        /// (hence the leading underscore).
  41    _visibility: proto::ChannelVisibility,
  42}
  43impl From<proto::HostedProject> for HostedProject {
  44    fn from(project: proto::HostedProject) -> Self {
  45        Self {
  46            project_id: ProjectId(project.project_id),
  47            channel_id: ChannelId(project.channel_id),
  48            _visibility: project.visibility(),
  49            name: project.name.into(),
  50        }
  51    }
  52}
  /// Model holding the client's view of channels: the channel index, invitations,
  /// participants, per-channel state, hosted projects, and the currently opened channel
  /// buffers and chats.
  53pub struct ChannelStore {
        /// Index of known channels; exposes `by_id()` and `ordered_channels()`.
  54    pub channel_index: ChannelIndex,
        /// Channels the current user has been invited to.
  55    channel_invitations: Vec<Arc<Channel>>,
        /// Users recorded as participants, keyed by channel.
  56    channel_participants: HashMap<ChannelId, Vec<Arc<User>>>,
        /// Per-channel bookkeeping (role, observed/latest versions, projects).
  57    channel_states: HashMap<ChannelId, ChannelState>,
        /// Hosted projects keyed by id; looked up by `projects_for_id`.
  58    hosted_projects: HashMap<ProjectId, HostedProject>,
  59
        /// `(channel, user)` pairs with a membership request (invite, removal, or role change)
        /// currently in flight.
  60    outgoing_invites: HashSet<(ChannelId, UserId)>,
        /// Sending half of the queue that `handle_update_channels` pushes payloads into.
  61    update_channels_tx: mpsc::UnboundedSender<proto::UpdateChannels>,
        /// Channel buffers that are open or still loading, keyed by channel.
  62    opened_buffers: HashMap<ChannelId, OpenedModelHandle<ChannelBuffer>>,
        /// Channel chats that are open or still loading, keyed by channel.
  63    opened_chats: HashMap<ChannelId, OpenedModelHandle<ChannelChat>>,
        /// RPC client used for every request issued by the store.
  64    client: Arc<Client>,
        /// Handle to the shared user store.
  65    user_store: Model<UserStore>,
        /// Subscriptions for the two message handlers registered in `new`; kept so they
        /// stay registered for the lifetime of the store.
  66    _rpc_subscriptions: [Subscription; 2],
        /// Task spawned in `new` that reacts to client connection-status changes.
  67    _watch_connection_status: Task<Option<()>>,
        /// Initialized to `None` in `new`.
        /// NOTE(review): the code that sets and clears this task is outside the visible chunk.
  68    disconnect_channel_buffers_task: Option<Task<()>>,
        /// Task spawned in `new` that drains the `UpdateChannels` queue.
  69    _update_channels: Task<()>,
  70}
  71
  /// A single channel as known to the client.
  72#[derive(Clone, Debug)]
  73pub struct Channel {
        /// Identifier of the channel.
  74    pub id: ChannelId,
        /// Display name; also the input for the slug used in channel links.
  75    pub name: SharedString,
        /// Visibility of the channel (compared against `ChannelVisibility::Public` by
        /// `ChannelStore::is_public_channel`).
  76    pub visibility: proto::ChannelVisibility,
        /// Ids of the channel's ancestors. The first entry is treated as the root
        /// (see `root_id`); an empty path marks a root channel.
  77    pub parent_path: Vec<ChannelId>,
  78}
  79
  /// Per-channel bookkeeping kept by `ChannelStore`.
  /// NOTE(review): the methods that update and compare these fields are defined outside the
  /// visible chunk; the field notes below describe only what the visible code shows.
  80#[derive(Default, Debug)]
  81pub struct ChannelState {
        /// Id of the latest chat message known for the channel, if any.
  82    latest_chat_message: Option<u64>,
        /// Latest known version of the channel notes.
  83    latest_notes_version: NotesVersion,
        /// Version of the channel notes the user has observed.
  84    observed_notes_version: NotesVersion,
        /// Id of the chat message the user has observed, if any.
  85    observed_chat_message: Option<u64>,
        /// The current user's role; `ChannelStore::channel_role` reads it from the root
        /// channel's state and falls back to `Guest` when absent.
  86    role: Option<ChannelRole>,
        /// Ids of the projects associated with this channel.
  87    projects: HashSet<ProjectId>,
  88}
  89
  90impl Channel {
  91    pub fn link(&self, cx: &AppContext) -> String {
  92        format!(
  93            "{}/channel/{}-{}",
  94            ClientSettings::get_global(cx).server_url,
  95            Self::slug(&self.name),
  96            self.id
  97        )
  98    }
  99
 100    pub fn notes_link(&self, heading: Option<String>, cx: &AppContext) -> String {
 101        self.link(cx)
 102            + "/notes"
 103            + &heading
 104                .map(|h| format!("#{}", Self::slug(&h)))
 105                .unwrap_or_default()
 106    }
 107
 108    pub fn is_root_channel(&self) -> bool {
 109        self.parent_path.is_empty()
 110    }
 111
 112    pub fn root_id(&self) -> ChannelId {
 113        self.parent_path.first().copied().unwrap_or(self.id)
 114    }
 115
 116    pub fn slug(str: &str) -> String {
 117        let slug: String = str
 118            .chars()
 119            .map(|c| if c.is_alphanumeric() { c } else { '-' })
 120            .collect();
 121
 122        slug.trim_matches(|c| c == '-').to_string()
 123    }
 124}
 125
 /// One entry of a channel's member list, as built by `ChannelStore::fuzzy_search_members`.
 126#[derive(Debug)]
 127pub struct ChannelMembership {
        /// The user this entry refers to.
 128    pub user: Arc<User>,
        /// Whether the user is a full member or an invitee.
 129    pub kind: proto::channel_member::Kind,
        /// The user's role in the channel.
 130    pub role: proto::ChannelRole,
 131}
 132impl ChannelMembership {
 133    pub fn sort_key(&self) -> MembershipSortKey {
 134        MembershipSortKey {
 135            role_order: match self.role {
 136                proto::ChannelRole::Admin => 0,
 137                proto::ChannelRole::Member => 1,
 138                proto::ChannelRole::Banned => 2,
 139                proto::ChannelRole::Talker => 3,
 140                proto::ChannelRole::Guest => 4,
 141            },
 142            kind_order: match self.kind {
 143                proto::channel_member::Kind::Member => 0,
 144                proto::channel_member::Kind::Invitee => 1,
 145            },
 146            username_order: self.user.github_login.as_str(),
 147        }
 148    }
 149}
 150
 /// Sort key produced by `ChannelMembership::sort_key`.
 ///
 /// The derived ordering compares fields in declaration order, so the order of the fields
 /// below is significant: role first, then kind, then login.
 151#[derive(PartialOrd, Ord, PartialEq, Eq)]
 152pub struct MembershipSortKey<'a> {
        /// Rank of the member's role (lower sorts first).
 153    role_order: u8,
        /// Rank of the membership kind (members before invitees).
 154    kind_order: u8,
        /// The member's GitHub login, borrowed from the membership's user.
 155    username_order: &'a str,
 156}
 157
 /// Events emitted by `ChannelStore`.
 158pub enum ChannelEvent {
        /// Emitted by `create_channel` right after the newly created channel has been applied
        /// to the store.
 159    ChannelCreated(ChannelId),
        /// Emitted by `rename` right after the renamed channel has been applied to the store.
 160    ChannelRenamed(ChannelId),
 161}
 162
 // Lets `ChannelStore` emit `ChannelEvent`s through its model context.
 163impl EventEmitter<ChannelEvent> for ChannelStore {}
 164
 /// State of a per-channel resource (buffer or chat) tracked by `ChannelStore`.
 165enum OpenedModelHandle<E> {
        /// The resource finished loading. Only a weak handle is kept, so the entry may
        /// outlive the model itself; callers must `upgrade()` and handle failure.
 166    Open(WeakModel<E>),
        /// The resource is still loading. The task is shared so that every caller asking for
        /// the same channel awaits the same load; errors are wrapped in `Arc`.
 167    Loading(Shared<Task<Result<Model<E>, Arc<anyhow::Error>>>>),
 168}
 169
 /// Newtype wrapper under which the `ChannelStore` model is stored as an application global
 /// (set in `init`, read in `ChannelStore::global`).
 170struct GlobalChannelStore(Model<ChannelStore>);
 171
 // Marker impl that allows the wrapper to be used with `set_global` / `global`.
 172impl Global for GlobalChannelStore {}
 173
 174impl ChannelStore {
 175    pub fn global(cx: &AppContext) -> Model<Self> {
 176        cx.global::<GlobalChannelStore>().0.clone()
 177    }
 178
 179    pub fn new(
 180        client: Arc<Client>,
 181        user_store: Model<UserStore>,
 182        cx: &mut ModelContext<Self>,
 183    ) -> Self {
 184        let rpc_subscriptions = [
 185            client.add_message_handler(cx.weak_model(), Self::handle_update_channels),
 186            client.add_message_handler(cx.weak_model(), Self::handle_update_user_channels),
 187        ];
 188
 189        let mut connection_status = client.status();
 190        let (update_channels_tx, mut update_channels_rx) = mpsc::unbounded();
 191        let watch_connection_status = cx.spawn(|this, mut cx| async move {
 192            while let Some(status) = connection_status.next().await {
 193                let this = this.upgrade()?;
 194                match status {
 195                    client::Status::Connected { .. } => {
 196                        this.update(&mut cx, |this, cx| this.handle_connect(cx))
 197                            .ok()?
 198                            .await
 199                            .log_err()?;
 200                    }
 201                    client::Status::SignedOut | client::Status::UpgradeRequired => {
 202                        this.update(&mut cx, |this, cx| this.handle_disconnect(false, cx))
 203                            .ok();
 204                    }
 205                    _ => {
 206                        this.update(&mut cx, |this, cx| this.handle_disconnect(true, cx))
 207                            .ok();
 208                    }
 209                }
 210            }
 211            Some(())
 212        });
 213
 214        Self {
 215            channel_invitations: Vec::default(),
 216            channel_index: ChannelIndex::default(),
 217            channel_participants: Default::default(),
 218            hosted_projects: Default::default(),
 219            outgoing_invites: Default::default(),
 220            opened_buffers: Default::default(),
 221            opened_chats: Default::default(),
 222            update_channels_tx,
 223            client,
 224            user_store,
 225            _rpc_subscriptions: rpc_subscriptions,
 226            _watch_connection_status: watch_connection_status,
 227            disconnect_channel_buffers_task: None,
 228            _update_channels: cx.spawn(|this, mut cx| async move {
 229                maybe!(async move {
 230                    while let Some(update_channels) = update_channels_rx.next().await {
 231                        if let Some(this) = this.upgrade() {
 232                            let update_task = this.update(&mut cx, |this, cx| {
 233                                this.update_channels(update_channels, cx)
 234                            })?;
 235                            if let Some(update_task) = update_task {
 236                                update_task.await.log_err();
 237                            }
 238                        }
 239                    }
 240                    anyhow::Ok(())
 241                })
 242                .await
 243                .log_err();
 244            }),
 245            channel_states: Default::default(),
 246        }
 247    }
 248
 249    pub fn client(&self) -> Arc<Client> {
 250        self.client.clone()
 251    }
 252
 253    /// Returns the number of unique channels in the store
 254    pub fn channel_count(&self) -> usize {
 255        self.channel_index.by_id().len()
 256    }
 257
 258    /// Returns the index of a channel ID in the list of unique channels
 259    pub fn index_of_channel(&self, channel_id: ChannelId) -> Option<usize> {
 260        self.channel_index
 261            .by_id()
 262            .keys()
 263            .position(|id| *id == channel_id)
 264    }
 265
 266    /// Returns an iterator over all unique channels
 267    pub fn channels(&self) -> impl '_ + Iterator<Item = &Arc<Channel>> {
 268        self.channel_index.by_id().values()
 269    }
 270
 271    /// Iterate over all entries in the channel DAG
 272    pub fn ordered_channels(&self) -> impl '_ + Iterator<Item = (usize, &Arc<Channel>)> {
 273        self.channel_index
 274            .ordered_channels()
 275            .iter()
 276            .filter_map(move |id| {
 277                let channel = self.channel_index.by_id().get(id)?;
 278                Some((channel.parent_path.len(), channel))
 279            })
 280    }
 281
 282    pub fn channel_at_index(&self, ix: usize) -> Option<&Arc<Channel>> {
 283        let channel_id = self.channel_index.ordered_channels().get(ix)?;
 284        self.channel_index.by_id().get(channel_id)
 285    }
 286
 287    pub fn channel_at(&self, ix: usize) -> Option<&Arc<Channel>> {
 288        self.channel_index.by_id().values().nth(ix)
 289    }
 290
 291    pub fn has_channel_invitation(&self, channel_id: ChannelId) -> bool {
 292        self.channel_invitations
 293            .iter()
 294            .any(|channel| channel.id == channel_id)
 295    }
 296
 297    pub fn channel_invitations(&self) -> &[Arc<Channel>] {
 298        &self.channel_invitations
 299    }
 300
 301    pub fn channel_for_id(&self, channel_id: ChannelId) -> Option<&Arc<Channel>> {
 302        self.channel_index.by_id().get(&channel_id)
 303    }
 304
 305    pub fn projects_for_id(&self, channel_id: ChannelId) -> Vec<(SharedString, ProjectId)> {
 306        let mut projects: Vec<(SharedString, ProjectId)> = self
 307            .channel_states
 308            .get(&channel_id)
 309            .map(|state| state.projects.clone())
 310            .unwrap_or_default()
 311            .into_iter()
 312            .flat_map(|id| Some((self.hosted_projects.get(&id)?.name.clone(), id)))
 313            .collect();
 314        projects.sort();
 315        projects
 316    }
 317
 318    pub fn has_open_channel_buffer(&self, channel_id: ChannelId, _cx: &AppContext) -> bool {
 319        if let Some(buffer) = self.opened_buffers.get(&channel_id) {
 320            if let OpenedModelHandle::Open(buffer) = buffer {
 321                return buffer.upgrade().is_some();
 322            }
 323        }
 324        false
 325    }
 326
 327    pub fn open_channel_buffer(
 328        &mut self,
 329        channel_id: ChannelId,
 330        cx: &mut ModelContext<Self>,
 331    ) -> Task<Result<Model<ChannelBuffer>>> {
 332        let client = self.client.clone();
 333        let user_store = self.user_store.clone();
 334        let channel_store = cx.handle();
 335        self.open_channel_resource(
 336            channel_id,
 337            |this| &mut this.opened_buffers,
 338            |channel, cx| ChannelBuffer::new(channel, client, user_store, channel_store, cx),
 339            cx,
 340        )
 341    }
 342
 343    pub fn fetch_channel_messages(
 344        &self,
 345        message_ids: Vec<u64>,
 346        cx: &mut ModelContext<Self>,
 347    ) -> Task<Result<Vec<ChannelMessage>>> {
 348        let request = if message_ids.is_empty() {
 349            None
 350        } else {
 351            Some(
 352                self.client
 353                    .request(proto::GetChannelMessagesById { message_ids }),
 354            )
 355        };
 356        cx.spawn(|this, mut cx| async move {
 357            if let Some(request) = request {
 358                let response = request.await?;
 359                let this = this
 360                    .upgrade()
 361                    .ok_or_else(|| anyhow!("channel store dropped"))?;
 362                let user_store = this.update(&mut cx, |this, _| this.user_store.clone())?;
 363                ChannelMessage::from_proto_vec(response.messages, &user_store, &mut cx).await
 364            } else {
 365                Ok(Vec::new())
 366            }
 367        })
 368    }
 369
 370    pub fn has_channel_buffer_changed(&self, channel_id: ChannelId) -> bool {
 371        self.channel_states
 372            .get(&channel_id)
 373            .is_some_and(|state| state.has_channel_buffer_changed())
 374    }
 375
 376    pub fn has_new_messages(&self, channel_id: ChannelId) -> bool {
 377        self.channel_states
 378            .get(&channel_id)
 379            .is_some_and(|state| state.has_new_messages())
 380    }
 381
 382    pub fn set_acknowledged_message_id(&mut self, channel_id: ChannelId, message_id: Option<u64>) {
 383        if let Some(state) = self.channel_states.get_mut(&channel_id) {
 384            state.latest_chat_message = message_id;
 385        }
 386    }
 387
 388    pub fn last_acknowledge_message_id(&self, channel_id: ChannelId) -> Option<u64> {
 389        self.channel_states.get(&channel_id).and_then(|state| {
 390            if let Some(last_message_id) = state.latest_chat_message {
 391                if state
 392                    .last_acknowledged_message_id()
 393                    .is_some_and(|id| id < last_message_id)
 394                {
 395                    return state.last_acknowledged_message_id();
 396                }
 397            }
 398
 399            None
 400        })
 401    }
 402
 403    pub fn acknowledge_message_id(
 404        &mut self,
 405        channel_id: ChannelId,
 406        message_id: u64,
 407        cx: &mut ModelContext<Self>,
 408    ) {
 409        self.channel_states
 410            .entry(channel_id)
 411            .or_insert_with(|| Default::default())
 412            .acknowledge_message_id(message_id);
 413        cx.notify();
 414    }
 415
 416    pub fn update_latest_message_id(
 417        &mut self,
 418        channel_id: ChannelId,
 419        message_id: u64,
 420        cx: &mut ModelContext<Self>,
 421    ) {
 422        self.channel_states
 423            .entry(channel_id)
 424            .or_insert_with(|| Default::default())
 425            .update_latest_message_id(message_id);
 426        cx.notify();
 427    }
 428
 429    pub fn acknowledge_notes_version(
 430        &mut self,
 431        channel_id: ChannelId,
 432        epoch: u64,
 433        version: &clock::Global,
 434        cx: &mut ModelContext<Self>,
 435    ) {
 436        self.channel_states
 437            .entry(channel_id)
 438            .or_insert_with(|| Default::default())
 439            .acknowledge_notes_version(epoch, version);
 440        cx.notify()
 441    }
 442
 443    pub fn update_latest_notes_version(
 444        &mut self,
 445        channel_id: ChannelId,
 446        epoch: u64,
 447        version: &clock::Global,
 448        cx: &mut ModelContext<Self>,
 449    ) {
 450        self.channel_states
 451            .entry(channel_id)
 452            .or_insert_with(|| Default::default())
 453            .update_latest_notes_version(epoch, version);
 454        cx.notify()
 455    }
 456
 457    pub fn open_channel_chat(
 458        &mut self,
 459        channel_id: ChannelId,
 460        cx: &mut ModelContext<Self>,
 461    ) -> Task<Result<Model<ChannelChat>>> {
 462        let client = self.client.clone();
 463        let user_store = self.user_store.clone();
 464        let this = cx.handle();
 465        self.open_channel_resource(
 466            channel_id,
 467            |this| &mut this.opened_chats,
 468            |channel, cx| ChannelChat::new(channel, this, user_store, client, cx),
 469            cx,
 470        )
 471    }
 472
 473    /// Asynchronously open a given resource associated with a channel.
 474    ///
 475    /// Make sure that the resource is only opened once, even if this method
 476    /// is called multiple times with the same channel id while the first task
 477    /// is still running.
        ///
        /// * `get_map` selects which of the store's maps (`opened_buffers` or `opened_chats`)
        ///   tracks this kind of resource.
        /// * `load` performs the actual load. It is invoked at most once per call, and only
        ///   when no live or loading entry exists for `channel_id`.
        ///
        /// The returned task fails if the channel is unknown, if the store is dropped before
        /// the load starts, or if `load` itself fails. Errors of a shared load are re-created
        /// from the original error's `Display` output for each caller.
 478    fn open_channel_resource<T, F, Fut>(
 479        &mut self,
 480        channel_id: ChannelId,
 481        get_map: fn(&mut Self) -> &mut HashMap<ChannelId, OpenedModelHandle<T>>,
 482        load: F,
 483        cx: &mut ModelContext<Self>,
 484    ) -> Task<Result<Model<T>>>
 485    where
 486        F: 'static + FnOnce(Arc<Channel>, AsyncAppContext) -> Fut,
 487        Fut: Future<Output = Result<Model<T>>>,
 488        T: 'static,
 489    {
            // Loops at most twice: a stale `Open` entry is removed and the lookup is retried,
            // which then hits the `Vacant` arm.
 490        let task = loop {
 491            match get_map(self).entry(channel_id) {
 492                hash_map::Entry::Occupied(e) => match e.get() {
 493                    OpenedModelHandle::Open(model) => {
 494                        if let Some(model) = model.upgrade() {
                                // Already loaded and still alive: hand it back immediately.
 495                            break Task::ready(Ok(model)).shared();
 496                        } else {
                                // The model was dropped; forget the stale weak handle.
 497                            get_map(self).remove(&channel_id);
 498                            continue;
 499                        }
 500                    }
 501                    OpenedModelHandle::Loading(task) => {
                            // A load is already in flight: join it instead of starting another.
 502                        break task.clone();
 503                    }
 504                },
 505                hash_map::Entry::Vacant(e) => {
 506                    let task = cx
 507                        .spawn(move |this, mut cx| async move {
                                // The first `?` covers a dropped store, the second an unknown
                                // channel id.
 508                            let channel = this.update(&mut cx, |this, _| {
 509                                this.channel_for_id(channel_id).cloned().ok_or_else(|| {
 510                                    Arc::new(anyhow!("no channel for id: {}", channel_id))
 511                                })
 512                            })??;
 513
                                // Errors are wrapped in `Arc` so the shared future's output is
                                // cloneable.
 514                            load(channel, cx).await.map_err(Arc::new)
 515                        })
 516                        .shared();
 517
 518                    e.insert(OpenedModelHandle::Loading(task.clone()));
                        // Detached follow-up: once the load settles, replace the `Loading`
                        // entry with a weak `Open` handle, or remove it on failure so a later
                        // call can retry.
 519                    cx.spawn({
 520                        let task = task.clone();
 521                        move |this, mut cx| async move {
 522                            let result = task.await;
 523                            this.update(&mut cx, |this, _| match result {
 524                                Ok(model) => {
 525                                    get_map(this).insert(
 526                                        channel_id,
 527                                        OpenedModelHandle::Open(model.downgrade()),
 528                                    );
 529                                }
 530                                Err(_) => {
 531                                    get_map(this).remove(&channel_id);
 532                                }
 533                            })
 534                            .ok();
 535                        }
 536                    })
 537                    .detach();
 538                    break task;
 539                }
 540            }
 541        };
            // Convert the shared `Arc<anyhow::Error>` back into a plain `anyhow::Error`.
 542        cx.background_executor()
 543            .spawn(async move { task.await.map_err(|error| anyhow!("{}", error)) })
 544    }
 545
 546    pub fn is_channel_admin(&self, channel_id: ChannelId) -> bool {
 547        self.channel_role(channel_id) == proto::ChannelRole::Admin
 548    }
 549
 550    pub fn is_root_channel(&self, channel_id: ChannelId) -> bool {
 551        self.channel_index
 552            .by_id()
 553            .get(&channel_id)
 554            .map_or(false, |channel| channel.is_root_channel())
 555    }
 556
 557    pub fn is_public_channel(&self, channel_id: ChannelId) -> bool {
 558        self.channel_index
 559            .by_id()
 560            .get(&channel_id)
 561            .map_or(false, |channel| {
 562                channel.visibility == ChannelVisibility::Public
 563            })
 564    }
 565
 566    pub fn channel_capability(&self, channel_id: ChannelId) -> Capability {
 567        match self.channel_role(channel_id) {
 568            ChannelRole::Admin | ChannelRole::Member => Capability::ReadWrite,
 569            _ => Capability::ReadOnly,
 570        }
 571    }
 572
 573    pub fn channel_role(&self, channel_id: ChannelId) -> proto::ChannelRole {
 574        maybe!({
 575            let mut channel = self.channel_for_id(channel_id)?;
 576            if !channel.is_root_channel() {
 577                channel = self.channel_for_id(channel.root_id())?;
 578            }
 579            let root_channel_state = self.channel_states.get(&channel.id);
 580            root_channel_state?.role
 581        })
 582        .unwrap_or(proto::ChannelRole::Guest)
 583    }
 584
 585    pub fn channel_participants(&self, channel_id: ChannelId) -> &[Arc<User>] {
 586        self.channel_participants
 587            .get(&channel_id)
 588            .map_or(&[], |v| v.as_slice())
 589    }
 590
 591    pub fn create_channel(
 592        &self,
 593        name: &str,
 594        parent_id: Option<ChannelId>,
 595        cx: &mut ModelContext<Self>,
 596    ) -> Task<Result<ChannelId>> {
 597        let client = self.client.clone();
 598        let name = name.trim_start_matches('#').to_owned();
 599        cx.spawn(move |this, mut cx| async move {
 600            let response = client
 601                .request(proto::CreateChannel {
 602                    name,
 603                    parent_id: parent_id.map(|cid| cid.0),
 604                })
 605                .await?;
 606
 607            let channel = response
 608                .channel
 609                .ok_or_else(|| anyhow!("missing channel in response"))?;
 610            let channel_id = ChannelId(channel.id);
 611
 612            this.update(&mut cx, |this, cx| {
 613                let task = this.update_channels(
 614                    proto::UpdateChannels {
 615                        channels: vec![channel],
 616                        ..Default::default()
 617                    },
 618                    cx,
 619                );
 620                assert!(task.is_none());
 621
 622                // This event is emitted because the collab panel wants to clear the pending edit state
 623                // before this frame is rendered. But we can't guarantee that the collab panel's future
 624                // will resolve before this flush_effects finishes. Synchronously emitting this event
 625                // ensures that the collab panel will observe this creation before the frame completes
 626                cx.emit(ChannelEvent::ChannelCreated(channel_id));
 627            })?;
 628
 629            Ok(channel_id)
 630        })
 631    }
 632
 633    pub fn move_channel(
 634        &mut self,
 635        channel_id: ChannelId,
 636        to: ChannelId,
 637        cx: &mut ModelContext<Self>,
 638    ) -> Task<Result<()>> {
 639        let client = self.client.clone();
 640        cx.spawn(move |_, _| async move {
 641            let _ = client
 642                .request(proto::MoveChannel {
 643                    channel_id: channel_id.0,
 644                    to: to.0,
 645                })
 646                .await?;
 647
 648            Ok(())
 649        })
 650    }
 651
 652    pub fn set_channel_visibility(
 653        &mut self,
 654        channel_id: ChannelId,
 655        visibility: ChannelVisibility,
 656        cx: &mut ModelContext<Self>,
 657    ) -> Task<Result<()>> {
 658        let client = self.client.clone();
 659        cx.spawn(move |_, _| async move {
 660            let _ = client
 661                .request(proto::SetChannelVisibility {
 662                    channel_id: channel_id.0,
 663                    visibility: visibility.into(),
 664                })
 665                .await?;
 666
 667            Ok(())
 668        })
 669    }
 670
 671    pub fn invite_member(
 672        &mut self,
 673        channel_id: ChannelId,
 674        user_id: UserId,
 675        role: proto::ChannelRole,
 676        cx: &mut ModelContext<Self>,
 677    ) -> Task<Result<()>> {
 678        if !self.outgoing_invites.insert((channel_id, user_id)) {
 679            return Task::ready(Err(anyhow!("invite request already in progress")));
 680        }
 681
 682        cx.notify();
 683        let client = self.client.clone();
 684        cx.spawn(move |this, mut cx| async move {
 685            let result = client
 686                .request(proto::InviteChannelMember {
 687                    channel_id: channel_id.0,
 688                    user_id,
 689                    role: role.into(),
 690                })
 691                .await;
 692
 693            this.update(&mut cx, |this, cx| {
 694                this.outgoing_invites.remove(&(channel_id, user_id));
 695                cx.notify();
 696            })?;
 697
 698            result?;
 699
 700            Ok(())
 701        })
 702    }
 703
 704    pub fn remove_member(
 705        &mut self,
 706        channel_id: ChannelId,
 707        user_id: u64,
 708        cx: &mut ModelContext<Self>,
 709    ) -> Task<Result<()>> {
 710        if !self.outgoing_invites.insert((channel_id, user_id)) {
 711            return Task::ready(Err(anyhow!("invite request already in progress")));
 712        }
 713
 714        cx.notify();
 715        let client = self.client.clone();
 716        cx.spawn(move |this, mut cx| async move {
 717            let result = client
 718                .request(proto::RemoveChannelMember {
 719                    channel_id: channel_id.0,
 720                    user_id,
 721                })
 722                .await;
 723
 724            this.update(&mut cx, |this, cx| {
 725                this.outgoing_invites.remove(&(channel_id, user_id));
 726                cx.notify();
 727            })?;
 728            result?;
 729            Ok(())
 730        })
 731    }
 732
 733    pub fn set_member_role(
 734        &mut self,
 735        channel_id: ChannelId,
 736        user_id: UserId,
 737        role: proto::ChannelRole,
 738        cx: &mut ModelContext<Self>,
 739    ) -> Task<Result<()>> {
 740        if !self.outgoing_invites.insert((channel_id, user_id)) {
 741            return Task::ready(Err(anyhow!("member request already in progress")));
 742        }
 743
 744        cx.notify();
 745        let client = self.client.clone();
 746        cx.spawn(move |this, mut cx| async move {
 747            let result = client
 748                .request(proto::SetChannelMemberRole {
 749                    channel_id: channel_id.0,
 750                    user_id,
 751                    role: role.into(),
 752                })
 753                .await;
 754
 755            this.update(&mut cx, |this, cx| {
 756                this.outgoing_invites.remove(&(channel_id, user_id));
 757                cx.notify();
 758            })?;
 759
 760            result?;
 761            Ok(())
 762        })
 763    }
 764
 765    pub fn rename(
 766        &mut self,
 767        channel_id: ChannelId,
 768        new_name: &str,
 769        cx: &mut ModelContext<Self>,
 770    ) -> Task<Result<()>> {
 771        let client = self.client.clone();
 772        let name = new_name.to_string();
 773        cx.spawn(move |this, mut cx| async move {
 774            let channel = client
 775                .request(proto::RenameChannel {
 776                    channel_id: channel_id.0,
 777                    name,
 778                })
 779                .await?
 780                .channel
 781                .ok_or_else(|| anyhow!("missing channel in response"))?;
 782            this.update(&mut cx, |this, cx| {
 783                let task = this.update_channels(
 784                    proto::UpdateChannels {
 785                        channels: vec![channel],
 786                        ..Default::default()
 787                    },
 788                    cx,
 789                );
 790                assert!(task.is_none());
 791
 792                // This event is emitted because the collab panel wants to clear the pending edit state
 793                // before this frame is rendered. But we can't guarantee that the collab panel's future
 794                // will resolve before this flush_effects finishes. Synchronously emitting this event
 795                // ensures that the collab panel will observe this creation before the frame complete
 796                cx.emit(ChannelEvent::ChannelRenamed(channel_id))
 797            })?;
 798            Ok(())
 799        })
 800    }
 801
 802    pub fn respond_to_channel_invite(
 803        &mut self,
 804        channel_id: ChannelId,
 805        accept: bool,
 806        cx: &mut ModelContext<Self>,
 807    ) -> Task<Result<()>> {
 808        let client = self.client.clone();
 809        cx.background_executor().spawn(async move {
 810            client
 811                .request(proto::RespondToChannelInvite {
 812                    channel_id: channel_id.0,
 813                    accept,
 814                })
 815                .await?;
 816            Ok(())
 817        })
 818    }
 819    pub fn fuzzy_search_members(
 820        &self,
 821        channel_id: ChannelId,
 822        query: String,
 823        limit: u16,
 824        cx: &mut ModelContext<Self>,
 825    ) -> Task<Result<Vec<ChannelMembership>>> {
 826        let client = self.client.clone();
 827        let user_store = self.user_store.downgrade();
 828        cx.spawn(move |_, mut cx| async move {
 829            let response = client
 830                .request(proto::GetChannelMembers {
 831                    channel_id: channel_id.0,
 832                    query,
 833                    limit: limit as u64,
 834                })
 835                .await?;
 836            user_store.update(&mut cx, |user_store, _| {
 837                user_store.insert(response.users);
 838                response
 839                    .members
 840                    .into_iter()
 841                    .filter_map(|member| {
 842                        Some(ChannelMembership {
 843                            user: user_store.get_cached_user(member.user_id)?,
 844                            role: member.role(),
 845                            kind: member.kind(),
 846                        })
 847                    })
 848                    .collect()
 849            })
 850        })
 851    }
 852
 853    pub fn remove_channel(&self, channel_id: ChannelId) -> impl Future<Output = Result<()>> {
 854        let client = self.client.clone();
 855        async move {
 856            client
 857                .request(proto::DeleteChannel {
 858                    channel_id: channel_id.0,
 859                })
 860                .await?;
 861            Ok(())
 862        }
 863    }
 864
 865    pub fn has_pending_channel_invite_response(&self, _: &Arc<Channel>) -> bool {
 866        false
 867    }
 868
 869    pub fn has_pending_channel_invite(&self, channel_id: ChannelId, user_id: UserId) -> bool {
 870        self.outgoing_invites.contains(&(channel_id, user_id))
 871    }
 872
 873    async fn handle_update_channels(
 874        this: Model<Self>,
 875        message: TypedEnvelope<proto::UpdateChannels>,
 876        _: Arc<Client>,
 877        mut cx: AsyncAppContext,
 878    ) -> Result<()> {
 879        this.update(&mut cx, |this, _| {
 880            this.update_channels_tx
 881                .unbounded_send(message.payload)
 882                .unwrap();
 883        })?;
 884        Ok(())
 885    }
 886
 887    async fn handle_update_user_channels(
 888        this: Model<Self>,
 889        message: TypedEnvelope<proto::UpdateUserChannels>,
 890        _: Arc<Client>,
 891        mut cx: AsyncAppContext,
 892    ) -> Result<()> {
 893        this.update(&mut cx, |this, cx| {
 894            for buffer_version in message.payload.observed_channel_buffer_version {
 895                let version = language::proto::deserialize_version(&buffer_version.version);
 896                this.acknowledge_notes_version(
 897                    ChannelId(buffer_version.channel_id),
 898                    buffer_version.epoch,
 899                    &version,
 900                    cx,
 901                );
 902            }
 903            for message_id in message.payload.observed_channel_message_id {
 904                this.acknowledge_message_id(
 905                    ChannelId(message_id.channel_id),
 906                    message_id.message_id,
 907                    cx,
 908                );
 909            }
 910            for membership in message.payload.channel_memberships {
 911                if let Some(role) = ChannelRole::from_i32(membership.role) {
 912                    this.channel_states
 913                        .entry(ChannelId(membership.channel_id))
 914                        .or_insert_with(|| ChannelState::default())
 915                        .set_role(role)
 916                }
 917            }
 918        })
 919    }
 920
 921    fn handle_connect(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
 922        self.channel_index.clear();
 923        self.channel_invitations.clear();
 924        self.channel_participants.clear();
 925        self.channel_index.clear();
 926        self.outgoing_invites.clear();
 927        self.disconnect_channel_buffers_task.take();
 928
 929        for chat in self.opened_chats.values() {
 930            if let OpenedModelHandle::Open(chat) = chat {
 931                if let Some(chat) = chat.upgrade() {
 932                    chat.update(cx, |chat, cx| {
 933                        chat.rejoin(cx);
 934                    });
 935                }
 936            }
 937        }
 938
 939        let mut buffer_versions = Vec::new();
 940        for buffer in self.opened_buffers.values() {
 941            if let OpenedModelHandle::Open(buffer) = buffer {
 942                if let Some(buffer) = buffer.upgrade() {
 943                    let channel_buffer = buffer.read(cx);
 944                    let buffer = channel_buffer.buffer().read(cx);
 945                    buffer_versions.push(proto::ChannelBufferVersion {
 946                        channel_id: channel_buffer.channel_id.0,
 947                        epoch: channel_buffer.epoch(),
 948                        version: language::proto::serialize_version(&buffer.version()),
 949                    });
 950                }
 951            }
 952        }
 953
 954        if buffer_versions.is_empty() {
 955            return Task::ready(Ok(()));
 956        }
 957
 958        let response = self.client.request(proto::RejoinChannelBuffers {
 959            buffers: buffer_versions,
 960        });
 961
 962        cx.spawn(|this, mut cx| async move {
 963            let mut response = response.await?;
 964
 965            this.update(&mut cx, |this, cx| {
 966                this.opened_buffers.retain(|_, buffer| match buffer {
 967                    OpenedModelHandle::Open(channel_buffer) => {
 968                        let Some(channel_buffer) = channel_buffer.upgrade() else {
 969                            return false;
 970                        };
 971
 972                        channel_buffer.update(cx, |channel_buffer, cx| {
 973                            let channel_id = channel_buffer.channel_id;
 974                            if let Some(remote_buffer) = response
 975                                .buffers
 976                                .iter_mut()
 977                                .find(|buffer| buffer.channel_id == channel_id.0)
 978                            {
 979                                let channel_id = channel_buffer.channel_id;
 980                                let remote_version =
 981                                    language::proto::deserialize_version(&remote_buffer.version);
 982
 983                                channel_buffer.replace_collaborators(
 984                                    mem::take(&mut remote_buffer.collaborators),
 985                                    cx,
 986                                );
 987
 988                                let operations = channel_buffer
 989                                    .buffer()
 990                                    .update(cx, |buffer, cx| {
 991                                        let outgoing_operations =
 992                                            buffer.serialize_ops(Some(remote_version), cx);
 993                                        let incoming_operations =
 994                                            mem::take(&mut remote_buffer.operations)
 995                                                .into_iter()
 996                                                .map(language::proto::deserialize_operation)
 997                                                .collect::<Result<Vec<_>>>()?;
 998                                        buffer.apply_ops(incoming_operations, cx)?;
 999                                        anyhow::Ok(outgoing_operations)
1000                                    })
1001                                    .log_err();
1002
1003                                if let Some(operations) = operations {
1004                                    let client = this.client.clone();
1005                                    cx.background_executor()
1006                                        .spawn(async move {
1007                                            let operations = operations.await;
1008                                            for chunk in
1009                                                language::proto::split_operations(operations)
1010                                            {
1011                                                client
1012                                                    .send(proto::UpdateChannelBuffer {
1013                                                        channel_id: channel_id.0,
1014                                                        operations: chunk,
1015                                                    })
1016                                                    .ok();
1017                                            }
1018                                        })
1019                                        .detach();
1020                                    return true;
1021                                }
1022                            }
1023
1024                            channel_buffer.disconnect(cx);
1025                            false
1026                        })
1027                    }
1028                    OpenedModelHandle::Loading(_) => true,
1029                });
1030            })
1031            .ok();
1032            anyhow::Ok(())
1033        })
1034    }
1035
1036    fn handle_disconnect(&mut self, wait_for_reconnect: bool, cx: &mut ModelContext<Self>) {
1037        cx.notify();
1038
1039        self.disconnect_channel_buffers_task.get_or_insert_with(|| {
1040            cx.spawn(move |this, mut cx| async move {
1041                if wait_for_reconnect {
1042                    cx.background_executor().timer(RECONNECT_TIMEOUT).await;
1043                }
1044
1045                if let Some(this) = this.upgrade() {
1046                    this.update(&mut cx, |this, cx| {
1047                        for (_, buffer) in this.opened_buffers.drain() {
1048                            if let OpenedModelHandle::Open(buffer) = buffer {
1049                                if let Some(buffer) = buffer.upgrade() {
1050                                    buffer.update(cx, |buffer, cx| buffer.disconnect(cx));
1051                                }
1052                            }
1053                        }
1054                    })
1055                    .ok();
1056                }
1057            })
1058        });
1059    }
1060
    /// Applies an `UpdateChannels` payload to the store.
    ///
    /// Invitations are removed and inserted first. If any of the payload's
    /// channel-related lists is non-empty, channels are then deleted and
    /// inserted, the latest buffer versions and message ids are recorded, and
    /// hosted projects are added or removed. Observers are notified
    /// unconditionally afterwards.
    ///
    /// Returns `None` when the payload carries no channel participants.
    /// Otherwise returns a task that fetches the participating users from the
    /// user store, stores each channel's participant list sorted by user id,
    /// and notifies observers again.
    pub(crate) fn update_channels(
        &mut self,
        payload: proto::UpdateChannels,
        cx: &mut ModelContext<ChannelStore>,
    ) -> Option<Task<Result<()>>> {
        if !payload.remove_channel_invitations.is_empty() {
            self.channel_invitations
                .retain(|channel| !payload.remove_channel_invitations.contains(&channel.id.0));
        }
        // `channel_invitations` is searched and inserted into by channel id, so
        // it stays ordered by id.
        for channel in payload.channel_invitations {
            match self
                .channel_invitations
                .binary_search_by_key(&channel.id, |c| c.id.0)
            {
                // An invitation that is already known only has its name refreshed.
                Ok(ix) => {
                    Arc::make_mut(&mut self.channel_invitations[ix]).name = channel.name.into()
                }
                Err(ix) => self.channel_invitations.insert(
                    ix,
                    Arc::new(Channel {
                        id: ChannelId(channel.id),
                        visibility: channel.visibility(),
                        name: channel.name.into(),
                        parent_path: channel
                            .parent_path
                            .into_iter()
                            .map(|cid| ChannelId(cid))
                            .collect(),
                    }),
                ),
            }
        }

        let channels_changed = !payload.channels.is_empty()
            || !payload.delete_channels.is_empty()
            || !payload.latest_channel_message_ids.is_empty()
            || !payload.latest_channel_buffer_versions.is_empty()
            || !payload.hosted_projects.is_empty()
            || !payload.deleted_hosted_projects.is_empty();

        if channels_changed {
            if !payload.delete_channels.is_empty() {
                let delete_channels: Vec<ChannelId> = payload
                    .delete_channels
                    .into_iter()
                    .map(|cid| ChannelId(cid))
                    .collect();
                self.channel_index.delete_channels(&delete_channels);
                self.channel_participants
                    .retain(|channel_id, _| !delete_channels.contains(&channel_id));

                for channel_id in &delete_channels {
                    let channel_id = *channel_id;
                    // A channel that is deleted and re-sent in the same payload
                    // keeps its open buffer.
                    if payload
                        .channels
                        .iter()
                        .any(|channel| channel.id == channel_id.0)
                    {
                        continue;
                    }
                    if let Some(OpenedModelHandle::Open(buffer)) =
                        self.opened_buffers.remove(&channel_id)
                    {
                        if let Some(buffer) = buffer.upgrade() {
                            buffer.update(cx, ChannelBuffer::disconnect);
                        }
                    }
                }
            }

            // NOTE(review): `index` stays alive until the end of this `if`
            // block; whether `bulk_insert` defers work until then is not
            // visible here — confirm in `channel_index`.
            let mut index = self.channel_index.bulk_insert();
            for channel in payload.channels {
                let id = ChannelId(channel.id);
                let channel_changed = index.insert(channel);

                // Let an open buffer for this channel know the channel changed.
                if channel_changed {
                    if let Some(OpenedModelHandle::Open(buffer)) = self.opened_buffers.get(&id) {
                        if let Some(buffer) = buffer.upgrade() {
                            buffer.update(cx, ChannelBuffer::channel_changed);
                        }
                    }
                }
            }

            for latest_buffer_version in payload.latest_channel_buffer_versions {
                let version = language::proto::deserialize_version(&latest_buffer_version.version);
                self.channel_states
                    .entry(ChannelId(latest_buffer_version.channel_id))
                    .or_default()
                    .update_latest_notes_version(latest_buffer_version.epoch, &version)
            }

            for latest_channel_message in payload.latest_channel_message_ids {
                self.channel_states
                    .entry(ChannelId(latest_channel_message.channel_id))
                    .or_default()
                    .update_latest_message_id(latest_channel_message.message_id);
            }

            for hosted_project in payload.hosted_projects {
                let hosted_project: HostedProject = hosted_project.into();
                // If the project was already known, detach it from the channel
                // it was previously recorded under before attaching it again.
                if let Some(old_project) = self
                    .hosted_projects
                    .insert(hosted_project.project_id, hosted_project.clone())
                {
                    self.channel_states
                        .entry(old_project.channel_id)
                        .or_default()
                        .remove_hosted_project(old_project.project_id);
                }
                self.channel_states
                    .entry(hosted_project.channel_id)
                    .or_default()
                    .add_hosted_project(hosted_project.project_id);
            }

            for hosted_project_id in payload.deleted_hosted_projects {
                let hosted_project_id = ProjectId(hosted_project_id);

                if let Some(old_project) = self.hosted_projects.remove(&hosted_project_id) {
                    self.channel_states
                        .entry(old_project.channel_id)
                        .or_default()
                        .remove_hosted_project(old_project.project_id);
                }
            }
        }

        cx.notify();
        if payload.channel_participants.is_empty() {
            return None;
        }

        // Build a sorted, de-duplicated list of every participant's user id.
        let mut all_user_ids = Vec::new();
        let channel_participants = payload.channel_participants;
        for entry in &channel_participants {
            for user_id in entry.participant_user_ids.iter() {
                if let Err(ix) = all_user_ids.binary_search(user_id) {
                    all_user_ids.insert(ix, *user_id);
                }
            }
        }

        let users = self
            .user_store
            .update(cx, |user_store, cx| user_store.get_users(all_user_ids, cx));
        Some(cx.spawn(|this, mut cx| async move {
            let users = users.await?;

            this.update(&mut cx, |this, cx| {
                for entry in &channel_participants {
                    // NOTE(review): the binary search assumes `get_users`
                    // returns users ordered by id — confirm against `UserStore`.
                    // Ids that are not found are dropped.
                    let mut participants: Vec<_> = entry
                        .participant_user_ids
                        .iter()
                        .filter_map(|user_id| {
                            users
                                .binary_search_by_key(&user_id, |user| &user.id)
                                .ok()
                                .map(|ix| users[ix].clone())
                        })
                        .collect();

                    participants.sort_by_key(|u| u.id);

                    this.channel_participants
                        .insert(ChannelId(entry.channel_id), participants);
                }

                cx.notify();
            })
        }))
    }
1233}
1234
1235impl ChannelState {
1236    fn set_role(&mut self, role: ChannelRole) {
1237        self.role = Some(role);
1238    }
1239
1240    fn has_channel_buffer_changed(&self) -> bool {
1241        self.latest_notes_version.epoch > self.observed_notes_version.epoch
1242            || (self.latest_notes_version.epoch == self.observed_notes_version.epoch
1243                && self
1244                    .latest_notes_version
1245                    .version
1246                    .changed_since(&self.observed_notes_version.version))
1247    }
1248
1249    fn has_new_messages(&self) -> bool {
1250        let latest_message_id = self.latest_chat_message;
1251        let observed_message_id = self.observed_chat_message;
1252
1253        latest_message_id.is_some_and(|latest_message_id| {
1254            latest_message_id > observed_message_id.unwrap_or_default()
1255        })
1256    }
1257
1258    fn last_acknowledged_message_id(&self) -> Option<u64> {
1259        self.observed_chat_message
1260    }
1261
1262    fn acknowledge_message_id(&mut self, message_id: u64) {
1263        let observed = self.observed_chat_message.get_or_insert(message_id);
1264        *observed = (*observed).max(message_id);
1265    }
1266
1267    fn update_latest_message_id(&mut self, message_id: u64) {
1268        self.latest_chat_message =
1269            Some(message_id.max(self.latest_chat_message.unwrap_or_default()));
1270    }
1271
1272    fn acknowledge_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1273        if self.observed_notes_version.epoch == epoch {
1274            self.observed_notes_version.version.join(version);
1275        } else {
1276            self.observed_notes_version = NotesVersion {
1277                epoch,
1278                version: version.clone(),
1279            };
1280        }
1281    }
1282
1283    fn update_latest_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1284        if self.latest_notes_version.epoch == epoch {
1285            self.latest_notes_version.version.join(version);
1286        } else {
1287            self.latest_notes_version = NotesVersion {
1288                epoch,
1289                version: version.clone(),
1290            };
1291        }
1292    }
1293
1294    fn add_hosted_project(&mut self, project_id: ProjectId) {
1295        self.projects.insert(project_id);
1296    }
1297
1298    fn remove_hosted_project(&mut self, project_id: ProjectId) {
1299        self.projects.remove(&project_id);
1300    }
1301}