channel_store.rs

   1mod channel_index;
   2
   3use crate::{channel_buffer::ChannelBuffer, channel_chat::ChannelChat, ChannelMessage};
   4use anyhow::{anyhow, Result};
   5use channel_index::ChannelIndex;
   6use client::{ChannelId, Client, ClientSettings, ProjectId, Subscription, User, UserId, UserStore};
   7use collections::{hash_map, HashMap, HashSet};
   8use futures::{channel::mpsc, future::Shared, Future, FutureExt, StreamExt};
   9use gpui::{
  10    AppContext, AsyncAppContext, Context, EventEmitter, Global, Model, ModelContext, SharedString,
  11    Task, WeakModel,
  12};
  13use language::Capability;
  14use rpc::{
  15    proto::{self, ChannelRole, ChannelVisibility},
  16    TypedEnvelope,
  17};
  18use settings::Settings;
  19use std::{mem, sync::Arc, time::Duration};
  20use util::{maybe, ResultExt};
  21
  22pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
  23
  24pub fn init(client: &Arc<Client>, user_store: Model<UserStore>, cx: &mut AppContext) {
  25    let channel_store =
  26        cx.new_model(|cx| ChannelStore::new(client.clone(), user_store.clone(), cx));
  27    cx.set_global(GlobalChannelStore(channel_store));
  28}
  29
  30#[derive(Debug, Clone, Default, PartialEq)]
  31struct NotesVersion {
  32    epoch: u64,
  33    version: clock::Global,
  34}
  35
  36#[derive(Debug, Clone)]
  37pub struct HostedProject {
  38    project_id: ProjectId,
  39    channel_id: ChannelId,
  40    name: SharedString,
  41    _visibility: proto::ChannelVisibility,
  42}
  43impl From<proto::HostedProject> for HostedProject {
  44    fn from(project: proto::HostedProject) -> Self {
  45        Self {
  46            project_id: ProjectId(project.project_id),
  47            channel_id: ChannelId(project.channel_id),
  48            _visibility: project.visibility(),
  49            name: project.name.into(),
  50        }
  51    }
  52}
  53pub struct ChannelStore {
  54    pub channel_index: ChannelIndex,
  55    channel_invitations: Vec<Arc<Channel>>,
  56    channel_participants: HashMap<ChannelId, Vec<Arc<User>>>,
  57    channel_states: HashMap<ChannelId, ChannelState>,
  58    hosted_projects: HashMap<ProjectId, HostedProject>,
  59
  60    outgoing_invites: HashSet<(ChannelId, UserId)>,
  61    update_channels_tx: mpsc::UnboundedSender<proto::UpdateChannels>,
  62    opened_buffers: HashMap<ChannelId, OpenedModelHandle<ChannelBuffer>>,
  63    opened_chats: HashMap<ChannelId, OpenedModelHandle<ChannelChat>>,
  64    client: Arc<Client>,
  65    user_store: Model<UserStore>,
  66    _rpc_subscriptions: [Subscription; 2],
  67    _watch_connection_status: Task<Option<()>>,
  68    disconnect_channel_buffers_task: Option<Task<()>>,
  69    _update_channels: Task<()>,
  70}
  71
  72#[derive(Clone, Debug)]
  73pub struct Channel {
  74    pub id: ChannelId,
  75    pub name: SharedString,
  76    pub visibility: proto::ChannelVisibility,
  77    pub parent_path: Vec<ChannelId>,
  78}
  79
  80#[derive(Default, Debug)]
  81pub struct ChannelState {
  82    latest_chat_message: Option<u64>,
  83    latest_notes_version: NotesVersion,
  84    observed_notes_version: NotesVersion,
  85    observed_chat_message: Option<u64>,
  86    role: Option<ChannelRole>,
  87    projects: HashSet<ProjectId>,
  88}
  89
  90impl Channel {
  91    pub fn link(&self, cx: &AppContext) -> String {
  92        format!(
  93            "{}/channel/{}-{}",
  94            ClientSettings::get_global(cx).server_url,
  95            Self::slug(&self.name),
  96            self.id
  97        )
  98    }
  99
 100    pub fn notes_link(&self, heading: Option<String>, cx: &AppContext) -> String {
 101        self.link(cx)
 102            + "/notes"
 103            + &heading
 104                .map(|h| format!("#{}", Self::slug(&h)))
 105                .unwrap_or_default()
 106    }
 107
 108    pub fn is_root_channel(&self) -> bool {
 109        self.parent_path.is_empty()
 110    }
 111
 112    pub fn root_id(&self) -> ChannelId {
 113        self.parent_path.first().copied().unwrap_or(self.id)
 114    }
 115
 116    pub fn slug(str: &str) -> String {
 117        let slug: String = str
 118            .chars()
 119            .map(|c| if c.is_alphanumeric() { c } else { '-' })
 120            .collect();
 121
 122        slug.trim_matches(|c| c == '-').to_string()
 123    }
 124}
 125
 126pub struct ChannelMembership {
 127    pub user: Arc<User>,
 128    pub kind: proto::channel_member::Kind,
 129    pub role: proto::ChannelRole,
 130}
 131impl ChannelMembership {
 132    pub fn sort_key(&self) -> MembershipSortKey {
 133        MembershipSortKey {
 134            role_order: match self.role {
 135                proto::ChannelRole::Admin => 0,
 136                proto::ChannelRole::Member => 1,
 137                proto::ChannelRole::Banned => 2,
 138                proto::ChannelRole::Talker => 3,
 139                proto::ChannelRole::Guest => 4,
 140            },
 141            kind_order: match self.kind {
 142                proto::channel_member::Kind::Member => 0,
 143                proto::channel_member::Kind::Invitee => 1,
 144            },
 145            username_order: self.user.github_login.as_str(),
 146        }
 147    }
 148}
 149
 150#[derive(PartialOrd, Ord, PartialEq, Eq)]
 151pub struct MembershipSortKey<'a> {
 152    role_order: u8,
 153    kind_order: u8,
 154    username_order: &'a str,
 155}
 156
 157pub enum ChannelEvent {
 158    ChannelCreated(ChannelId),
 159    ChannelRenamed(ChannelId),
 160}
 161
 162impl EventEmitter<ChannelEvent> for ChannelStore {}
 163
 164enum OpenedModelHandle<E> {
 165    Open(WeakModel<E>),
 166    Loading(Shared<Task<Result<Model<E>, Arc<anyhow::Error>>>>),
 167}
 168
 169struct GlobalChannelStore(Model<ChannelStore>);
 170
 171impl Global for GlobalChannelStore {}
 172
 173impl ChannelStore {
 174    pub fn global(cx: &AppContext) -> Model<Self> {
 175        cx.global::<GlobalChannelStore>().0.clone()
 176    }
 177
 178    pub fn new(
 179        client: Arc<Client>,
 180        user_store: Model<UserStore>,
 181        cx: &mut ModelContext<Self>,
 182    ) -> Self {
 183        let rpc_subscriptions = [
 184            client.add_message_handler(cx.weak_model(), Self::handle_update_channels),
 185            client.add_message_handler(cx.weak_model(), Self::handle_update_user_channels),
 186        ];
 187
 188        let mut connection_status = client.status();
 189        let (update_channels_tx, mut update_channels_rx) = mpsc::unbounded();
 190        let watch_connection_status = cx.spawn(|this, mut cx| async move {
 191            while let Some(status) = connection_status.next().await {
 192                let this = this.upgrade()?;
 193                match status {
 194                    client::Status::Connected { .. } => {
 195                        this.update(&mut cx, |this, cx| this.handle_connect(cx))
 196                            .ok()?
 197                            .await
 198                            .log_err()?;
 199                    }
 200                    client::Status::SignedOut | client::Status::UpgradeRequired => {
 201                        this.update(&mut cx, |this, cx| this.handle_disconnect(false, cx))
 202                            .ok();
 203                    }
 204                    _ => {
 205                        this.update(&mut cx, |this, cx| this.handle_disconnect(true, cx))
 206                            .ok();
 207                    }
 208                }
 209            }
 210            Some(())
 211        });
 212
 213        Self {
 214            channel_invitations: Vec::default(),
 215            channel_index: ChannelIndex::default(),
 216            channel_participants: Default::default(),
 217            hosted_projects: Default::default(),
 218            outgoing_invites: Default::default(),
 219            opened_buffers: Default::default(),
 220            opened_chats: Default::default(),
 221            update_channels_tx,
 222            client,
 223            user_store,
 224            _rpc_subscriptions: rpc_subscriptions,
 225            _watch_connection_status: watch_connection_status,
 226            disconnect_channel_buffers_task: None,
 227            _update_channels: cx.spawn(|this, mut cx| async move {
 228                maybe!(async move {
 229                    while let Some(update_channels) = update_channels_rx.next().await {
 230                        if let Some(this) = this.upgrade() {
 231                            let update_task = this.update(&mut cx, |this, cx| {
 232                                this.update_channels(update_channels, cx)
 233                            })?;
 234                            if let Some(update_task) = update_task {
 235                                update_task.await.log_err();
 236                            }
 237                        }
 238                    }
 239                    anyhow::Ok(())
 240                })
 241                .await
 242                .log_err();
 243            }),
 244            channel_states: Default::default(),
 245        }
 246    }
 247
 248    pub fn client(&self) -> Arc<Client> {
 249        self.client.clone()
 250    }
 251
 252    /// Returns the number of unique channels in the store
 253    pub fn channel_count(&self) -> usize {
 254        self.channel_index.by_id().len()
 255    }
 256
 257    /// Returns the index of a channel ID in the list of unique channels
 258    pub fn index_of_channel(&self, channel_id: ChannelId) -> Option<usize> {
 259        self.channel_index
 260            .by_id()
 261            .keys()
 262            .position(|id| *id == channel_id)
 263    }
 264
 265    /// Returns an iterator over all unique channels
 266    pub fn channels(&self) -> impl '_ + Iterator<Item = &Arc<Channel>> {
 267        self.channel_index.by_id().values()
 268    }
 269
 270    /// Iterate over all entries in the channel DAG
 271    pub fn ordered_channels(&self) -> impl '_ + Iterator<Item = (usize, &Arc<Channel>)> {
 272        self.channel_index
 273            .ordered_channels()
 274            .iter()
 275            .filter_map(move |id| {
 276                let channel = self.channel_index.by_id().get(id)?;
 277                Some((channel.parent_path.len(), channel))
 278            })
 279    }
 280
 281    pub fn channel_at_index(&self, ix: usize) -> Option<&Arc<Channel>> {
 282        let channel_id = self.channel_index.ordered_channels().get(ix)?;
 283        self.channel_index.by_id().get(channel_id)
 284    }
 285
 286    pub fn channel_at(&self, ix: usize) -> Option<&Arc<Channel>> {
 287        self.channel_index.by_id().values().nth(ix)
 288    }
 289
 290    pub fn has_channel_invitation(&self, channel_id: ChannelId) -> bool {
 291        self.channel_invitations
 292            .iter()
 293            .any(|channel| channel.id == channel_id)
 294    }
 295
 296    pub fn channel_invitations(&self) -> &[Arc<Channel>] {
 297        &self.channel_invitations
 298    }
 299
 300    pub fn channel_for_id(&self, channel_id: ChannelId) -> Option<&Arc<Channel>> {
 301        self.channel_index.by_id().get(&channel_id)
 302    }
 303
 304    pub fn projects_for_id(&self, channel_id: ChannelId) -> Vec<(SharedString, ProjectId)> {
 305        let mut projects: Vec<(SharedString, ProjectId)> = self
 306            .channel_states
 307            .get(&channel_id)
 308            .map(|state| state.projects.clone())
 309            .unwrap_or_default()
 310            .into_iter()
 311            .flat_map(|id| Some((self.hosted_projects.get(&id)?.name.clone(), id)))
 312            .collect();
 313        projects.sort();
 314        projects
 315    }
 316
 317    pub fn has_open_channel_buffer(&self, channel_id: ChannelId, _cx: &AppContext) -> bool {
 318        if let Some(buffer) = self.opened_buffers.get(&channel_id) {
 319            if let OpenedModelHandle::Open(buffer) = buffer {
 320                return buffer.upgrade().is_some();
 321            }
 322        }
 323        false
 324    }
 325
 326    pub fn open_channel_buffer(
 327        &mut self,
 328        channel_id: ChannelId,
 329        cx: &mut ModelContext<Self>,
 330    ) -> Task<Result<Model<ChannelBuffer>>> {
 331        let client = self.client.clone();
 332        let user_store = self.user_store.clone();
 333        let channel_store = cx.handle();
 334        self.open_channel_resource(
 335            channel_id,
 336            |this| &mut this.opened_buffers,
 337            |channel, cx| ChannelBuffer::new(channel, client, user_store, channel_store, cx),
 338            cx,
 339        )
 340    }
 341
 342    pub fn fetch_channel_messages(
 343        &self,
 344        message_ids: Vec<u64>,
 345        cx: &mut ModelContext<Self>,
 346    ) -> Task<Result<Vec<ChannelMessage>>> {
 347        let request = if message_ids.is_empty() {
 348            None
 349        } else {
 350            Some(
 351                self.client
 352                    .request(proto::GetChannelMessagesById { message_ids }),
 353            )
 354        };
 355        cx.spawn(|this, mut cx| async move {
 356            if let Some(request) = request {
 357                let response = request.await?;
 358                let this = this
 359                    .upgrade()
 360                    .ok_or_else(|| anyhow!("channel store dropped"))?;
 361                let user_store = this.update(&mut cx, |this, _| this.user_store.clone())?;
 362                ChannelMessage::from_proto_vec(response.messages, &user_store, &mut cx).await
 363            } else {
 364                Ok(Vec::new())
 365            }
 366        })
 367    }
 368
 369    pub fn has_channel_buffer_changed(&self, channel_id: ChannelId) -> bool {
 370        self.channel_states
 371            .get(&channel_id)
 372            .is_some_and(|state| state.has_channel_buffer_changed())
 373    }
 374
 375    pub fn has_new_messages(&self, channel_id: ChannelId) -> bool {
 376        self.channel_states
 377            .get(&channel_id)
 378            .is_some_and(|state| state.has_new_messages())
 379    }
 380
 381    pub fn set_acknowledged_message_id(&mut self, channel_id: ChannelId, message_id: Option<u64>) {
 382        if let Some(state) = self.channel_states.get_mut(&channel_id) {
 383            state.latest_chat_message = message_id;
 384        }
 385    }
 386
 387    pub fn last_acknowledge_message_id(&self, channel_id: ChannelId) -> Option<u64> {
 388        self.channel_states.get(&channel_id).and_then(|state| {
 389            if let Some(last_message_id) = state.latest_chat_message {
 390                if state
 391                    .last_acknowledged_message_id()
 392                    .is_some_and(|id| id < last_message_id)
 393                {
 394                    return state.last_acknowledged_message_id();
 395                }
 396            }
 397
 398            None
 399        })
 400    }
 401
 402    pub fn acknowledge_message_id(
 403        &mut self,
 404        channel_id: ChannelId,
 405        message_id: u64,
 406        cx: &mut ModelContext<Self>,
 407    ) {
 408        self.channel_states
 409            .entry(channel_id)
 410            .or_insert_with(|| Default::default())
 411            .acknowledge_message_id(message_id);
 412        cx.notify();
 413    }
 414
 415    pub fn update_latest_message_id(
 416        &mut self,
 417        channel_id: ChannelId,
 418        message_id: u64,
 419        cx: &mut ModelContext<Self>,
 420    ) {
 421        self.channel_states
 422            .entry(channel_id)
 423            .or_insert_with(|| Default::default())
 424            .update_latest_message_id(message_id);
 425        cx.notify();
 426    }
 427
 428    pub fn acknowledge_notes_version(
 429        &mut self,
 430        channel_id: ChannelId,
 431        epoch: u64,
 432        version: &clock::Global,
 433        cx: &mut ModelContext<Self>,
 434    ) {
 435        self.channel_states
 436            .entry(channel_id)
 437            .or_insert_with(|| Default::default())
 438            .acknowledge_notes_version(epoch, version);
 439        cx.notify()
 440    }
 441
 442    pub fn update_latest_notes_version(
 443        &mut self,
 444        channel_id: ChannelId,
 445        epoch: u64,
 446        version: &clock::Global,
 447        cx: &mut ModelContext<Self>,
 448    ) {
 449        self.channel_states
 450            .entry(channel_id)
 451            .or_insert_with(|| Default::default())
 452            .update_latest_notes_version(epoch, version);
 453        cx.notify()
 454    }
 455
 456    pub fn open_channel_chat(
 457        &mut self,
 458        channel_id: ChannelId,
 459        cx: &mut ModelContext<Self>,
 460    ) -> Task<Result<Model<ChannelChat>>> {
 461        let client = self.client.clone();
 462        let user_store = self.user_store.clone();
 463        let this = cx.handle();
 464        self.open_channel_resource(
 465            channel_id,
 466            |this| &mut this.opened_chats,
 467            |channel, cx| ChannelChat::new(channel, this, user_store, client, cx),
 468            cx,
 469        )
 470    }
 471
 472    /// Asynchronously open a given resource associated with a channel.
 473    ///
 474    /// Make sure that the resource is only opened once, even if this method
 475    /// is called multiple times with the same channel id while the first task
 476    /// is still running.
 477    fn open_channel_resource<T, F, Fut>(
 478        &mut self,
 479        channel_id: ChannelId,
 480        get_map: fn(&mut Self) -> &mut HashMap<ChannelId, OpenedModelHandle<T>>,
 481        load: F,
 482        cx: &mut ModelContext<Self>,
 483    ) -> Task<Result<Model<T>>>
 484    where
 485        F: 'static + FnOnce(Arc<Channel>, AsyncAppContext) -> Fut,
 486        Fut: Future<Output = Result<Model<T>>>,
 487        T: 'static,
 488    {
 489        let task = loop {
 490            match get_map(self).entry(channel_id) {
 491                hash_map::Entry::Occupied(e) => match e.get() {
 492                    OpenedModelHandle::Open(model) => {
 493                        if let Some(model) = model.upgrade() {
 494                            break Task::ready(Ok(model)).shared();
 495                        } else {
 496                            get_map(self).remove(&channel_id);
 497                            continue;
 498                        }
 499                    }
 500                    OpenedModelHandle::Loading(task) => {
 501                        break task.clone();
 502                    }
 503                },
 504                hash_map::Entry::Vacant(e) => {
 505                    let task = cx
 506                        .spawn(move |this, mut cx| async move {
 507                            let channel = this.update(&mut cx, |this, _| {
 508                                this.channel_for_id(channel_id).cloned().ok_or_else(|| {
 509                                    Arc::new(anyhow!("no channel for id: {}", channel_id))
 510                                })
 511                            })??;
 512
 513                            load(channel, cx).await.map_err(Arc::new)
 514                        })
 515                        .shared();
 516
 517                    e.insert(OpenedModelHandle::Loading(task.clone()));
 518                    cx.spawn({
 519                        let task = task.clone();
 520                        move |this, mut cx| async move {
 521                            let result = task.await;
 522                            this.update(&mut cx, |this, _| match result {
 523                                Ok(model) => {
 524                                    get_map(this).insert(
 525                                        channel_id,
 526                                        OpenedModelHandle::Open(model.downgrade()),
 527                                    );
 528                                }
 529                                Err(_) => {
 530                                    get_map(this).remove(&channel_id);
 531                                }
 532                            })
 533                            .ok();
 534                        }
 535                    })
 536                    .detach();
 537                    break task;
 538                }
 539            }
 540        };
 541        cx.background_executor()
 542            .spawn(async move { task.await.map_err(|error| anyhow!("{}", error)) })
 543    }
 544
 545    pub fn is_channel_admin(&self, channel_id: ChannelId) -> bool {
 546        self.channel_role(channel_id) == proto::ChannelRole::Admin
 547    }
 548
 549    pub fn is_root_channel(&self, channel_id: ChannelId) -> bool {
 550        self.channel_index
 551            .by_id()
 552            .get(&channel_id)
 553            .map_or(false, |channel| channel.is_root_channel())
 554    }
 555
 556    pub fn is_public_channel(&self, channel_id: ChannelId) -> bool {
 557        self.channel_index
 558            .by_id()
 559            .get(&channel_id)
 560            .map_or(false, |channel| {
 561                channel.visibility == ChannelVisibility::Public
 562            })
 563    }
 564
 565    pub fn channel_capability(&self, channel_id: ChannelId) -> Capability {
 566        match self.channel_role(channel_id) {
 567            ChannelRole::Admin | ChannelRole::Member => Capability::ReadWrite,
 568            _ => Capability::ReadOnly,
 569        }
 570    }
 571
 572    pub fn channel_role(&self, channel_id: ChannelId) -> proto::ChannelRole {
 573        maybe!({
 574            let mut channel = self.channel_for_id(channel_id)?;
 575            if !channel.is_root_channel() {
 576                channel = self.channel_for_id(channel.root_id())?;
 577            }
 578            let root_channel_state = self.channel_states.get(&channel.id);
 579            root_channel_state?.role
 580        })
 581        .unwrap_or(proto::ChannelRole::Guest)
 582    }
 583
 584    pub fn channel_participants(&self, channel_id: ChannelId) -> &[Arc<User>] {
 585        self.channel_participants
 586            .get(&channel_id)
 587            .map_or(&[], |v| v.as_slice())
 588    }
 589
 590    pub fn create_channel(
 591        &self,
 592        name: &str,
 593        parent_id: Option<ChannelId>,
 594        cx: &mut ModelContext<Self>,
 595    ) -> Task<Result<ChannelId>> {
 596        let client = self.client.clone();
 597        let name = name.trim_start_matches('#').to_owned();
 598        cx.spawn(move |this, mut cx| async move {
 599            let response = client
 600                .request(proto::CreateChannel {
 601                    name,
 602                    parent_id: parent_id.map(|cid| cid.0),
 603                })
 604                .await?;
 605
 606            let channel = response
 607                .channel
 608                .ok_or_else(|| anyhow!("missing channel in response"))?;
 609            let channel_id = ChannelId(channel.id);
 610
 611            this.update(&mut cx, |this, cx| {
 612                let task = this.update_channels(
 613                    proto::UpdateChannels {
 614                        channels: vec![channel],
 615                        ..Default::default()
 616                    },
 617                    cx,
 618                );
 619                assert!(task.is_none());
 620
 621                // This event is emitted because the collab panel wants to clear the pending edit state
 622                // before this frame is rendered. But we can't guarantee that the collab panel's future
 623                // will resolve before this flush_effects finishes. Synchronously emitting this event
 624                // ensures that the collab panel will observe this creation before the frame completes
 625                cx.emit(ChannelEvent::ChannelCreated(channel_id));
 626            })?;
 627
 628            Ok(channel_id)
 629        })
 630    }
 631
 632    pub fn move_channel(
 633        &mut self,
 634        channel_id: ChannelId,
 635        to: ChannelId,
 636        cx: &mut ModelContext<Self>,
 637    ) -> Task<Result<()>> {
 638        let client = self.client.clone();
 639        cx.spawn(move |_, _| async move {
 640            let _ = client
 641                .request(proto::MoveChannel {
 642                    channel_id: channel_id.0,
 643                    to: to.0,
 644                })
 645                .await?;
 646
 647            Ok(())
 648        })
 649    }
 650
 651    pub fn set_channel_visibility(
 652        &mut self,
 653        channel_id: ChannelId,
 654        visibility: ChannelVisibility,
 655        cx: &mut ModelContext<Self>,
 656    ) -> Task<Result<()>> {
 657        let client = self.client.clone();
 658        cx.spawn(move |_, _| async move {
 659            let _ = client
 660                .request(proto::SetChannelVisibility {
 661                    channel_id: channel_id.0,
 662                    visibility: visibility.into(),
 663                })
 664                .await?;
 665
 666            Ok(())
 667        })
 668    }
 669
 670    pub fn invite_member(
 671        &mut self,
 672        channel_id: ChannelId,
 673        user_id: UserId,
 674        role: proto::ChannelRole,
 675        cx: &mut ModelContext<Self>,
 676    ) -> Task<Result<()>> {
 677        if !self.outgoing_invites.insert((channel_id, user_id)) {
 678            return Task::ready(Err(anyhow!("invite request already in progress")));
 679        }
 680
 681        cx.notify();
 682        let client = self.client.clone();
 683        cx.spawn(move |this, mut cx| async move {
 684            let result = client
 685                .request(proto::InviteChannelMember {
 686                    channel_id: channel_id.0,
 687                    user_id,
 688                    role: role.into(),
 689                })
 690                .await;
 691
 692            this.update(&mut cx, |this, cx| {
 693                this.outgoing_invites.remove(&(channel_id, user_id));
 694                cx.notify();
 695            })?;
 696
 697            result?;
 698
 699            Ok(())
 700        })
 701    }
 702
 703    pub fn remove_member(
 704        &mut self,
 705        channel_id: ChannelId,
 706        user_id: u64,
 707        cx: &mut ModelContext<Self>,
 708    ) -> Task<Result<()>> {
 709        if !self.outgoing_invites.insert((channel_id, user_id)) {
 710            return Task::ready(Err(anyhow!("invite request already in progress")));
 711        }
 712
 713        cx.notify();
 714        let client = self.client.clone();
 715        cx.spawn(move |this, mut cx| async move {
 716            let result = client
 717                .request(proto::RemoveChannelMember {
 718                    channel_id: channel_id.0,
 719                    user_id,
 720                })
 721                .await;
 722
 723            this.update(&mut cx, |this, cx| {
 724                this.outgoing_invites.remove(&(channel_id, user_id));
 725                cx.notify();
 726            })?;
 727            result?;
 728            Ok(())
 729        })
 730    }
 731
 732    pub fn set_member_role(
 733        &mut self,
 734        channel_id: ChannelId,
 735        user_id: UserId,
 736        role: proto::ChannelRole,
 737        cx: &mut ModelContext<Self>,
 738    ) -> Task<Result<()>> {
 739        if !self.outgoing_invites.insert((channel_id, user_id)) {
 740            return Task::ready(Err(anyhow!("member request already in progress")));
 741        }
 742
 743        cx.notify();
 744        let client = self.client.clone();
 745        cx.spawn(move |this, mut cx| async move {
 746            let result = client
 747                .request(proto::SetChannelMemberRole {
 748                    channel_id: channel_id.0,
 749                    user_id,
 750                    role: role.into(),
 751                })
 752                .await;
 753
 754            this.update(&mut cx, |this, cx| {
 755                this.outgoing_invites.remove(&(channel_id, user_id));
 756                cx.notify();
 757            })?;
 758
 759            result?;
 760            Ok(())
 761        })
 762    }
 763
 764    pub fn rename(
 765        &mut self,
 766        channel_id: ChannelId,
 767        new_name: &str,
 768        cx: &mut ModelContext<Self>,
 769    ) -> Task<Result<()>> {
 770        let client = self.client.clone();
 771        let name = new_name.to_string();
 772        cx.spawn(move |this, mut cx| async move {
 773            let channel = client
 774                .request(proto::RenameChannel {
 775                    channel_id: channel_id.0,
 776                    name,
 777                })
 778                .await?
 779                .channel
 780                .ok_or_else(|| anyhow!("missing channel in response"))?;
 781            this.update(&mut cx, |this, cx| {
 782                let task = this.update_channels(
 783                    proto::UpdateChannels {
 784                        channels: vec![channel],
 785                        ..Default::default()
 786                    },
 787                    cx,
 788                );
 789                assert!(task.is_none());
 790
 791                // This event is emitted because the collab panel wants to clear the pending edit state
 792                // before this frame is rendered. But we can't guarantee that the collab panel's future
 793                // will resolve before this flush_effects finishes. Synchronously emitting this event
 794                // ensures that the collab panel will observe this creation before the frame complete
 795                cx.emit(ChannelEvent::ChannelRenamed(channel_id))
 796            })?;
 797            Ok(())
 798        })
 799    }
 800
 801    pub fn respond_to_channel_invite(
 802        &mut self,
 803        channel_id: ChannelId,
 804        accept: bool,
 805        cx: &mut ModelContext<Self>,
 806    ) -> Task<Result<()>> {
 807        let client = self.client.clone();
 808        cx.background_executor().spawn(async move {
 809            client
 810                .request(proto::RespondToChannelInvite {
 811                    channel_id: channel_id.0,
 812                    accept,
 813                })
 814                .await?;
 815            Ok(())
 816        })
 817    }
 818    pub fn get_channel_member_details(
 819        &self,
 820        channel_id: ChannelId,
 821        cx: &mut ModelContext<Self>,
 822    ) -> Task<Result<Vec<ChannelMembership>>> {
 823        let client = self.client.clone();
 824        let user_store = self.user_store.downgrade();
 825        cx.spawn(move |_, mut cx| async move {
 826            let response = client
 827                .request(proto::GetChannelMembers {
 828                    channel_id: channel_id.0,
 829                })
 830                .await?;
 831
 832            let user_ids = response.members.iter().map(|m| m.user_id).collect();
 833            let user_store = user_store
 834                .upgrade()
 835                .ok_or_else(|| anyhow!("user store dropped"))?;
 836            let users = user_store
 837                .update(&mut cx, |user_store, cx| user_store.get_users(user_ids, cx))?
 838                .await?;
 839
 840            Ok(users
 841                .into_iter()
 842                .zip(response.members)
 843                .map(|(user, member)| ChannelMembership {
 844                    user,
 845                    role: member.role(),
 846                    kind: member.kind(),
 847                })
 848                .collect())
 849        })
 850    }
 851
 852    pub fn remove_channel(&self, channel_id: ChannelId) -> impl Future<Output = Result<()>> {
 853        let client = self.client.clone();
 854        async move {
 855            client
 856                .request(proto::DeleteChannel {
 857                    channel_id: channel_id.0,
 858                })
 859                .await?;
 860            Ok(())
 861        }
 862    }
 863
 864    pub fn has_pending_channel_invite_response(&self, _: &Arc<Channel>) -> bool {
 865        false
 866    }
 867
 868    pub fn has_pending_channel_invite(&self, channel_id: ChannelId, user_id: UserId) -> bool {
 869        self.outgoing_invites.contains(&(channel_id, user_id))
 870    }
 871
 872    async fn handle_update_channels(
 873        this: Model<Self>,
 874        message: TypedEnvelope<proto::UpdateChannels>,
 875        _: Arc<Client>,
 876        mut cx: AsyncAppContext,
 877    ) -> Result<()> {
 878        this.update(&mut cx, |this, _| {
 879            this.update_channels_tx
 880                .unbounded_send(message.payload)
 881                .unwrap();
 882        })?;
 883        Ok(())
 884    }
 885
 886    async fn handle_update_user_channels(
 887        this: Model<Self>,
 888        message: TypedEnvelope<proto::UpdateUserChannels>,
 889        _: Arc<Client>,
 890        mut cx: AsyncAppContext,
 891    ) -> Result<()> {
 892        this.update(&mut cx, |this, cx| {
 893            for buffer_version in message.payload.observed_channel_buffer_version {
 894                let version = language::proto::deserialize_version(&buffer_version.version);
 895                this.acknowledge_notes_version(
 896                    ChannelId(buffer_version.channel_id),
 897                    buffer_version.epoch,
 898                    &version,
 899                    cx,
 900                );
 901            }
 902            for message_id in message.payload.observed_channel_message_id {
 903                this.acknowledge_message_id(
 904                    ChannelId(message_id.channel_id),
 905                    message_id.message_id,
 906                    cx,
 907                );
 908            }
 909            for membership in message.payload.channel_memberships {
 910                if let Some(role) = ChannelRole::from_i32(membership.role) {
 911                    this.channel_states
 912                        .entry(ChannelId(membership.channel_id))
 913                        .or_insert_with(|| ChannelState::default())
 914                        .set_role(role)
 915                }
 916            }
 917        })
 918    }
 919
 920    fn handle_connect(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
 921        self.channel_index.clear();
 922        self.channel_invitations.clear();
 923        self.channel_participants.clear();
 924        self.channel_index.clear();
 925        self.outgoing_invites.clear();
 926        self.disconnect_channel_buffers_task.take();
 927
 928        for chat in self.opened_chats.values() {
 929            if let OpenedModelHandle::Open(chat) = chat {
 930                if let Some(chat) = chat.upgrade() {
 931                    chat.update(cx, |chat, cx| {
 932                        chat.rejoin(cx);
 933                    });
 934                }
 935            }
 936        }
 937
 938        let mut buffer_versions = Vec::new();
 939        for buffer in self.opened_buffers.values() {
 940            if let OpenedModelHandle::Open(buffer) = buffer {
 941                if let Some(buffer) = buffer.upgrade() {
 942                    let channel_buffer = buffer.read(cx);
 943                    let buffer = channel_buffer.buffer().read(cx);
 944                    buffer_versions.push(proto::ChannelBufferVersion {
 945                        channel_id: channel_buffer.channel_id.0,
 946                        epoch: channel_buffer.epoch(),
 947                        version: language::proto::serialize_version(&buffer.version()),
 948                    });
 949                }
 950            }
 951        }
 952
 953        if buffer_versions.is_empty() {
 954            return Task::ready(Ok(()));
 955        }
 956
 957        let response = self.client.request(proto::RejoinChannelBuffers {
 958            buffers: buffer_versions,
 959        });
 960
 961        cx.spawn(|this, mut cx| async move {
 962            let mut response = response.await?;
 963
 964            this.update(&mut cx, |this, cx| {
 965                this.opened_buffers.retain(|_, buffer| match buffer {
 966                    OpenedModelHandle::Open(channel_buffer) => {
 967                        let Some(channel_buffer) = channel_buffer.upgrade() else {
 968                            return false;
 969                        };
 970
 971                        channel_buffer.update(cx, |channel_buffer, cx| {
 972                            let channel_id = channel_buffer.channel_id;
 973                            if let Some(remote_buffer) = response
 974                                .buffers
 975                                .iter_mut()
 976                                .find(|buffer| buffer.channel_id == channel_id.0)
 977                            {
 978                                let channel_id = channel_buffer.channel_id;
 979                                let remote_version =
 980                                    language::proto::deserialize_version(&remote_buffer.version);
 981
 982                                channel_buffer.replace_collaborators(
 983                                    mem::take(&mut remote_buffer.collaborators),
 984                                    cx,
 985                                );
 986
 987                                let operations = channel_buffer
 988                                    .buffer()
 989                                    .update(cx, |buffer, cx| {
 990                                        let outgoing_operations =
 991                                            buffer.serialize_ops(Some(remote_version), cx);
 992                                        let incoming_operations =
 993                                            mem::take(&mut remote_buffer.operations)
 994                                                .into_iter()
 995                                                .map(language::proto::deserialize_operation)
 996                                                .collect::<Result<Vec<_>>>()?;
 997                                        buffer.apply_ops(incoming_operations, cx)?;
 998                                        anyhow::Ok(outgoing_operations)
 999                                    })
1000                                    .log_err();
1001
1002                                if let Some(operations) = operations {
1003                                    let client = this.client.clone();
1004                                    cx.background_executor()
1005                                        .spawn(async move {
1006                                            let operations = operations.await;
1007                                            for chunk in
1008                                                language::proto::split_operations(operations)
1009                                            {
1010                                                client
1011                                                    .send(proto::UpdateChannelBuffer {
1012                                                        channel_id: channel_id.0,
1013                                                        operations: chunk,
1014                                                    })
1015                                                    .ok();
1016                                            }
1017                                        })
1018                                        .detach();
1019                                    return true;
1020                                }
1021                            }
1022
1023                            channel_buffer.disconnect(cx);
1024                            false
1025                        })
1026                    }
1027                    OpenedModelHandle::Loading(_) => true,
1028                });
1029            })
1030            .ok();
1031            anyhow::Ok(())
1032        })
1033    }
1034
1035    fn handle_disconnect(&mut self, wait_for_reconnect: bool, cx: &mut ModelContext<Self>) {
1036        cx.notify();
1037
1038        self.disconnect_channel_buffers_task.get_or_insert_with(|| {
1039            cx.spawn(move |this, mut cx| async move {
1040                if wait_for_reconnect {
1041                    cx.background_executor().timer(RECONNECT_TIMEOUT).await;
1042                }
1043
1044                if let Some(this) = this.upgrade() {
1045                    this.update(&mut cx, |this, cx| {
1046                        for (_, buffer) in this.opened_buffers.drain() {
1047                            if let OpenedModelHandle::Open(buffer) = buffer {
1048                                if let Some(buffer) = buffer.upgrade() {
1049                                    buffer.update(cx, |buffer, cx| buffer.disconnect(cx));
1050                                }
1051                            }
1052                        }
1053                    })
1054                    .ok();
1055                }
1056            })
1057        });
1058    }
1059
1060    pub(crate) fn update_channels(
1061        &mut self,
1062        payload: proto::UpdateChannels,
1063        cx: &mut ModelContext<ChannelStore>,
1064    ) -> Option<Task<Result<()>>> {
1065        if !payload.remove_channel_invitations.is_empty() {
1066            self.channel_invitations
1067                .retain(|channel| !payload.remove_channel_invitations.contains(&channel.id.0));
1068        }
1069        for channel in payload.channel_invitations {
1070            match self
1071                .channel_invitations
1072                .binary_search_by_key(&channel.id, |c| c.id.0)
1073            {
1074                Ok(ix) => {
1075                    Arc::make_mut(&mut self.channel_invitations[ix]).name = channel.name.into()
1076                }
1077                Err(ix) => self.channel_invitations.insert(
1078                    ix,
1079                    Arc::new(Channel {
1080                        id: ChannelId(channel.id),
1081                        visibility: channel.visibility(),
1082                        name: channel.name.into(),
1083                        parent_path: channel
1084                            .parent_path
1085                            .into_iter()
1086                            .map(|cid| ChannelId(cid))
1087                            .collect(),
1088                    }),
1089                ),
1090            }
1091        }
1092
1093        let channels_changed = !payload.channels.is_empty()
1094            || !payload.delete_channels.is_empty()
1095            || !payload.latest_channel_message_ids.is_empty()
1096            || !payload.latest_channel_buffer_versions.is_empty()
1097            || !payload.hosted_projects.is_empty()
1098            || !payload.deleted_hosted_projects.is_empty();
1099
1100        if channels_changed {
1101            if !payload.delete_channels.is_empty() {
1102                let delete_channels: Vec<ChannelId> = payload
1103                    .delete_channels
1104                    .into_iter()
1105                    .map(|cid| ChannelId(cid))
1106                    .collect();
1107                self.channel_index.delete_channels(&delete_channels);
1108                self.channel_participants
1109                    .retain(|channel_id, _| !delete_channels.contains(&channel_id));
1110
1111                for channel_id in &delete_channels {
1112                    let channel_id = *channel_id;
1113                    if payload
1114                        .channels
1115                        .iter()
1116                        .any(|channel| channel.id == channel_id.0)
1117                    {
1118                        continue;
1119                    }
1120                    if let Some(OpenedModelHandle::Open(buffer)) =
1121                        self.opened_buffers.remove(&channel_id)
1122                    {
1123                        if let Some(buffer) = buffer.upgrade() {
1124                            buffer.update(cx, ChannelBuffer::disconnect);
1125                        }
1126                    }
1127                }
1128            }
1129
1130            let mut index = self.channel_index.bulk_insert();
1131            for channel in payload.channels {
1132                let id = ChannelId(channel.id);
1133                let channel_changed = index.insert(channel);
1134
1135                if channel_changed {
1136                    if let Some(OpenedModelHandle::Open(buffer)) = self.opened_buffers.get(&id) {
1137                        if let Some(buffer) = buffer.upgrade() {
1138                            buffer.update(cx, ChannelBuffer::channel_changed);
1139                        }
1140                    }
1141                }
1142            }
1143
1144            for latest_buffer_version in payload.latest_channel_buffer_versions {
1145                let version = language::proto::deserialize_version(&latest_buffer_version.version);
1146                self.channel_states
1147                    .entry(ChannelId(latest_buffer_version.channel_id))
1148                    .or_default()
1149                    .update_latest_notes_version(latest_buffer_version.epoch, &version)
1150            }
1151
1152            for latest_channel_message in payload.latest_channel_message_ids {
1153                self.channel_states
1154                    .entry(ChannelId(latest_channel_message.channel_id))
1155                    .or_default()
1156                    .update_latest_message_id(latest_channel_message.message_id);
1157            }
1158
1159            for hosted_project in payload.hosted_projects {
1160                let hosted_project: HostedProject = hosted_project.into();
1161                if let Some(old_project) = self
1162                    .hosted_projects
1163                    .insert(hosted_project.project_id, hosted_project.clone())
1164                {
1165                    self.channel_states
1166                        .entry(old_project.channel_id)
1167                        .or_default()
1168                        .remove_hosted_project(old_project.project_id);
1169                }
1170                self.channel_states
1171                    .entry(hosted_project.channel_id)
1172                    .or_default()
1173                    .add_hosted_project(hosted_project.project_id);
1174            }
1175
1176            for hosted_project_id in payload.deleted_hosted_projects {
1177                let hosted_project_id = ProjectId(hosted_project_id);
1178
1179                if let Some(old_project) = self.hosted_projects.remove(&hosted_project_id) {
1180                    self.channel_states
1181                        .entry(old_project.channel_id)
1182                        .or_default()
1183                        .remove_hosted_project(old_project.project_id);
1184                }
1185            }
1186        }
1187
1188        cx.notify();
1189        if payload.channel_participants.is_empty() {
1190            return None;
1191        }
1192
1193        let mut all_user_ids = Vec::new();
1194        let channel_participants = payload.channel_participants;
1195        for entry in &channel_participants {
1196            for user_id in entry.participant_user_ids.iter() {
1197                if let Err(ix) = all_user_ids.binary_search(user_id) {
1198                    all_user_ids.insert(ix, *user_id);
1199                }
1200            }
1201        }
1202
1203        let users = self
1204            .user_store
1205            .update(cx, |user_store, cx| user_store.get_users(all_user_ids, cx));
1206        Some(cx.spawn(|this, mut cx| async move {
1207            let users = users.await?;
1208
1209            this.update(&mut cx, |this, cx| {
1210                for entry in &channel_participants {
1211                    let mut participants: Vec<_> = entry
1212                        .participant_user_ids
1213                        .iter()
1214                        .filter_map(|user_id| {
1215                            users
1216                                .binary_search_by_key(&user_id, |user| &user.id)
1217                                .ok()
1218                                .map(|ix| users[ix].clone())
1219                        })
1220                        .collect();
1221
1222                    participants.sort_by_key(|u| u.id);
1223
1224                    this.channel_participants
1225                        .insert(ChannelId(entry.channel_id), participants);
1226                }
1227
1228                cx.notify();
1229            })
1230        }))
1231    }
1232}
1233
1234impl ChannelState {
1235    fn set_role(&mut self, role: ChannelRole) {
1236        self.role = Some(role);
1237    }
1238
1239    fn has_channel_buffer_changed(&self) -> bool {
1240        self.latest_notes_version.epoch > self.observed_notes_version.epoch
1241            || (self.latest_notes_version.epoch == self.observed_notes_version.epoch
1242                && self
1243                    .latest_notes_version
1244                    .version
1245                    .changed_since(&self.observed_notes_version.version))
1246    }
1247
1248    fn has_new_messages(&self) -> bool {
1249        let latest_message_id = self.latest_chat_message;
1250        let observed_message_id = self.observed_chat_message;
1251
1252        latest_message_id.is_some_and(|latest_message_id| {
1253            latest_message_id > observed_message_id.unwrap_or_default()
1254        })
1255    }
1256
1257    fn last_acknowledged_message_id(&self) -> Option<u64> {
1258        self.observed_chat_message
1259    }
1260
1261    fn acknowledge_message_id(&mut self, message_id: u64) {
1262        let observed = self.observed_chat_message.get_or_insert(message_id);
1263        *observed = (*observed).max(message_id);
1264    }
1265
1266    fn update_latest_message_id(&mut self, message_id: u64) {
1267        self.latest_chat_message =
1268            Some(message_id.max(self.latest_chat_message.unwrap_or_default()));
1269    }
1270
1271    fn acknowledge_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1272        if self.observed_notes_version.epoch == epoch {
1273            self.observed_notes_version.version.join(version);
1274        } else {
1275            self.observed_notes_version = NotesVersion {
1276                epoch,
1277                version: version.clone(),
1278            };
1279        }
1280    }
1281
1282    fn update_latest_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1283        if self.latest_notes_version.epoch == epoch {
1284            self.latest_notes_version.version.join(version);
1285        } else {
1286            self.latest_notes_version = NotesVersion {
1287                epoch,
1288                version: version.clone(),
1289            };
1290        }
1291    }
1292
1293    fn add_hosted_project(&mut self, project_id: ProjectId) {
1294        self.projects.insert(project_id);
1295    }
1296
1297    fn remove_hosted_project(&mut self, project_id: ProjectId) {
1298        self.projects.remove(&project_id);
1299    }
1300}