channel_store.rs

   1mod channel_index;
   2
   3use crate::{channel_buffer::ChannelBuffer, channel_chat::ChannelChat, ChannelMessage};
   4use anyhow::{anyhow, Result};
   5use channel_index::ChannelIndex;
   6use client::{
   7    ChannelId, Client, ClientSettings, HostedProjectId, Subscription, User, UserId, UserStore,
   8};
   9use collections::{hash_map, HashMap, HashSet};
  10use futures::{channel::mpsc, future::Shared, Future, FutureExt, StreamExt};
  11use gpui::{
  12    AppContext, AsyncAppContext, Context, EventEmitter, Global, Model, ModelContext, SharedString,
  13    Task, WeakModel,
  14};
  15use language::Capability;
  16use rpc::{
  17    proto::{self, ChannelRole, ChannelVisibility},
  18    TypedEnvelope,
  19};
  20use settings::Settings;
  21use std::{mem, sync::Arc, time::Duration};
  22use util::{async_maybe, maybe, ResultExt};
  23
  24pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
  25
  26pub fn init(client: &Arc<Client>, user_store: Model<UserStore>, cx: &mut AppContext) {
  27    let channel_store =
  28        cx.new_model(|cx| ChannelStore::new(client.clone(), user_store.clone(), cx));
  29    cx.set_global(GlobalChannelStore(channel_store));
  30}
  31
  32#[derive(Debug, Clone, Default)]
  33struct NotesVersion {
  34    epoch: u64,
  35    version: clock::Global,
  36}
  37
  38#[derive(Debug, Clone)]
  39pub struct HostedProject {
  40    id: HostedProjectId,
  41    channel_id: ChannelId,
  42    name: SharedString,
  43    _visibility: proto::ChannelVisibility,
  44}
  45
  46impl From<proto::HostedProject> for HostedProject {
  47    fn from(project: proto::HostedProject) -> Self {
  48        Self {
  49            id: HostedProjectId(project.id),
  50            channel_id: ChannelId(project.channel_id),
  51            _visibility: project.visibility(),
  52            name: project.name.into(),
  53        }
  54    }
  55}
  56
  57pub struct ChannelStore {
  58    pub channel_index: ChannelIndex,
  59    channel_invitations: Vec<Arc<Channel>>,
  60    channel_participants: HashMap<ChannelId, Vec<Arc<User>>>,
  61    channel_states: HashMap<ChannelId, ChannelState>,
  62    hosted_projects: HashMap<HostedProjectId, HostedProject>,
  63
  64    outgoing_invites: HashSet<(ChannelId, UserId)>,
  65    update_channels_tx: mpsc::UnboundedSender<proto::UpdateChannels>,
  66    opened_buffers: HashMap<ChannelId, OpenedModelHandle<ChannelBuffer>>,
  67    opened_chats: HashMap<ChannelId, OpenedModelHandle<ChannelChat>>,
  68    client: Arc<Client>,
  69    user_store: Model<UserStore>,
  70    _rpc_subscriptions: [Subscription; 2],
  71    _watch_connection_status: Task<Option<()>>,
  72    disconnect_channel_buffers_task: Option<Task<()>>,
  73    _update_channels: Task<()>,
  74}
  75
  76#[derive(Clone, Debug)]
  77pub struct Channel {
  78    pub id: ChannelId,
  79    pub name: SharedString,
  80    pub visibility: proto::ChannelVisibility,
  81    pub parent_path: Vec<ChannelId>,
  82}
  83
  84#[derive(Default)]
  85pub struct ChannelState {
  86    latest_chat_message: Option<u64>,
  87    latest_notes_versions: Option<NotesVersion>,
  88    observed_chat_message: Option<u64>,
  89    observed_notes_versions: Option<NotesVersion>,
  90    role: Option<ChannelRole>,
  91    projects: HashSet<HostedProjectId>,
  92}
  93
  94impl Channel {
  95    pub fn link(&self, cx: &AppContext) -> String {
  96        format!(
  97            "{}/channel/{}-{}",
  98            ClientSettings::get_global(cx).server_url,
  99            Self::slug(&self.name),
 100            self.id
 101        )
 102    }
 103
 104    pub fn notes_link(&self, heading: Option<String>, cx: &AppContext) -> String {
 105        self.link(cx)
 106            + "/notes"
 107            + &heading
 108                .map(|h| format!("#{}", Self::slug(&h)))
 109                .unwrap_or_default()
 110    }
 111
 112    pub fn is_root_channel(&self) -> bool {
 113        self.parent_path.is_empty()
 114    }
 115
 116    pub fn root_id(&self) -> ChannelId {
 117        self.parent_path.first().copied().unwrap_or(self.id)
 118    }
 119
 120    pub fn slug(str: &str) -> String {
 121        let slug: String = str
 122            .chars()
 123            .map(|c| if c.is_alphanumeric() { c } else { '-' })
 124            .collect();
 125
 126        slug.trim_matches(|c| c == '-').to_string()
 127    }
 128}
 129
 130pub struct ChannelMembership {
 131    pub user: Arc<User>,
 132    pub kind: proto::channel_member::Kind,
 133    pub role: proto::ChannelRole,
 134}
 135impl ChannelMembership {
 136    pub fn sort_key(&self) -> MembershipSortKey {
 137        MembershipSortKey {
 138            role_order: match self.role {
 139                proto::ChannelRole::Admin => 0,
 140                proto::ChannelRole::Member => 1,
 141                proto::ChannelRole::Banned => 2,
 142                proto::ChannelRole::Talker => 3,
 143                proto::ChannelRole::Guest => 4,
 144            },
 145            kind_order: match self.kind {
 146                proto::channel_member::Kind::Member => 0,
 147                proto::channel_member::Kind::Invitee => 1,
 148            },
 149            username_order: self.user.github_login.as_str(),
 150        }
 151    }
 152}
 153
 154#[derive(PartialOrd, Ord, PartialEq, Eq)]
 155pub struct MembershipSortKey<'a> {
 156    role_order: u8,
 157    kind_order: u8,
 158    username_order: &'a str,
 159}
 160
 161pub enum ChannelEvent {
 162    ChannelCreated(ChannelId),
 163    ChannelRenamed(ChannelId),
 164}
 165
 166impl EventEmitter<ChannelEvent> for ChannelStore {}
 167
 168enum OpenedModelHandle<E> {
 169    Open(WeakModel<E>),
 170    Loading(Shared<Task<Result<Model<E>, Arc<anyhow::Error>>>>),
 171}
 172
 173struct GlobalChannelStore(Model<ChannelStore>);
 174
 175impl Global for GlobalChannelStore {}
 176
 177impl ChannelStore {
 178    pub fn global(cx: &AppContext) -> Model<Self> {
 179        cx.global::<GlobalChannelStore>().0.clone()
 180    }
 181
 182    pub fn new(
 183        client: Arc<Client>,
 184        user_store: Model<UserStore>,
 185        cx: &mut ModelContext<Self>,
 186    ) -> Self {
 187        let rpc_subscriptions = [
 188            client.add_message_handler(cx.weak_model(), Self::handle_update_channels),
 189            client.add_message_handler(cx.weak_model(), Self::handle_update_user_channels),
 190        ];
 191
 192        let mut connection_status = client.status();
 193        let (update_channels_tx, mut update_channels_rx) = mpsc::unbounded();
 194        let watch_connection_status = cx.spawn(|this, mut cx| async move {
 195            while let Some(status) = connection_status.next().await {
 196                let this = this.upgrade()?;
 197                match status {
 198                    client::Status::Connected { .. } => {
 199                        this.update(&mut cx, |this, cx| this.handle_connect(cx))
 200                            .ok()?
 201                            .await
 202                            .log_err()?;
 203                    }
 204                    client::Status::SignedOut | client::Status::UpgradeRequired => {
 205                        this.update(&mut cx, |this, cx| this.handle_disconnect(false, cx))
 206                            .ok();
 207                    }
 208                    _ => {
 209                        this.update(&mut cx, |this, cx| this.handle_disconnect(true, cx))
 210                            .ok();
 211                    }
 212                }
 213            }
 214            Some(())
 215        });
 216
 217        Self {
 218            channel_invitations: Vec::default(),
 219            channel_index: ChannelIndex::default(),
 220            channel_participants: Default::default(),
 221            hosted_projects: Default::default(),
 222            outgoing_invites: Default::default(),
 223            opened_buffers: Default::default(),
 224            opened_chats: Default::default(),
 225            update_channels_tx,
 226            client,
 227            user_store,
 228            _rpc_subscriptions: rpc_subscriptions,
 229            _watch_connection_status: watch_connection_status,
 230            disconnect_channel_buffers_task: None,
 231            _update_channels: cx.spawn(|this, mut cx| async move {
 232                async_maybe!({
 233                    while let Some(update_channels) = update_channels_rx.next().await {
 234                        if let Some(this) = this.upgrade() {
 235                            let update_task = this.update(&mut cx, |this, cx| {
 236                                this.update_channels(update_channels, cx)
 237                            })?;
 238                            if let Some(update_task) = update_task {
 239                                update_task.await.log_err();
 240                            }
 241                        }
 242                    }
 243                    anyhow::Ok(())
 244                })
 245                .await
 246                .log_err();
 247            }),
 248            channel_states: Default::default(),
 249        }
 250    }
 251
 252    pub fn client(&self) -> Arc<Client> {
 253        self.client.clone()
 254    }
 255
 256    /// Returns the number of unique channels in the store
 257    pub fn channel_count(&self) -> usize {
 258        self.channel_index.by_id().len()
 259    }
 260
 261    /// Returns the index of a channel ID in the list of unique channels
 262    pub fn index_of_channel(&self, channel_id: ChannelId) -> Option<usize> {
 263        self.channel_index
 264            .by_id()
 265            .keys()
 266            .position(|id| *id == channel_id)
 267    }
 268
 269    /// Returns an iterator over all unique channels
 270    pub fn channels(&self) -> impl '_ + Iterator<Item = &Arc<Channel>> {
 271        self.channel_index.by_id().values()
 272    }
 273
 274    /// Iterate over all entries in the channel DAG
 275    pub fn ordered_channels(&self) -> impl '_ + Iterator<Item = (usize, &Arc<Channel>)> {
 276        self.channel_index
 277            .ordered_channels()
 278            .iter()
 279            .filter_map(move |id| {
 280                let channel = self.channel_index.by_id().get(id)?;
 281                Some((channel.parent_path.len(), channel))
 282            })
 283    }
 284
 285    pub fn channel_at_index(&self, ix: usize) -> Option<&Arc<Channel>> {
 286        let channel_id = self.channel_index.ordered_channels().get(ix)?;
 287        self.channel_index.by_id().get(channel_id)
 288    }
 289
 290    pub fn channel_at(&self, ix: usize) -> Option<&Arc<Channel>> {
 291        self.channel_index.by_id().values().nth(ix)
 292    }
 293
 294    pub fn has_channel_invitation(&self, channel_id: ChannelId) -> bool {
 295        self.channel_invitations
 296            .iter()
 297            .any(|channel| channel.id == channel_id)
 298    }
 299
 300    pub fn channel_invitations(&self) -> &[Arc<Channel>] {
 301        &self.channel_invitations
 302    }
 303
 304    pub fn channel_for_id(&self, channel_id: ChannelId) -> Option<&Arc<Channel>> {
 305        self.channel_index.by_id().get(&channel_id)
 306    }
 307
 308    pub fn projects_for_id(&self, channel_id: ChannelId) -> Vec<(SharedString, HostedProjectId)> {
 309        let mut projects: Vec<(SharedString, HostedProjectId)> = self
 310            .channel_states
 311            .get(&channel_id)
 312            .map(|state| state.projects.clone())
 313            .unwrap_or_default()
 314            .into_iter()
 315            .flat_map(|id| Some((self.hosted_projects.get(&id)?.name.clone(), id)))
 316            .collect();
 317        projects.sort();
 318        projects
 319    }
 320
 321    pub fn has_open_channel_buffer(&self, channel_id: ChannelId, _cx: &AppContext) -> bool {
 322        if let Some(buffer) = self.opened_buffers.get(&channel_id) {
 323            if let OpenedModelHandle::Open(buffer) = buffer {
 324                return buffer.upgrade().is_some();
 325            }
 326        }
 327        false
 328    }
 329
 330    pub fn open_channel_buffer(
 331        &mut self,
 332        channel_id: ChannelId,
 333        cx: &mut ModelContext<Self>,
 334    ) -> Task<Result<Model<ChannelBuffer>>> {
 335        let client = self.client.clone();
 336        let user_store = self.user_store.clone();
 337        let channel_store = cx.handle();
 338        self.open_channel_resource(
 339            channel_id,
 340            |this| &mut this.opened_buffers,
 341            |channel, cx| ChannelBuffer::new(channel, client, user_store, channel_store, cx),
 342            cx,
 343        )
 344    }
 345
 346    pub fn fetch_channel_messages(
 347        &self,
 348        message_ids: Vec<u64>,
 349        cx: &mut ModelContext<Self>,
 350    ) -> Task<Result<Vec<ChannelMessage>>> {
 351        let request = if message_ids.is_empty() {
 352            None
 353        } else {
 354            Some(
 355                self.client
 356                    .request(proto::GetChannelMessagesById { message_ids }),
 357            )
 358        };
 359        cx.spawn(|this, mut cx| async move {
 360            if let Some(request) = request {
 361                let response = request.await?;
 362                let this = this
 363                    .upgrade()
 364                    .ok_or_else(|| anyhow!("channel store dropped"))?;
 365                let user_store = this.update(&mut cx, |this, _| this.user_store.clone())?;
 366                ChannelMessage::from_proto_vec(response.messages, &user_store, &mut cx).await
 367            } else {
 368                Ok(Vec::new())
 369            }
 370        })
 371    }
 372
 373    pub fn has_channel_buffer_changed(&self, channel_id: ChannelId) -> bool {
 374        self.channel_states
 375            .get(&channel_id)
 376            .is_some_and(|state| state.has_channel_buffer_changed())
 377    }
 378
 379    pub fn has_new_messages(&self, channel_id: ChannelId) -> bool {
 380        self.channel_states
 381            .get(&channel_id)
 382            .is_some_and(|state| state.has_new_messages())
 383    }
 384
 385    pub fn last_acknowledge_message_id(&self, channel_id: ChannelId) -> Option<u64> {
 386        self.channel_states.get(&channel_id).and_then(|state| {
 387            if let Some(last_message_id) = state.latest_chat_message {
 388                if state
 389                    .last_acknowledged_message_id()
 390                    .is_some_and(|id| id < last_message_id)
 391                {
 392                    return state.last_acknowledged_message_id();
 393                }
 394            }
 395
 396            None
 397        })
 398    }
 399
 400    pub fn acknowledge_message_id(
 401        &mut self,
 402        channel_id: ChannelId,
 403        message_id: u64,
 404        cx: &mut ModelContext<Self>,
 405    ) {
 406        self.channel_states
 407            .entry(channel_id)
 408            .or_insert_with(|| Default::default())
 409            .acknowledge_message_id(message_id);
 410        cx.notify();
 411    }
 412
 413    pub fn update_latest_message_id(
 414        &mut self,
 415        channel_id: ChannelId,
 416        message_id: u64,
 417        cx: &mut ModelContext<Self>,
 418    ) {
 419        self.channel_states
 420            .entry(channel_id)
 421            .or_insert_with(|| Default::default())
 422            .update_latest_message_id(message_id);
 423        cx.notify();
 424    }
 425
 426    pub fn acknowledge_notes_version(
 427        &mut self,
 428        channel_id: ChannelId,
 429        epoch: u64,
 430        version: &clock::Global,
 431        cx: &mut ModelContext<Self>,
 432    ) {
 433        self.channel_states
 434            .entry(channel_id)
 435            .or_insert_with(|| Default::default())
 436            .acknowledge_notes_version(epoch, version);
 437        cx.notify()
 438    }
 439
 440    pub fn update_latest_notes_version(
 441        &mut self,
 442        channel_id: ChannelId,
 443        epoch: u64,
 444        version: &clock::Global,
 445        cx: &mut ModelContext<Self>,
 446    ) {
 447        self.channel_states
 448            .entry(channel_id)
 449            .or_insert_with(|| Default::default())
 450            .update_latest_notes_version(epoch, version);
 451        cx.notify()
 452    }
 453
 454    pub fn open_channel_chat(
 455        &mut self,
 456        channel_id: ChannelId,
 457        cx: &mut ModelContext<Self>,
 458    ) -> Task<Result<Model<ChannelChat>>> {
 459        let client = self.client.clone();
 460        let user_store = self.user_store.clone();
 461        let this = cx.handle();
 462        self.open_channel_resource(
 463            channel_id,
 464            |this| &mut this.opened_chats,
 465            |channel, cx| ChannelChat::new(channel, this, user_store, client, cx),
 466            cx,
 467        )
 468    }
 469
 470    /// Asynchronously open a given resource associated with a channel.
 471    ///
 472    /// Make sure that the resource is only opened once, even if this method
 473    /// is called multiple times with the same channel id while the first task
 474    /// is still running.
 475    fn open_channel_resource<T, F, Fut>(
 476        &mut self,
 477        channel_id: ChannelId,
 478        get_map: fn(&mut Self) -> &mut HashMap<ChannelId, OpenedModelHandle<T>>,
 479        load: F,
 480        cx: &mut ModelContext<Self>,
 481    ) -> Task<Result<Model<T>>>
 482    where
 483        F: 'static + FnOnce(Arc<Channel>, AsyncAppContext) -> Fut,
 484        Fut: Future<Output = Result<Model<T>>>,
 485        T: 'static,
 486    {
 487        let task = loop {
 488            match get_map(self).entry(channel_id) {
 489                hash_map::Entry::Occupied(e) => match e.get() {
 490                    OpenedModelHandle::Open(model) => {
 491                        if let Some(model) = model.upgrade() {
 492                            break Task::ready(Ok(model)).shared();
 493                        } else {
 494                            get_map(self).remove(&channel_id);
 495                            continue;
 496                        }
 497                    }
 498                    OpenedModelHandle::Loading(task) => {
 499                        break task.clone();
 500                    }
 501                },
 502                hash_map::Entry::Vacant(e) => {
 503                    let task = cx
 504                        .spawn(move |this, mut cx| async move {
 505                            let channel = this.update(&mut cx, |this, _| {
 506                                this.channel_for_id(channel_id).cloned().ok_or_else(|| {
 507                                    Arc::new(anyhow!("no channel for id: {}", channel_id))
 508                                })
 509                            })??;
 510
 511                            load(channel, cx).await.map_err(Arc::new)
 512                        })
 513                        .shared();
 514
 515                    e.insert(OpenedModelHandle::Loading(task.clone()));
 516                    cx.spawn({
 517                        let task = task.clone();
 518                        move |this, mut cx| async move {
 519                            let result = task.await;
 520                            this.update(&mut cx, |this, _| match result {
 521                                Ok(model) => {
 522                                    get_map(this).insert(
 523                                        channel_id,
 524                                        OpenedModelHandle::Open(model.downgrade()),
 525                                    );
 526                                }
 527                                Err(_) => {
 528                                    get_map(this).remove(&channel_id);
 529                                }
 530                            })
 531                            .ok();
 532                        }
 533                    })
 534                    .detach();
 535                    break task;
 536                }
 537            }
 538        };
 539        cx.background_executor()
 540            .spawn(async move { task.await.map_err(|error| anyhow!("{}", error)) })
 541    }
 542
 543    pub fn is_channel_admin(&self, channel_id: ChannelId) -> bool {
 544        self.channel_role(channel_id) == proto::ChannelRole::Admin
 545    }
 546
 547    pub fn is_root_channel(&self, channel_id: ChannelId) -> bool {
 548        self.channel_index
 549            .by_id()
 550            .get(&channel_id)
 551            .map_or(false, |channel| channel.is_root_channel())
 552    }
 553
 554    pub fn is_public_channel(&self, channel_id: ChannelId) -> bool {
 555        self.channel_index
 556            .by_id()
 557            .get(&channel_id)
 558            .map_or(false, |channel| {
 559                channel.visibility == ChannelVisibility::Public
 560            })
 561    }
 562
 563    pub fn channel_capability(&self, channel_id: ChannelId) -> Capability {
 564        match self.channel_role(channel_id) {
 565            ChannelRole::Admin | ChannelRole::Member => Capability::ReadWrite,
 566            _ => Capability::ReadOnly,
 567        }
 568    }
 569
 570    pub fn channel_role(&self, channel_id: ChannelId) -> proto::ChannelRole {
 571        maybe!({
 572            let mut channel = self.channel_for_id(channel_id)?;
 573            if !channel.is_root_channel() {
 574                channel = self.channel_for_id(channel.root_id())?;
 575            }
 576            let root_channel_state = self.channel_states.get(&channel.id);
 577            root_channel_state?.role
 578        })
 579        .unwrap_or(proto::ChannelRole::Guest)
 580    }
 581
 582    pub fn channel_participants(&self, channel_id: ChannelId) -> &[Arc<User>] {
 583        self.channel_participants
 584            .get(&channel_id)
 585            .map_or(&[], |v| v.as_slice())
 586    }
 587
 588    pub fn create_channel(
 589        &self,
 590        name: &str,
 591        parent_id: Option<ChannelId>,
 592        cx: &mut ModelContext<Self>,
 593    ) -> Task<Result<ChannelId>> {
 594        let client = self.client.clone();
 595        let name = name.trim_start_matches('#').to_owned();
 596        cx.spawn(move |this, mut cx| async move {
 597            let response = client
 598                .request(proto::CreateChannel {
 599                    name,
 600                    parent_id: parent_id.map(|cid| cid.0),
 601                })
 602                .await?;
 603
 604            let channel = response
 605                .channel
 606                .ok_or_else(|| anyhow!("missing channel in response"))?;
 607            let channel_id = ChannelId(channel.id);
 608
 609            this.update(&mut cx, |this, cx| {
 610                let task = this.update_channels(
 611                    proto::UpdateChannels {
 612                        channels: vec![channel],
 613                        ..Default::default()
 614                    },
 615                    cx,
 616                );
 617                assert!(task.is_none());
 618
 619                // This event is emitted because the collab panel wants to clear the pending edit state
 620                // before this frame is rendered. But we can't guarantee that the collab panel's future
 621                // will resolve before this flush_effects finishes. Synchronously emitting this event
 622                // ensures that the collab panel will observe this creation before the frame completes
 623                cx.emit(ChannelEvent::ChannelCreated(channel_id));
 624            })?;
 625
 626            Ok(channel_id)
 627        })
 628    }
 629
 630    pub fn move_channel(
 631        &mut self,
 632        channel_id: ChannelId,
 633        to: ChannelId,
 634        cx: &mut ModelContext<Self>,
 635    ) -> Task<Result<()>> {
 636        let client = self.client.clone();
 637        cx.spawn(move |_, _| async move {
 638            let _ = client
 639                .request(proto::MoveChannel {
 640                    channel_id: channel_id.0,
 641                    to: to.0,
 642                })
 643                .await?;
 644
 645            Ok(())
 646        })
 647    }
 648
 649    pub fn set_channel_visibility(
 650        &mut self,
 651        channel_id: ChannelId,
 652        visibility: ChannelVisibility,
 653        cx: &mut ModelContext<Self>,
 654    ) -> Task<Result<()>> {
 655        let client = self.client.clone();
 656        cx.spawn(move |_, _| async move {
 657            let _ = client
 658                .request(proto::SetChannelVisibility {
 659                    channel_id: channel_id.0,
 660                    visibility: visibility.into(),
 661                })
 662                .await?;
 663
 664            Ok(())
 665        })
 666    }
 667
 668    pub fn invite_member(
 669        &mut self,
 670        channel_id: ChannelId,
 671        user_id: UserId,
 672        role: proto::ChannelRole,
 673        cx: &mut ModelContext<Self>,
 674    ) -> Task<Result<()>> {
 675        if !self.outgoing_invites.insert((channel_id, user_id)) {
 676            return Task::ready(Err(anyhow!("invite request already in progress")));
 677        }
 678
 679        cx.notify();
 680        let client = self.client.clone();
 681        cx.spawn(move |this, mut cx| async move {
 682            let result = client
 683                .request(proto::InviteChannelMember {
 684                    channel_id: channel_id.0,
 685                    user_id,
 686                    role: role.into(),
 687                })
 688                .await;
 689
 690            this.update(&mut cx, |this, cx| {
 691                this.outgoing_invites.remove(&(channel_id, user_id));
 692                cx.notify();
 693            })?;
 694
 695            result?;
 696
 697            Ok(())
 698        })
 699    }
 700
 701    pub fn remove_member(
 702        &mut self,
 703        channel_id: ChannelId,
 704        user_id: u64,
 705        cx: &mut ModelContext<Self>,
 706    ) -> Task<Result<()>> {
 707        if !self.outgoing_invites.insert((channel_id, user_id)) {
 708            return Task::ready(Err(anyhow!("invite request already in progress")));
 709        }
 710
 711        cx.notify();
 712        let client = self.client.clone();
 713        cx.spawn(move |this, mut cx| async move {
 714            let result = client
 715                .request(proto::RemoveChannelMember {
 716                    channel_id: channel_id.0,
 717                    user_id,
 718                })
 719                .await;
 720
 721            this.update(&mut cx, |this, cx| {
 722                this.outgoing_invites.remove(&(channel_id, user_id));
 723                cx.notify();
 724            })?;
 725            result?;
 726            Ok(())
 727        })
 728    }
 729
 730    pub fn set_member_role(
 731        &mut self,
 732        channel_id: ChannelId,
 733        user_id: UserId,
 734        role: proto::ChannelRole,
 735        cx: &mut ModelContext<Self>,
 736    ) -> Task<Result<()>> {
 737        if !self.outgoing_invites.insert((channel_id, user_id)) {
 738            return Task::ready(Err(anyhow!("member request already in progress")));
 739        }
 740
 741        cx.notify();
 742        let client = self.client.clone();
 743        cx.spawn(move |this, mut cx| async move {
 744            let result = client
 745                .request(proto::SetChannelMemberRole {
 746                    channel_id: channel_id.0,
 747                    user_id,
 748                    role: role.into(),
 749                })
 750                .await;
 751
 752            this.update(&mut cx, |this, cx| {
 753                this.outgoing_invites.remove(&(channel_id, user_id));
 754                cx.notify();
 755            })?;
 756
 757            result?;
 758            Ok(())
 759        })
 760    }
 761
 762    pub fn rename(
 763        &mut self,
 764        channel_id: ChannelId,
 765        new_name: &str,
 766        cx: &mut ModelContext<Self>,
 767    ) -> Task<Result<()>> {
 768        let client = self.client.clone();
 769        let name = new_name.to_string();
 770        cx.spawn(move |this, mut cx| async move {
 771            let channel = client
 772                .request(proto::RenameChannel {
 773                    channel_id: channel_id.0,
 774                    name,
 775                })
 776                .await?
 777                .channel
 778                .ok_or_else(|| anyhow!("missing channel in response"))?;
 779            this.update(&mut cx, |this, cx| {
 780                let task = this.update_channels(
 781                    proto::UpdateChannels {
 782                        channels: vec![channel],
 783                        ..Default::default()
 784                    },
 785                    cx,
 786                );
 787                assert!(task.is_none());
 788
 789                // This event is emitted because the collab panel wants to clear the pending edit state
 790                // before this frame is rendered. But we can't guarantee that the collab panel's future
 791                // will resolve before this flush_effects finishes. Synchronously emitting this event
 792                // ensures that the collab panel will observe this creation before the frame complete
 793                cx.emit(ChannelEvent::ChannelRenamed(channel_id))
 794            })?;
 795            Ok(())
 796        })
 797    }
 798
 799    pub fn respond_to_channel_invite(
 800        &mut self,
 801        channel_id: ChannelId,
 802        accept: bool,
 803        cx: &mut ModelContext<Self>,
 804    ) -> Task<Result<()>> {
 805        let client = self.client.clone();
 806        cx.background_executor().spawn(async move {
 807            client
 808                .request(proto::RespondToChannelInvite {
 809                    channel_id: channel_id.0,
 810                    accept,
 811                })
 812                .await?;
 813            Ok(())
 814        })
 815    }
 816
 817    pub fn get_channel_member_details(
 818        &self,
 819        channel_id: ChannelId,
 820        cx: &mut ModelContext<Self>,
 821    ) -> Task<Result<Vec<ChannelMembership>>> {
 822        let client = self.client.clone();
 823        let user_store = self.user_store.downgrade();
 824        cx.spawn(move |_, mut cx| async move {
 825            let response = client
 826                .request(proto::GetChannelMembers {
 827                    channel_id: channel_id.0,
 828                })
 829                .await?;
 830
 831            let user_ids = response.members.iter().map(|m| m.user_id).collect();
 832            let user_store = user_store
 833                .upgrade()
 834                .ok_or_else(|| anyhow!("user store dropped"))?;
 835            let users = user_store
 836                .update(&mut cx, |user_store, cx| user_store.get_users(user_ids, cx))?
 837                .await?;
 838
 839            Ok(users
 840                .into_iter()
 841                .zip(response.members)
 842                .map(|(user, member)| ChannelMembership {
 843                    user,
 844                    role: member.role(),
 845                    kind: member.kind(),
 846                })
 847                .collect())
 848        })
 849    }
 850
 851    pub fn remove_channel(&self, channel_id: ChannelId) -> impl Future<Output = Result<()>> {
 852        let client = self.client.clone();
 853        async move {
 854            client
 855                .request(proto::DeleteChannel {
 856                    channel_id: channel_id.0,
 857                })
 858                .await?;
 859            Ok(())
 860        }
 861    }
 862
 863    pub fn has_pending_channel_invite_response(&self, _: &Arc<Channel>) -> bool {
 864        false
 865    }
 866
 867    pub fn has_pending_channel_invite(&self, channel_id: ChannelId, user_id: UserId) -> bool {
 868        self.outgoing_invites.contains(&(channel_id, user_id))
 869    }
 870
 871    async fn handle_update_channels(
 872        this: Model<Self>,
 873        message: TypedEnvelope<proto::UpdateChannels>,
 874        _: Arc<Client>,
 875        mut cx: AsyncAppContext,
 876    ) -> Result<()> {
 877        this.update(&mut cx, |this, _| {
 878            this.update_channels_tx
 879                .unbounded_send(message.payload)
 880                .unwrap();
 881        })?;
 882        Ok(())
 883    }
 884
 885    async fn handle_update_user_channels(
 886        this: Model<Self>,
 887        message: TypedEnvelope<proto::UpdateUserChannels>,
 888        _: Arc<Client>,
 889        mut cx: AsyncAppContext,
 890    ) -> Result<()> {
 891        this.update(&mut cx, |this, cx| {
 892            for buffer_version in message.payload.observed_channel_buffer_version {
 893                let version = language::proto::deserialize_version(&buffer_version.version);
 894                this.acknowledge_notes_version(
 895                    ChannelId(buffer_version.channel_id),
 896                    buffer_version.epoch,
 897                    &version,
 898                    cx,
 899                );
 900            }
 901            for message_id in message.payload.observed_channel_message_id {
 902                this.acknowledge_message_id(
 903                    ChannelId(message_id.channel_id),
 904                    message_id.message_id,
 905                    cx,
 906                );
 907            }
 908            for membership in message.payload.channel_memberships {
 909                if let Some(role) = ChannelRole::from_i32(membership.role) {
 910                    this.channel_states
 911                        .entry(ChannelId(membership.channel_id))
 912                        .or_insert_with(|| ChannelState::default())
 913                        .set_role(role)
 914                }
 915            }
 916        })
 917    }
 918
 919    fn handle_connect(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
 920        self.channel_index.clear();
 921        self.channel_invitations.clear();
 922        self.channel_participants.clear();
 923        self.channel_index.clear();
 924        self.outgoing_invites.clear();
 925        self.disconnect_channel_buffers_task.take();
 926
 927        for chat in self.opened_chats.values() {
 928            if let OpenedModelHandle::Open(chat) = chat {
 929                if let Some(chat) = chat.upgrade() {
 930                    chat.update(cx, |chat, cx| {
 931                        chat.rejoin(cx);
 932                    });
 933                }
 934            }
 935        }
 936
 937        let mut buffer_versions = Vec::new();
 938        for buffer in self.opened_buffers.values() {
 939            if let OpenedModelHandle::Open(buffer) = buffer {
 940                if let Some(buffer) = buffer.upgrade() {
 941                    let channel_buffer = buffer.read(cx);
 942                    let buffer = channel_buffer.buffer().read(cx);
 943                    buffer_versions.push(proto::ChannelBufferVersion {
 944                        channel_id: channel_buffer.channel_id.0,
 945                        epoch: channel_buffer.epoch(),
 946                        version: language::proto::serialize_version(&buffer.version()),
 947                    });
 948                }
 949            }
 950        }
 951
 952        if buffer_versions.is_empty() {
 953            return Task::ready(Ok(()));
 954        }
 955
 956        let response = self.client.request(proto::RejoinChannelBuffers {
 957            buffers: buffer_versions,
 958        });
 959
 960        cx.spawn(|this, mut cx| async move {
 961            let mut response = response.await?;
 962
 963            this.update(&mut cx, |this, cx| {
 964                this.opened_buffers.retain(|_, buffer| match buffer {
 965                    OpenedModelHandle::Open(channel_buffer) => {
 966                        let Some(channel_buffer) = channel_buffer.upgrade() else {
 967                            return false;
 968                        };
 969
 970                        channel_buffer.update(cx, |channel_buffer, cx| {
 971                            let channel_id = channel_buffer.channel_id;
 972                            if let Some(remote_buffer) = response
 973                                .buffers
 974                                .iter_mut()
 975                                .find(|buffer| buffer.channel_id == channel_id.0)
 976                            {
 977                                let channel_id = channel_buffer.channel_id;
 978                                let remote_version =
 979                                    language::proto::deserialize_version(&remote_buffer.version);
 980
 981                                channel_buffer.replace_collaborators(
 982                                    mem::take(&mut remote_buffer.collaborators),
 983                                    cx,
 984                                );
 985
 986                                let operations = channel_buffer
 987                                    .buffer()
 988                                    .update(cx, |buffer, cx| {
 989                                        let outgoing_operations =
 990                                            buffer.serialize_ops(Some(remote_version), cx);
 991                                        let incoming_operations =
 992                                            mem::take(&mut remote_buffer.operations)
 993                                                .into_iter()
 994                                                .map(language::proto::deserialize_operation)
 995                                                .collect::<Result<Vec<_>>>()?;
 996                                        buffer.apply_ops(incoming_operations, cx)?;
 997                                        anyhow::Ok(outgoing_operations)
 998                                    })
 999                                    .log_err();
1000
1001                                if let Some(operations) = operations {
1002                                    let client = this.client.clone();
1003                                    cx.background_executor()
1004                                        .spawn(async move {
1005                                            let operations = operations.await;
1006                                            for chunk in
1007                                                language::proto::split_operations(operations)
1008                                            {
1009                                                client
1010                                                    .send(proto::UpdateChannelBuffer {
1011                                                        channel_id: channel_id.0,
1012                                                        operations: chunk,
1013                                                    })
1014                                                    .ok();
1015                                            }
1016                                        })
1017                                        .detach();
1018                                    return true;
1019                                }
1020                            }
1021
1022                            channel_buffer.disconnect(cx);
1023                            false
1024                        })
1025                    }
1026                    OpenedModelHandle::Loading(_) => true,
1027                });
1028            })
1029            .ok();
1030            anyhow::Ok(())
1031        })
1032    }
1033
1034    fn handle_disconnect(&mut self, wait_for_reconnect: bool, cx: &mut ModelContext<Self>) {
1035        cx.notify();
1036
1037        self.disconnect_channel_buffers_task.get_or_insert_with(|| {
1038            cx.spawn(move |this, mut cx| async move {
1039                if wait_for_reconnect {
1040                    cx.background_executor().timer(RECONNECT_TIMEOUT).await;
1041                }
1042
1043                if let Some(this) = this.upgrade() {
1044                    this.update(&mut cx, |this, cx| {
1045                        for (_, buffer) in this.opened_buffers.drain() {
1046                            if let OpenedModelHandle::Open(buffer) = buffer {
1047                                if let Some(buffer) = buffer.upgrade() {
1048                                    buffer.update(cx, |buffer, cx| buffer.disconnect(cx));
1049                                }
1050                            }
1051                        }
1052                    })
1053                    .ok();
1054                }
1055            })
1056        });
1057    }
1058
1059    pub(crate) fn update_channels(
1060        &mut self,
1061        payload: proto::UpdateChannels,
1062        cx: &mut ModelContext<ChannelStore>,
1063    ) -> Option<Task<Result<()>>> {
1064        if !payload.remove_channel_invitations.is_empty() {
1065            self.channel_invitations
1066                .retain(|channel| !payload.remove_channel_invitations.contains(&channel.id.0));
1067        }
1068        for channel in payload.channel_invitations {
1069            match self
1070                .channel_invitations
1071                .binary_search_by_key(&channel.id, |c| c.id.0)
1072            {
1073                Ok(ix) => {
1074                    Arc::make_mut(&mut self.channel_invitations[ix]).name = channel.name.into()
1075                }
1076                Err(ix) => self.channel_invitations.insert(
1077                    ix,
1078                    Arc::new(Channel {
1079                        id: ChannelId(channel.id),
1080                        visibility: channel.visibility(),
1081                        name: channel.name.into(),
1082                        parent_path: channel
1083                            .parent_path
1084                            .into_iter()
1085                            .map(|cid| ChannelId(cid))
1086                            .collect(),
1087                    }),
1088                ),
1089            }
1090        }
1091
1092        let channels_changed = !payload.channels.is_empty()
1093            || !payload.delete_channels.is_empty()
1094            || !payload.latest_channel_message_ids.is_empty()
1095            || !payload.latest_channel_buffer_versions.is_empty()
1096            || !payload.hosted_projects.is_empty()
1097            || !payload.deleted_hosted_projects.is_empty();
1098
1099        if channels_changed {
1100            if !payload.delete_channels.is_empty() {
1101                let delete_channels: Vec<ChannelId> = payload
1102                    .delete_channels
1103                    .into_iter()
1104                    .map(|cid| ChannelId(cid))
1105                    .collect();
1106                self.channel_index.delete_channels(&delete_channels);
1107                self.channel_participants
1108                    .retain(|channel_id, _| !delete_channels.contains(&channel_id));
1109
1110                for channel_id in &delete_channels {
1111                    let channel_id = *channel_id;
1112                    if payload
1113                        .channels
1114                        .iter()
1115                        .any(|channel| channel.id == channel_id.0)
1116                    {
1117                        continue;
1118                    }
1119                    if let Some(OpenedModelHandle::Open(buffer)) =
1120                        self.opened_buffers.remove(&channel_id)
1121                    {
1122                        if let Some(buffer) = buffer.upgrade() {
1123                            buffer.update(cx, ChannelBuffer::disconnect);
1124                        }
1125                    }
1126                }
1127            }
1128
1129            let mut index = self.channel_index.bulk_insert();
1130            for channel in payload.channels {
1131                let id = ChannelId(channel.id);
1132                let channel_changed = index.insert(channel);
1133
1134                if channel_changed {
1135                    if let Some(OpenedModelHandle::Open(buffer)) = self.opened_buffers.get(&id) {
1136                        if let Some(buffer) = buffer.upgrade() {
1137                            buffer.update(cx, ChannelBuffer::channel_changed);
1138                        }
1139                    }
1140                }
1141            }
1142
1143            for latest_buffer_version in payload.latest_channel_buffer_versions {
1144                let version = language::proto::deserialize_version(&latest_buffer_version.version);
1145                self.channel_states
1146                    .entry(ChannelId(latest_buffer_version.channel_id))
1147                    .or_default()
1148                    .update_latest_notes_version(latest_buffer_version.epoch, &version)
1149            }
1150
1151            for latest_channel_message in payload.latest_channel_message_ids {
1152                self.channel_states
1153                    .entry(ChannelId(latest_channel_message.channel_id))
1154                    .or_default()
1155                    .update_latest_message_id(latest_channel_message.message_id);
1156            }
1157
1158            for hosted_project in payload.hosted_projects {
1159                let hosted_project: HostedProject = hosted_project.into();
1160                if let Some(old_project) = self
1161                    .hosted_projects
1162                    .insert(hosted_project.id, hosted_project.clone())
1163                {
1164                    self.channel_states
1165                        .entry(old_project.channel_id)
1166                        .or_default()
1167                        .remove_hosted_project(old_project.id);
1168                }
1169                self.channel_states
1170                    .entry(hosted_project.channel_id)
1171                    .or_default()
1172                    .add_hosted_project(hosted_project.id);
1173            }
1174
1175            for hosted_project_id in payload.deleted_hosted_projects {
1176                let hosted_project_id = HostedProjectId(hosted_project_id);
1177
1178                if let Some(old_project) = self.hosted_projects.remove(&hosted_project_id) {
1179                    self.channel_states
1180                        .entry(old_project.channel_id)
1181                        .or_default()
1182                        .remove_hosted_project(old_project.id);
1183                }
1184            }
1185        }
1186
1187        cx.notify();
1188        if payload.channel_participants.is_empty() {
1189            return None;
1190        }
1191
1192        let mut all_user_ids = Vec::new();
1193        let channel_participants = payload.channel_participants;
1194        for entry in &channel_participants {
1195            for user_id in entry.participant_user_ids.iter() {
1196                if let Err(ix) = all_user_ids.binary_search(user_id) {
1197                    all_user_ids.insert(ix, *user_id);
1198                }
1199            }
1200        }
1201
1202        let users = self
1203            .user_store
1204            .update(cx, |user_store, cx| user_store.get_users(all_user_ids, cx));
1205        Some(cx.spawn(|this, mut cx| async move {
1206            let users = users.await?;
1207
1208            this.update(&mut cx, |this, cx| {
1209                for entry in &channel_participants {
1210                    let mut participants: Vec<_> = entry
1211                        .participant_user_ids
1212                        .iter()
1213                        .filter_map(|user_id| {
1214                            users
1215                                .binary_search_by_key(&user_id, |user| &user.id)
1216                                .ok()
1217                                .map(|ix| users[ix].clone())
1218                        })
1219                        .collect();
1220
1221                    participants.sort_by_key(|u| u.id);
1222
1223                    this.channel_participants
1224                        .insert(ChannelId(entry.channel_id), participants);
1225                }
1226
1227                cx.notify();
1228            })
1229        }))
1230    }
1231}
1232
1233impl ChannelState {
1234    fn set_role(&mut self, role: ChannelRole) {
1235        self.role = Some(role);
1236    }
1237
1238    fn has_channel_buffer_changed(&self) -> bool {
1239        if let Some(latest_version) = &self.latest_notes_versions {
1240            if let Some(observed_version) = &self.observed_notes_versions {
1241                latest_version.epoch > observed_version.epoch
1242                    || (latest_version.epoch == observed_version.epoch
1243                        && latest_version
1244                            .version
1245                            .changed_since(&observed_version.version))
1246            } else {
1247                true
1248            }
1249        } else {
1250            false
1251        }
1252    }
1253
1254    fn has_new_messages(&self) -> bool {
1255        let latest_message_id = self.latest_chat_message;
1256        let observed_message_id = self.observed_chat_message;
1257
1258        latest_message_id.is_some_and(|latest_message_id| {
1259            latest_message_id > observed_message_id.unwrap_or_default()
1260        })
1261    }
1262
1263    fn last_acknowledged_message_id(&self) -> Option<u64> {
1264        self.observed_chat_message
1265    }
1266
1267    fn acknowledge_message_id(&mut self, message_id: u64) {
1268        let observed = self.observed_chat_message.get_or_insert(message_id);
1269        *observed = (*observed).max(message_id);
1270    }
1271
1272    fn update_latest_message_id(&mut self, message_id: u64) {
1273        self.latest_chat_message =
1274            Some(message_id.max(self.latest_chat_message.unwrap_or_default()));
1275    }
1276
1277    fn acknowledge_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1278        if let Some(existing) = &mut self.observed_notes_versions {
1279            if existing.epoch == epoch {
1280                existing.version.join(version);
1281                return;
1282            }
1283        }
1284        self.observed_notes_versions = Some(NotesVersion {
1285            epoch,
1286            version: version.clone(),
1287        });
1288    }
1289
1290    fn update_latest_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1291        if let Some(existing) = &mut self.latest_notes_versions {
1292            if existing.epoch == epoch {
1293                existing.version.join(version);
1294                return;
1295            }
1296        }
1297        self.latest_notes_versions = Some(NotesVersion {
1298            epoch,
1299            version: version.clone(),
1300        });
1301    }
1302
1303    fn add_hosted_project(&mut self, project_id: HostedProjectId) {
1304        self.projects.insert(project_id);
1305    }
1306
1307    fn remove_hosted_project(&mut self, project_id: HostedProjectId) {
1308        self.projects.remove(&project_id);
1309    }
1310}