channel_store.rs

   1mod channel_index;
   2
   3use crate::{channel_buffer::ChannelBuffer, channel_chat::ChannelChat, ChannelMessage};
   4use anyhow::{anyhow, Result};
   5use channel_index::ChannelIndex;
   6use client::{
   7    ChannelId, Client, ClientSettings, DevServerId, ProjectId, RemoteProjectId, Subscription, User,
   8    UserId, UserStore,
   9};
  10use collections::{hash_map, HashMap, HashSet};
  11use futures::{channel::mpsc, future::Shared, Future, FutureExt, StreamExt};
  12use gpui::{
  13    AppContext, AsyncAppContext, Context, EventEmitter, Global, Model, ModelContext, SharedString,
  14    Task, WeakModel,
  15};
  16use language::Capability;
  17use rpc::{
  18    proto::{self, ChannelRole, ChannelVisibility, DevServerStatus},
  19    TypedEnvelope,
  20};
  21use settings::Settings;
  22use std::{mem, sync::Arc, time::Duration};
  23use util::{maybe, ResultExt};
  24
  25pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
  26
  27pub fn init(client: &Arc<Client>, user_store: Model<UserStore>, cx: &mut AppContext) {
  28    let channel_store =
  29        cx.new_model(|cx| ChannelStore::new(client.clone(), user_store.clone(), cx));
  30    cx.set_global(GlobalChannelStore(channel_store));
  31}
  32
  33#[derive(Debug, Clone, Default, PartialEq)]
  34struct NotesVersion {
  35    epoch: u64,
  36    version: clock::Global,
  37}
  38
  39#[derive(Debug, Clone)]
  40pub struct HostedProject {
  41    project_id: ProjectId,
  42    channel_id: ChannelId,
  43    name: SharedString,
  44    _visibility: proto::ChannelVisibility,
  45}
  46impl From<proto::HostedProject> for HostedProject {
  47    fn from(project: proto::HostedProject) -> Self {
  48        Self {
  49            project_id: ProjectId(project.project_id),
  50            channel_id: ChannelId(project.channel_id),
  51            _visibility: project.visibility(),
  52            name: project.name.into(),
  53        }
  54    }
  55}
  56
  57#[derive(Debug, Clone)]
  58pub struct RemoteProject {
  59    pub id: RemoteProjectId,
  60    pub project_id: Option<ProjectId>,
  61    pub channel_id: ChannelId,
  62    pub name: SharedString,
  63    pub path: SharedString,
  64    pub dev_server_id: DevServerId,
  65}
  66
  67impl From<proto::RemoteProject> for RemoteProject {
  68    fn from(project: proto::RemoteProject) -> Self {
  69        Self {
  70            id: RemoteProjectId(project.id),
  71            project_id: project.project_id.map(|id| ProjectId(id)),
  72            channel_id: ChannelId(project.channel_id),
  73            name: project.name.into(),
  74            path: project.path.into(),
  75            dev_server_id: DevServerId(project.dev_server_id),
  76        }
  77    }
  78}
  79
  80#[derive(Debug, Clone)]
  81pub struct DevServer {
  82    pub id: DevServerId,
  83    pub channel_id: ChannelId,
  84    pub name: SharedString,
  85    pub status: DevServerStatus,
  86}
  87
  88impl From<proto::DevServer> for DevServer {
  89    fn from(dev_server: proto::DevServer) -> Self {
  90        Self {
  91            id: DevServerId(dev_server.dev_server_id),
  92            channel_id: ChannelId(dev_server.channel_id),
  93            status: dev_server.status(),
  94            name: dev_server.name.into(),
  95        }
  96    }
  97}
  98
  99pub struct ChannelStore {
 100    pub channel_index: ChannelIndex,
 101    channel_invitations: Vec<Arc<Channel>>,
 102    channel_participants: HashMap<ChannelId, Vec<Arc<User>>>,
 103    channel_states: HashMap<ChannelId, ChannelState>,
 104    hosted_projects: HashMap<ProjectId, HostedProject>,
 105    remote_projects: HashMap<RemoteProjectId, RemoteProject>,
 106    dev_servers: HashMap<DevServerId, DevServer>,
 107
 108    outgoing_invites: HashSet<(ChannelId, UserId)>,
 109    update_channels_tx: mpsc::UnboundedSender<proto::UpdateChannels>,
 110    opened_buffers: HashMap<ChannelId, OpenedModelHandle<ChannelBuffer>>,
 111    opened_chats: HashMap<ChannelId, OpenedModelHandle<ChannelChat>>,
 112    client: Arc<Client>,
 113    user_store: Model<UserStore>,
 114    _rpc_subscriptions: [Subscription; 2],
 115    _watch_connection_status: Task<Option<()>>,
 116    disconnect_channel_buffers_task: Option<Task<()>>,
 117    _update_channels: Task<()>,
 118}
 119
 120#[derive(Clone, Debug)]
 121pub struct Channel {
 122    pub id: ChannelId,
 123    pub name: SharedString,
 124    pub visibility: proto::ChannelVisibility,
 125    pub parent_path: Vec<ChannelId>,
 126}
 127
 128#[derive(Default, Debug)]
 129pub struct ChannelState {
 130    latest_chat_message: Option<u64>,
 131    latest_notes_version: NotesVersion,
 132    observed_notes_version: NotesVersion,
 133    observed_chat_message: Option<u64>,
 134    role: Option<ChannelRole>,
 135    projects: HashSet<ProjectId>,
 136    dev_servers: HashSet<DevServerId>,
 137    remote_projects: HashSet<RemoteProjectId>,
 138}
 139
 140impl Channel {
 141    pub fn link(&self, cx: &AppContext) -> String {
 142        format!(
 143            "{}/channel/{}-{}",
 144            ClientSettings::get_global(cx).server_url,
 145            Self::slug(&self.name),
 146            self.id
 147        )
 148    }
 149
 150    pub fn notes_link(&self, heading: Option<String>, cx: &AppContext) -> String {
 151        self.link(cx)
 152            + "/notes"
 153            + &heading
 154                .map(|h| format!("#{}", Self::slug(&h)))
 155                .unwrap_or_default()
 156    }
 157
 158    pub fn is_root_channel(&self) -> bool {
 159        self.parent_path.is_empty()
 160    }
 161
 162    pub fn root_id(&self) -> ChannelId {
 163        self.parent_path.first().copied().unwrap_or(self.id)
 164    }
 165
 166    pub fn slug(str: &str) -> String {
 167        let slug: String = str
 168            .chars()
 169            .map(|c| if c.is_alphanumeric() { c } else { '-' })
 170            .collect();
 171
 172        slug.trim_matches(|c| c == '-').to_string()
 173    }
 174}
 175
 176pub struct ChannelMembership {
 177    pub user: Arc<User>,
 178    pub kind: proto::channel_member::Kind,
 179    pub role: proto::ChannelRole,
 180}
 181impl ChannelMembership {
 182    pub fn sort_key(&self) -> MembershipSortKey {
 183        MembershipSortKey {
 184            role_order: match self.role {
 185                proto::ChannelRole::Admin => 0,
 186                proto::ChannelRole::Member => 1,
 187                proto::ChannelRole::Banned => 2,
 188                proto::ChannelRole::Talker => 3,
 189                proto::ChannelRole::Guest => 4,
 190            },
 191            kind_order: match self.kind {
 192                proto::channel_member::Kind::Member => 0,
 193                proto::channel_member::Kind::Invitee => 1,
 194            },
 195            username_order: self.user.github_login.as_str(),
 196        }
 197    }
 198}
 199
 200#[derive(PartialOrd, Ord, PartialEq, Eq)]
 201pub struct MembershipSortKey<'a> {
 202    role_order: u8,
 203    kind_order: u8,
 204    username_order: &'a str,
 205}
 206
 207pub enum ChannelEvent {
 208    ChannelCreated(ChannelId),
 209    ChannelRenamed(ChannelId),
 210}
 211
 212impl EventEmitter<ChannelEvent> for ChannelStore {}
 213
 214enum OpenedModelHandle<E> {
 215    Open(WeakModel<E>),
 216    Loading(Shared<Task<Result<Model<E>, Arc<anyhow::Error>>>>),
 217}
 218
 219struct GlobalChannelStore(Model<ChannelStore>);
 220
 221impl Global for GlobalChannelStore {}
 222
 223impl ChannelStore {
 224    pub fn global(cx: &AppContext) -> Model<Self> {
 225        cx.global::<GlobalChannelStore>().0.clone()
 226    }
 227
 228    pub fn new(
 229        client: Arc<Client>,
 230        user_store: Model<UserStore>,
 231        cx: &mut ModelContext<Self>,
 232    ) -> Self {
 233        let rpc_subscriptions = [
 234            client.add_message_handler(cx.weak_model(), Self::handle_update_channels),
 235            client.add_message_handler(cx.weak_model(), Self::handle_update_user_channels),
 236        ];
 237
 238        let mut connection_status = client.status();
 239        let (update_channels_tx, mut update_channels_rx) = mpsc::unbounded();
 240        let watch_connection_status = cx.spawn(|this, mut cx| async move {
 241            while let Some(status) = connection_status.next().await {
 242                let this = this.upgrade()?;
 243                match status {
 244                    client::Status::Connected { .. } => {
 245                        this.update(&mut cx, |this, cx| this.handle_connect(cx))
 246                            .ok()?
 247                            .await
 248                            .log_err()?;
 249                    }
 250                    client::Status::SignedOut | client::Status::UpgradeRequired => {
 251                        this.update(&mut cx, |this, cx| this.handle_disconnect(false, cx))
 252                            .ok();
 253                    }
 254                    _ => {
 255                        this.update(&mut cx, |this, cx| this.handle_disconnect(true, cx))
 256                            .ok();
 257                    }
 258                }
 259            }
 260            Some(())
 261        });
 262
 263        Self {
 264            channel_invitations: Vec::default(),
 265            channel_index: ChannelIndex::default(),
 266            channel_participants: Default::default(),
 267            hosted_projects: Default::default(),
 268            remote_projects: Default::default(),
 269            dev_servers: Default::default(),
 270            outgoing_invites: Default::default(),
 271            opened_buffers: Default::default(),
 272            opened_chats: Default::default(),
 273            update_channels_tx,
 274            client,
 275            user_store,
 276            _rpc_subscriptions: rpc_subscriptions,
 277            _watch_connection_status: watch_connection_status,
 278            disconnect_channel_buffers_task: None,
 279            _update_channels: cx.spawn(|this, mut cx| async move {
 280                maybe!(async move {
 281                    while let Some(update_channels) = update_channels_rx.next().await {
 282                        if let Some(this) = this.upgrade() {
 283                            let update_task = this.update(&mut cx, |this, cx| {
 284                                this.update_channels(update_channels, cx)
 285                            })?;
 286                            if let Some(update_task) = update_task {
 287                                update_task.await.log_err();
 288                            }
 289                        }
 290                    }
 291                    anyhow::Ok(())
 292                })
 293                .await
 294                .log_err();
 295            }),
 296            channel_states: Default::default(),
 297        }
 298    }
 299
 300    pub fn client(&self) -> Arc<Client> {
 301        self.client.clone()
 302    }
 303
 304    /// Returns the number of unique channels in the store
 305    pub fn channel_count(&self) -> usize {
 306        self.channel_index.by_id().len()
 307    }
 308
 309    /// Returns the index of a channel ID in the list of unique channels
 310    pub fn index_of_channel(&self, channel_id: ChannelId) -> Option<usize> {
 311        self.channel_index
 312            .by_id()
 313            .keys()
 314            .position(|id| *id == channel_id)
 315    }
 316
 317    /// Returns an iterator over all unique channels
 318    pub fn channels(&self) -> impl '_ + Iterator<Item = &Arc<Channel>> {
 319        self.channel_index.by_id().values()
 320    }
 321
 322    /// Iterate over all entries in the channel DAG
 323    pub fn ordered_channels(&self) -> impl '_ + Iterator<Item = (usize, &Arc<Channel>)> {
 324        self.channel_index
 325            .ordered_channels()
 326            .iter()
 327            .filter_map(move |id| {
 328                let channel = self.channel_index.by_id().get(id)?;
 329                Some((channel.parent_path.len(), channel))
 330            })
 331    }
 332
 333    pub fn channel_at_index(&self, ix: usize) -> Option<&Arc<Channel>> {
 334        let channel_id = self.channel_index.ordered_channels().get(ix)?;
 335        self.channel_index.by_id().get(channel_id)
 336    }
 337
 338    pub fn channel_at(&self, ix: usize) -> Option<&Arc<Channel>> {
 339        self.channel_index.by_id().values().nth(ix)
 340    }
 341
 342    pub fn has_channel_invitation(&self, channel_id: ChannelId) -> bool {
 343        self.channel_invitations
 344            .iter()
 345            .any(|channel| channel.id == channel_id)
 346    }
 347
 348    pub fn channel_invitations(&self) -> &[Arc<Channel>] {
 349        &self.channel_invitations
 350    }
 351
 352    pub fn channel_for_id(&self, channel_id: ChannelId) -> Option<&Arc<Channel>> {
 353        self.channel_index.by_id().get(&channel_id)
 354    }
 355
 356    pub fn projects_for_id(&self, channel_id: ChannelId) -> Vec<(SharedString, ProjectId)> {
 357        let mut projects: Vec<(SharedString, ProjectId)> = self
 358            .channel_states
 359            .get(&channel_id)
 360            .map(|state| state.projects.clone())
 361            .unwrap_or_default()
 362            .into_iter()
 363            .flat_map(|id| Some((self.hosted_projects.get(&id)?.name.clone(), id)))
 364            .collect();
 365        projects.sort();
 366        projects
 367    }
 368
 369    pub fn dev_servers_for_id(&self, channel_id: ChannelId) -> Vec<DevServer> {
 370        let mut dev_servers: Vec<DevServer> = self
 371            .channel_states
 372            .get(&channel_id)
 373            .map(|state| state.dev_servers.clone())
 374            .unwrap_or_default()
 375            .into_iter()
 376            .flat_map(|id| self.dev_servers.get(&id).cloned())
 377            .collect();
 378        dev_servers.sort_by_key(|s| (s.name.clone(), s.id));
 379        dev_servers
 380    }
 381
 382    pub fn find_dev_server_by_id(&self, id: DevServerId) -> Option<&DevServer> {
 383        self.dev_servers.get(&id)
 384    }
 385
 386    pub fn find_remote_project_by_id(&self, id: RemoteProjectId) -> Option<&RemoteProject> {
 387        self.remote_projects.get(&id)
 388    }
 389
 390    pub fn remote_projects_for_id(&self, channel_id: ChannelId) -> Vec<RemoteProject> {
 391        let mut remote_projects: Vec<RemoteProject> = self
 392            .channel_states
 393            .get(&channel_id)
 394            .map(|state| state.remote_projects.clone())
 395            .unwrap_or_default()
 396            .into_iter()
 397            .flat_map(|id| self.remote_projects.get(&id).cloned())
 398            .collect();
 399        remote_projects.sort_by_key(|p| (p.name.clone(), p.id));
 400        remote_projects
 401    }
 402
 403    pub fn has_open_channel_buffer(&self, channel_id: ChannelId, _cx: &AppContext) -> bool {
 404        if let Some(buffer) = self.opened_buffers.get(&channel_id) {
 405            if let OpenedModelHandle::Open(buffer) = buffer {
 406                return buffer.upgrade().is_some();
 407            }
 408        }
 409        false
 410    }
 411
 412    pub fn open_channel_buffer(
 413        &mut self,
 414        channel_id: ChannelId,
 415        cx: &mut ModelContext<Self>,
 416    ) -> Task<Result<Model<ChannelBuffer>>> {
 417        let client = self.client.clone();
 418        let user_store = self.user_store.clone();
 419        let channel_store = cx.handle();
 420        self.open_channel_resource(
 421            channel_id,
 422            |this| &mut this.opened_buffers,
 423            |channel, cx| ChannelBuffer::new(channel, client, user_store, channel_store, cx),
 424            cx,
 425        )
 426    }
 427
 428    pub fn fetch_channel_messages(
 429        &self,
 430        message_ids: Vec<u64>,
 431        cx: &mut ModelContext<Self>,
 432    ) -> Task<Result<Vec<ChannelMessage>>> {
 433        let request = if message_ids.is_empty() {
 434            None
 435        } else {
 436            Some(
 437                self.client
 438                    .request(proto::GetChannelMessagesById { message_ids }),
 439            )
 440        };
 441        cx.spawn(|this, mut cx| async move {
 442            if let Some(request) = request {
 443                let response = request.await?;
 444                let this = this
 445                    .upgrade()
 446                    .ok_or_else(|| anyhow!("channel store dropped"))?;
 447                let user_store = this.update(&mut cx, |this, _| this.user_store.clone())?;
 448                ChannelMessage::from_proto_vec(response.messages, &user_store, &mut cx).await
 449            } else {
 450                Ok(Vec::new())
 451            }
 452        })
 453    }
 454
 455    pub fn has_channel_buffer_changed(&self, channel_id: ChannelId) -> bool {
 456        self.channel_states
 457            .get(&channel_id)
 458            .is_some_and(|state| state.has_channel_buffer_changed())
 459    }
 460
 461    pub fn has_new_messages(&self, channel_id: ChannelId) -> bool {
 462        self.channel_states
 463            .get(&channel_id)
 464            .is_some_and(|state| state.has_new_messages())
 465    }
 466
 467    pub fn set_acknowledged_message_id(&mut self, channel_id: ChannelId, message_id: Option<u64>) {
 468        if let Some(state) = self.channel_states.get_mut(&channel_id) {
 469            state.latest_chat_message = message_id;
 470        }
 471    }
 472
 473    pub fn last_acknowledge_message_id(&self, channel_id: ChannelId) -> Option<u64> {
 474        self.channel_states.get(&channel_id).and_then(|state| {
 475            if let Some(last_message_id) = state.latest_chat_message {
 476                if state
 477                    .last_acknowledged_message_id()
 478                    .is_some_and(|id| id < last_message_id)
 479                {
 480                    return state.last_acknowledged_message_id();
 481                }
 482            }
 483
 484            None
 485        })
 486    }
 487
 488    pub fn acknowledge_message_id(
 489        &mut self,
 490        channel_id: ChannelId,
 491        message_id: u64,
 492        cx: &mut ModelContext<Self>,
 493    ) {
 494        self.channel_states
 495            .entry(channel_id)
 496            .or_insert_with(|| Default::default())
 497            .acknowledge_message_id(message_id);
 498        cx.notify();
 499    }
 500
 501    pub fn update_latest_message_id(
 502        &mut self,
 503        channel_id: ChannelId,
 504        message_id: u64,
 505        cx: &mut ModelContext<Self>,
 506    ) {
 507        self.channel_states
 508            .entry(channel_id)
 509            .or_insert_with(|| Default::default())
 510            .update_latest_message_id(message_id);
 511        cx.notify();
 512    }
 513
 514    pub fn acknowledge_notes_version(
 515        &mut self,
 516        channel_id: ChannelId,
 517        epoch: u64,
 518        version: &clock::Global,
 519        cx: &mut ModelContext<Self>,
 520    ) {
 521        self.channel_states
 522            .entry(channel_id)
 523            .or_insert_with(|| Default::default())
 524            .acknowledge_notes_version(epoch, version);
 525        cx.notify()
 526    }
 527
 528    pub fn update_latest_notes_version(
 529        &mut self,
 530        channel_id: ChannelId,
 531        epoch: u64,
 532        version: &clock::Global,
 533        cx: &mut ModelContext<Self>,
 534    ) {
 535        self.channel_states
 536            .entry(channel_id)
 537            .or_insert_with(|| Default::default())
 538            .update_latest_notes_version(epoch, version);
 539        cx.notify()
 540    }
 541
 542    pub fn open_channel_chat(
 543        &mut self,
 544        channel_id: ChannelId,
 545        cx: &mut ModelContext<Self>,
 546    ) -> Task<Result<Model<ChannelChat>>> {
 547        let client = self.client.clone();
 548        let user_store = self.user_store.clone();
 549        let this = cx.handle();
 550        self.open_channel_resource(
 551            channel_id,
 552            |this| &mut this.opened_chats,
 553            |channel, cx| ChannelChat::new(channel, this, user_store, client, cx),
 554            cx,
 555        )
 556    }
 557
 558    /// Asynchronously open a given resource associated with a channel.
 559    ///
 560    /// Make sure that the resource is only opened once, even if this method
 561    /// is called multiple times with the same channel id while the first task
 562    /// is still running.
 563    fn open_channel_resource<T, F, Fut>(
 564        &mut self,
 565        channel_id: ChannelId,
 566        get_map: fn(&mut Self) -> &mut HashMap<ChannelId, OpenedModelHandle<T>>,
 567        load: F,
 568        cx: &mut ModelContext<Self>,
 569    ) -> Task<Result<Model<T>>>
 570    where
 571        F: 'static + FnOnce(Arc<Channel>, AsyncAppContext) -> Fut,
 572        Fut: Future<Output = Result<Model<T>>>,
 573        T: 'static,
 574    {
 575        let task = loop {
 576            match get_map(self).entry(channel_id) {
 577                hash_map::Entry::Occupied(e) => match e.get() {
 578                    OpenedModelHandle::Open(model) => {
 579                        if let Some(model) = model.upgrade() {
 580                            break Task::ready(Ok(model)).shared();
 581                        } else {
 582                            get_map(self).remove(&channel_id);
 583                            continue;
 584                        }
 585                    }
 586                    OpenedModelHandle::Loading(task) => {
 587                        break task.clone();
 588                    }
 589                },
 590                hash_map::Entry::Vacant(e) => {
 591                    let task = cx
 592                        .spawn(move |this, mut cx| async move {
 593                            let channel = this.update(&mut cx, |this, _| {
 594                                this.channel_for_id(channel_id).cloned().ok_or_else(|| {
 595                                    Arc::new(anyhow!("no channel for id: {}", channel_id))
 596                                })
 597                            })??;
 598
 599                            load(channel, cx).await.map_err(Arc::new)
 600                        })
 601                        .shared();
 602
 603                    e.insert(OpenedModelHandle::Loading(task.clone()));
 604                    cx.spawn({
 605                        let task = task.clone();
 606                        move |this, mut cx| async move {
 607                            let result = task.await;
 608                            this.update(&mut cx, |this, _| match result {
 609                                Ok(model) => {
 610                                    get_map(this).insert(
 611                                        channel_id,
 612                                        OpenedModelHandle::Open(model.downgrade()),
 613                                    );
 614                                }
 615                                Err(_) => {
 616                                    get_map(this).remove(&channel_id);
 617                                }
 618                            })
 619                            .ok();
 620                        }
 621                    })
 622                    .detach();
 623                    break task;
 624                }
 625            }
 626        };
 627        cx.background_executor()
 628            .spawn(async move { task.await.map_err(|error| anyhow!("{}", error)) })
 629    }
 630
 631    pub fn is_channel_admin(&self, channel_id: ChannelId) -> bool {
 632        self.channel_role(channel_id) == proto::ChannelRole::Admin
 633    }
 634
 635    pub fn is_root_channel(&self, channel_id: ChannelId) -> bool {
 636        self.channel_index
 637            .by_id()
 638            .get(&channel_id)
 639            .map_or(false, |channel| channel.is_root_channel())
 640    }
 641
 642    pub fn is_public_channel(&self, channel_id: ChannelId) -> bool {
 643        self.channel_index
 644            .by_id()
 645            .get(&channel_id)
 646            .map_or(false, |channel| {
 647                channel.visibility == ChannelVisibility::Public
 648            })
 649    }
 650
 651    pub fn channel_capability(&self, channel_id: ChannelId) -> Capability {
 652        match self.channel_role(channel_id) {
 653            ChannelRole::Admin | ChannelRole::Member => Capability::ReadWrite,
 654            _ => Capability::ReadOnly,
 655        }
 656    }
 657
 658    pub fn channel_role(&self, channel_id: ChannelId) -> proto::ChannelRole {
 659        maybe!({
 660            let mut channel = self.channel_for_id(channel_id)?;
 661            if !channel.is_root_channel() {
 662                channel = self.channel_for_id(channel.root_id())?;
 663            }
 664            let root_channel_state = self.channel_states.get(&channel.id);
 665            root_channel_state?.role
 666        })
 667        .unwrap_or(proto::ChannelRole::Guest)
 668    }
 669
 670    pub fn channel_participants(&self, channel_id: ChannelId) -> &[Arc<User>] {
 671        self.channel_participants
 672            .get(&channel_id)
 673            .map_or(&[], |v| v.as_slice())
 674    }
 675
 676    pub fn create_channel(
 677        &self,
 678        name: &str,
 679        parent_id: Option<ChannelId>,
 680        cx: &mut ModelContext<Self>,
 681    ) -> Task<Result<ChannelId>> {
 682        let client = self.client.clone();
 683        let name = name.trim_start_matches('#').to_owned();
 684        cx.spawn(move |this, mut cx| async move {
 685            let response = client
 686                .request(proto::CreateChannel {
 687                    name,
 688                    parent_id: parent_id.map(|cid| cid.0),
 689                })
 690                .await?;
 691
 692            let channel = response
 693                .channel
 694                .ok_or_else(|| anyhow!("missing channel in response"))?;
 695            let channel_id = ChannelId(channel.id);
 696
 697            this.update(&mut cx, |this, cx| {
 698                let task = this.update_channels(
 699                    proto::UpdateChannels {
 700                        channels: vec![channel],
 701                        ..Default::default()
 702                    },
 703                    cx,
 704                );
 705                assert!(task.is_none());
 706
 707                // This event is emitted because the collab panel wants to clear the pending edit state
 708                // before this frame is rendered. But we can't guarantee that the collab panel's future
 709                // will resolve before this flush_effects finishes. Synchronously emitting this event
 710                // ensures that the collab panel will observe this creation before the frame completes
 711                cx.emit(ChannelEvent::ChannelCreated(channel_id));
 712            })?;
 713
 714            Ok(channel_id)
 715        })
 716    }
 717
 718    pub fn move_channel(
 719        &mut self,
 720        channel_id: ChannelId,
 721        to: ChannelId,
 722        cx: &mut ModelContext<Self>,
 723    ) -> Task<Result<()>> {
 724        let client = self.client.clone();
 725        cx.spawn(move |_, _| async move {
 726            let _ = client
 727                .request(proto::MoveChannel {
 728                    channel_id: channel_id.0,
 729                    to: to.0,
 730                })
 731                .await?;
 732
 733            Ok(())
 734        })
 735    }
 736
 737    pub fn set_channel_visibility(
 738        &mut self,
 739        channel_id: ChannelId,
 740        visibility: ChannelVisibility,
 741        cx: &mut ModelContext<Self>,
 742    ) -> Task<Result<()>> {
 743        let client = self.client.clone();
 744        cx.spawn(move |_, _| async move {
 745            let _ = client
 746                .request(proto::SetChannelVisibility {
 747                    channel_id: channel_id.0,
 748                    visibility: visibility.into(),
 749                })
 750                .await?;
 751
 752            Ok(())
 753        })
 754    }
 755
 756    pub fn invite_member(
 757        &mut self,
 758        channel_id: ChannelId,
 759        user_id: UserId,
 760        role: proto::ChannelRole,
 761        cx: &mut ModelContext<Self>,
 762    ) -> Task<Result<()>> {
 763        if !self.outgoing_invites.insert((channel_id, user_id)) {
 764            return Task::ready(Err(anyhow!("invite request already in progress")));
 765        }
 766
 767        cx.notify();
 768        let client = self.client.clone();
 769        cx.spawn(move |this, mut cx| async move {
 770            let result = client
 771                .request(proto::InviteChannelMember {
 772                    channel_id: channel_id.0,
 773                    user_id,
 774                    role: role.into(),
 775                })
 776                .await;
 777
 778            this.update(&mut cx, |this, cx| {
 779                this.outgoing_invites.remove(&(channel_id, user_id));
 780                cx.notify();
 781            })?;
 782
 783            result?;
 784
 785            Ok(())
 786        })
 787    }
 788
 789    pub fn remove_member(
 790        &mut self,
 791        channel_id: ChannelId,
 792        user_id: u64,
 793        cx: &mut ModelContext<Self>,
 794    ) -> Task<Result<()>> {
 795        if !self.outgoing_invites.insert((channel_id, user_id)) {
 796            return Task::ready(Err(anyhow!("invite request already in progress")));
 797        }
 798
 799        cx.notify();
 800        let client = self.client.clone();
 801        cx.spawn(move |this, mut cx| async move {
 802            let result = client
 803                .request(proto::RemoveChannelMember {
 804                    channel_id: channel_id.0,
 805                    user_id,
 806                })
 807                .await;
 808
 809            this.update(&mut cx, |this, cx| {
 810                this.outgoing_invites.remove(&(channel_id, user_id));
 811                cx.notify();
 812            })?;
 813            result?;
 814            Ok(())
 815        })
 816    }
 817
 818    pub fn set_member_role(
 819        &mut self,
 820        channel_id: ChannelId,
 821        user_id: UserId,
 822        role: proto::ChannelRole,
 823        cx: &mut ModelContext<Self>,
 824    ) -> Task<Result<()>> {
 825        if !self.outgoing_invites.insert((channel_id, user_id)) {
 826            return Task::ready(Err(anyhow!("member request already in progress")));
 827        }
 828
 829        cx.notify();
 830        let client = self.client.clone();
 831        cx.spawn(move |this, mut cx| async move {
 832            let result = client
 833                .request(proto::SetChannelMemberRole {
 834                    channel_id: channel_id.0,
 835                    user_id,
 836                    role: role.into(),
 837                })
 838                .await;
 839
 840            this.update(&mut cx, |this, cx| {
 841                this.outgoing_invites.remove(&(channel_id, user_id));
 842                cx.notify();
 843            })?;
 844
 845            result?;
 846            Ok(())
 847        })
 848    }
 849
 850    pub fn rename(
 851        &mut self,
 852        channel_id: ChannelId,
 853        new_name: &str,
 854        cx: &mut ModelContext<Self>,
 855    ) -> Task<Result<()>> {
 856        let client = self.client.clone();
 857        let name = new_name.to_string();
 858        cx.spawn(move |this, mut cx| async move {
 859            let channel = client
 860                .request(proto::RenameChannel {
 861                    channel_id: channel_id.0,
 862                    name,
 863                })
 864                .await?
 865                .channel
 866                .ok_or_else(|| anyhow!("missing channel in response"))?;
 867            this.update(&mut cx, |this, cx| {
 868                let task = this.update_channels(
 869                    proto::UpdateChannels {
 870                        channels: vec![channel],
 871                        ..Default::default()
 872                    },
 873                    cx,
 874                );
 875                assert!(task.is_none());
 876
 877                // This event is emitted because the collab panel wants to clear the pending edit state
 878                // before this frame is rendered. But we can't guarantee that the collab panel's future
 879                // will resolve before this flush_effects finishes. Synchronously emitting this event
 880                // ensures that the collab panel will observe this creation before the frame complete
 881                cx.emit(ChannelEvent::ChannelRenamed(channel_id))
 882            })?;
 883            Ok(())
 884        })
 885    }
 886
 887    pub fn respond_to_channel_invite(
 888        &mut self,
 889        channel_id: ChannelId,
 890        accept: bool,
 891        cx: &mut ModelContext<Self>,
 892    ) -> Task<Result<()>> {
 893        let client = self.client.clone();
 894        cx.background_executor().spawn(async move {
 895            client
 896                .request(proto::RespondToChannelInvite {
 897                    channel_id: channel_id.0,
 898                    accept,
 899                })
 900                .await?;
 901            Ok(())
 902        })
 903    }
 904
 905    pub fn create_remote_project(
 906        &mut self,
 907        channel_id: ChannelId,
 908        dev_server_id: DevServerId,
 909        name: String,
 910        path: String,
 911        cx: &mut ModelContext<Self>,
 912    ) -> Task<Result<proto::CreateRemoteProjectResponse>> {
 913        let client = self.client.clone();
 914        cx.background_executor().spawn(async move {
 915            client
 916                .request(proto::CreateRemoteProject {
 917                    channel_id: channel_id.0,
 918                    dev_server_id: dev_server_id.0,
 919                    name,
 920                    path,
 921                })
 922                .await
 923        })
 924    }
 925
 926    pub fn create_dev_server(
 927        &mut self,
 928        channel_id: ChannelId,
 929        name: String,
 930        cx: &mut ModelContext<Self>,
 931    ) -> Task<Result<proto::CreateDevServerResponse>> {
 932        let client = self.client.clone();
 933        cx.background_executor().spawn(async move {
 934            let result = client
 935                .request(proto::CreateDevServer {
 936                    channel_id: channel_id.0,
 937                    name,
 938                })
 939                .await?;
 940            Ok(result)
 941        })
 942    }
 943
 944    pub fn get_channel_member_details(
 945        &self,
 946        channel_id: ChannelId,
 947        cx: &mut ModelContext<Self>,
 948    ) -> Task<Result<Vec<ChannelMembership>>> {
 949        let client = self.client.clone();
 950        let user_store = self.user_store.downgrade();
 951        cx.spawn(move |_, mut cx| async move {
 952            let response = client
 953                .request(proto::GetChannelMembers {
 954                    channel_id: channel_id.0,
 955                })
 956                .await?;
 957
 958            let user_ids = response.members.iter().map(|m| m.user_id).collect();
 959            let user_store = user_store
 960                .upgrade()
 961                .ok_or_else(|| anyhow!("user store dropped"))?;
 962            let users = user_store
 963                .update(&mut cx, |user_store, cx| user_store.get_users(user_ids, cx))?
 964                .await?;
 965
 966            Ok(users
 967                .into_iter()
 968                .zip(response.members)
 969                .map(|(user, member)| ChannelMembership {
 970                    user,
 971                    role: member.role(),
 972                    kind: member.kind(),
 973                })
 974                .collect())
 975        })
 976    }
 977
 978    pub fn remove_channel(&self, channel_id: ChannelId) -> impl Future<Output = Result<()>> {
 979        let client = self.client.clone();
 980        async move {
 981            client
 982                .request(proto::DeleteChannel {
 983                    channel_id: channel_id.0,
 984                })
 985                .await?;
 986            Ok(())
 987        }
 988    }
 989
 990    pub fn has_pending_channel_invite_response(&self, _: &Arc<Channel>) -> bool {
 991        false
 992    }
 993
 994    pub fn has_pending_channel_invite(&self, channel_id: ChannelId, user_id: UserId) -> bool {
 995        self.outgoing_invites.contains(&(channel_id, user_id))
 996    }
 997
 998    async fn handle_update_channels(
 999        this: Model<Self>,
1000        message: TypedEnvelope<proto::UpdateChannels>,
1001        _: Arc<Client>,
1002        mut cx: AsyncAppContext,
1003    ) -> Result<()> {
1004        this.update(&mut cx, |this, _| {
1005            this.update_channels_tx
1006                .unbounded_send(message.payload)
1007                .unwrap();
1008        })?;
1009        Ok(())
1010    }
1011
1012    async fn handle_update_user_channels(
1013        this: Model<Self>,
1014        message: TypedEnvelope<proto::UpdateUserChannels>,
1015        _: Arc<Client>,
1016        mut cx: AsyncAppContext,
1017    ) -> Result<()> {
1018        this.update(&mut cx, |this, cx| {
1019            for buffer_version in message.payload.observed_channel_buffer_version {
1020                let version = language::proto::deserialize_version(&buffer_version.version);
1021                this.acknowledge_notes_version(
1022                    ChannelId(buffer_version.channel_id),
1023                    buffer_version.epoch,
1024                    &version,
1025                    cx,
1026                );
1027            }
1028            for message_id in message.payload.observed_channel_message_id {
1029                this.acknowledge_message_id(
1030                    ChannelId(message_id.channel_id),
1031                    message_id.message_id,
1032                    cx,
1033                );
1034            }
1035            for membership in message.payload.channel_memberships {
1036                if let Some(role) = ChannelRole::from_i32(membership.role) {
1037                    this.channel_states
1038                        .entry(ChannelId(membership.channel_id))
1039                        .or_insert_with(|| ChannelState::default())
1040                        .set_role(role)
1041                }
1042            }
1043        })
1044    }
1045
1046    fn handle_connect(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
1047        self.channel_index.clear();
1048        self.channel_invitations.clear();
1049        self.channel_participants.clear();
1050        self.channel_index.clear();
1051        self.outgoing_invites.clear();
1052        self.disconnect_channel_buffers_task.take();
1053
1054        for chat in self.opened_chats.values() {
1055            if let OpenedModelHandle::Open(chat) = chat {
1056                if let Some(chat) = chat.upgrade() {
1057                    chat.update(cx, |chat, cx| {
1058                        chat.rejoin(cx);
1059                    });
1060                }
1061            }
1062        }
1063
1064        let mut buffer_versions = Vec::new();
1065        for buffer in self.opened_buffers.values() {
1066            if let OpenedModelHandle::Open(buffer) = buffer {
1067                if let Some(buffer) = buffer.upgrade() {
1068                    let channel_buffer = buffer.read(cx);
1069                    let buffer = channel_buffer.buffer().read(cx);
1070                    buffer_versions.push(proto::ChannelBufferVersion {
1071                        channel_id: channel_buffer.channel_id.0,
1072                        epoch: channel_buffer.epoch(),
1073                        version: language::proto::serialize_version(&buffer.version()),
1074                    });
1075                }
1076            }
1077        }
1078
1079        if buffer_versions.is_empty() {
1080            return Task::ready(Ok(()));
1081        }
1082
1083        let response = self.client.request(proto::RejoinChannelBuffers {
1084            buffers: buffer_versions,
1085        });
1086
1087        cx.spawn(|this, mut cx| async move {
1088            let mut response = response.await?;
1089
1090            this.update(&mut cx, |this, cx| {
1091                this.opened_buffers.retain(|_, buffer| match buffer {
1092                    OpenedModelHandle::Open(channel_buffer) => {
1093                        let Some(channel_buffer) = channel_buffer.upgrade() else {
1094                            return false;
1095                        };
1096
1097                        channel_buffer.update(cx, |channel_buffer, cx| {
1098                            let channel_id = channel_buffer.channel_id;
1099                            if let Some(remote_buffer) = response
1100                                .buffers
1101                                .iter_mut()
1102                                .find(|buffer| buffer.channel_id == channel_id.0)
1103                            {
1104                                let channel_id = channel_buffer.channel_id;
1105                                let remote_version =
1106                                    language::proto::deserialize_version(&remote_buffer.version);
1107
1108                                channel_buffer.replace_collaborators(
1109                                    mem::take(&mut remote_buffer.collaborators),
1110                                    cx,
1111                                );
1112
1113                                let operations = channel_buffer
1114                                    .buffer()
1115                                    .update(cx, |buffer, cx| {
1116                                        let outgoing_operations =
1117                                            buffer.serialize_ops(Some(remote_version), cx);
1118                                        let incoming_operations =
1119                                            mem::take(&mut remote_buffer.operations)
1120                                                .into_iter()
1121                                                .map(language::proto::deserialize_operation)
1122                                                .collect::<Result<Vec<_>>>()?;
1123                                        buffer.apply_ops(incoming_operations, cx)?;
1124                                        anyhow::Ok(outgoing_operations)
1125                                    })
1126                                    .log_err();
1127
1128                                if let Some(operations) = operations {
1129                                    let client = this.client.clone();
1130                                    cx.background_executor()
1131                                        .spawn(async move {
1132                                            let operations = operations.await;
1133                                            for chunk in
1134                                                language::proto::split_operations(operations)
1135                                            {
1136                                                client
1137                                                    .send(proto::UpdateChannelBuffer {
1138                                                        channel_id: channel_id.0,
1139                                                        operations: chunk,
1140                                                    })
1141                                                    .ok();
1142                                            }
1143                                        })
1144                                        .detach();
1145                                    return true;
1146                                }
1147                            }
1148
1149                            channel_buffer.disconnect(cx);
1150                            false
1151                        })
1152                    }
1153                    OpenedModelHandle::Loading(_) => true,
1154                });
1155            })
1156            .ok();
1157            anyhow::Ok(())
1158        })
1159    }
1160
1161    fn handle_disconnect(&mut self, wait_for_reconnect: bool, cx: &mut ModelContext<Self>) {
1162        cx.notify();
1163
1164        self.disconnect_channel_buffers_task.get_or_insert_with(|| {
1165            cx.spawn(move |this, mut cx| async move {
1166                if wait_for_reconnect {
1167                    cx.background_executor().timer(RECONNECT_TIMEOUT).await;
1168                }
1169
1170                if let Some(this) = this.upgrade() {
1171                    this.update(&mut cx, |this, cx| {
1172                        for (_, buffer) in this.opened_buffers.drain() {
1173                            if let OpenedModelHandle::Open(buffer) = buffer {
1174                                if let Some(buffer) = buffer.upgrade() {
1175                                    buffer.update(cx, |buffer, cx| buffer.disconnect(cx));
1176                                }
1177                            }
1178                        }
1179                    })
1180                    .ok();
1181                }
1182            })
1183        });
1184    }
1185
1186    pub(crate) fn update_channels(
1187        &mut self,
1188        payload: proto::UpdateChannels,
1189        cx: &mut ModelContext<ChannelStore>,
1190    ) -> Option<Task<Result<()>>> {
1191        if !payload.remove_channel_invitations.is_empty() {
1192            self.channel_invitations
1193                .retain(|channel| !payload.remove_channel_invitations.contains(&channel.id.0));
1194        }
1195        for channel in payload.channel_invitations {
1196            match self
1197                .channel_invitations
1198                .binary_search_by_key(&channel.id, |c| c.id.0)
1199            {
1200                Ok(ix) => {
1201                    Arc::make_mut(&mut self.channel_invitations[ix]).name = channel.name.into()
1202                }
1203                Err(ix) => self.channel_invitations.insert(
1204                    ix,
1205                    Arc::new(Channel {
1206                        id: ChannelId(channel.id),
1207                        visibility: channel.visibility(),
1208                        name: channel.name.into(),
1209                        parent_path: channel
1210                            .parent_path
1211                            .into_iter()
1212                            .map(|cid| ChannelId(cid))
1213                            .collect(),
1214                    }),
1215                ),
1216            }
1217        }
1218
1219        let channels_changed = !payload.channels.is_empty()
1220            || !payload.delete_channels.is_empty()
1221            || !payload.latest_channel_message_ids.is_empty()
1222            || !payload.latest_channel_buffer_versions.is_empty()
1223            || !payload.hosted_projects.is_empty()
1224            || !payload.deleted_hosted_projects.is_empty()
1225            || !payload.dev_servers.is_empty()
1226            || !payload.deleted_dev_servers.is_empty()
1227            || !payload.remote_projects.is_empty()
1228            || !payload.deleted_remote_projects.is_empty();
1229
1230        if channels_changed {
1231            if !payload.delete_channels.is_empty() {
1232                let delete_channels: Vec<ChannelId> = payload
1233                    .delete_channels
1234                    .into_iter()
1235                    .map(|cid| ChannelId(cid))
1236                    .collect();
1237                self.channel_index.delete_channels(&delete_channels);
1238                self.channel_participants
1239                    .retain(|channel_id, _| !delete_channels.contains(&channel_id));
1240
1241                for channel_id in &delete_channels {
1242                    let channel_id = *channel_id;
1243                    if payload
1244                        .channels
1245                        .iter()
1246                        .any(|channel| channel.id == channel_id.0)
1247                    {
1248                        continue;
1249                    }
1250                    if let Some(OpenedModelHandle::Open(buffer)) =
1251                        self.opened_buffers.remove(&channel_id)
1252                    {
1253                        if let Some(buffer) = buffer.upgrade() {
1254                            buffer.update(cx, ChannelBuffer::disconnect);
1255                        }
1256                    }
1257                }
1258            }
1259
1260            let mut index = self.channel_index.bulk_insert();
1261            for channel in payload.channels {
1262                let id = ChannelId(channel.id);
1263                let channel_changed = index.insert(channel);
1264
1265                if channel_changed {
1266                    if let Some(OpenedModelHandle::Open(buffer)) = self.opened_buffers.get(&id) {
1267                        if let Some(buffer) = buffer.upgrade() {
1268                            buffer.update(cx, ChannelBuffer::channel_changed);
1269                        }
1270                    }
1271                }
1272            }
1273
1274            for latest_buffer_version in payload.latest_channel_buffer_versions {
1275                let version = language::proto::deserialize_version(&latest_buffer_version.version);
1276                self.channel_states
1277                    .entry(ChannelId(latest_buffer_version.channel_id))
1278                    .or_default()
1279                    .update_latest_notes_version(latest_buffer_version.epoch, &version)
1280            }
1281
1282            for latest_channel_message in payload.latest_channel_message_ids {
1283                self.channel_states
1284                    .entry(ChannelId(latest_channel_message.channel_id))
1285                    .or_default()
1286                    .update_latest_message_id(latest_channel_message.message_id);
1287            }
1288
1289            for hosted_project in payload.hosted_projects {
1290                let hosted_project: HostedProject = hosted_project.into();
1291                if let Some(old_project) = self
1292                    .hosted_projects
1293                    .insert(hosted_project.project_id, hosted_project.clone())
1294                {
1295                    self.channel_states
1296                        .entry(old_project.channel_id)
1297                        .or_default()
1298                        .remove_hosted_project(old_project.project_id);
1299                }
1300                self.channel_states
1301                    .entry(hosted_project.channel_id)
1302                    .or_default()
1303                    .add_hosted_project(hosted_project.project_id);
1304            }
1305
1306            for hosted_project_id in payload.deleted_hosted_projects {
1307                let hosted_project_id = ProjectId(hosted_project_id);
1308
1309                if let Some(old_project) = self.hosted_projects.remove(&hosted_project_id) {
1310                    self.channel_states
1311                        .entry(old_project.channel_id)
1312                        .or_default()
1313                        .remove_hosted_project(old_project.project_id);
1314                }
1315            }
1316
1317            for remote_project in payload.remote_projects {
1318                let remote_project: RemoteProject = remote_project.into();
1319                if let Some(old_remote_project) = self
1320                    .remote_projects
1321                    .insert(remote_project.id, remote_project.clone())
1322                {
1323                    self.channel_states
1324                        .entry(old_remote_project.channel_id)
1325                        .or_default()
1326                        .remove_remote_project(old_remote_project.id);
1327                }
1328                self.channel_states
1329                    .entry(remote_project.channel_id)
1330                    .or_default()
1331                    .add_remote_project(remote_project.id);
1332            }
1333
1334            for remote_project_id in payload.deleted_remote_projects {
1335                let remote_project_id = RemoteProjectId(remote_project_id);
1336
1337                if let Some(old_project) = self.remote_projects.remove(&remote_project_id) {
1338                    self.channel_states
1339                        .entry(old_project.channel_id)
1340                        .or_default()
1341                        .remove_remote_project(old_project.id);
1342                }
1343            }
1344
1345            for dev_server in payload.dev_servers {
1346                let dev_server: DevServer = dev_server.into();
1347                if let Some(old_server) = self.dev_servers.insert(dev_server.id, dev_server.clone())
1348                {
1349                    self.channel_states
1350                        .entry(old_server.channel_id)
1351                        .or_default()
1352                        .remove_dev_server(old_server.id);
1353                }
1354                self.channel_states
1355                    .entry(dev_server.channel_id)
1356                    .or_default()
1357                    .add_dev_server(dev_server.id);
1358            }
1359
1360            for dev_server_id in payload.deleted_dev_servers {
1361                let dev_server_id = DevServerId(dev_server_id);
1362
1363                if let Some(old_server) = self.dev_servers.remove(&dev_server_id) {
1364                    self.channel_states
1365                        .entry(old_server.channel_id)
1366                        .or_default()
1367                        .remove_dev_server(old_server.id);
1368                }
1369            }
1370        }
1371
1372        cx.notify();
1373        if payload.channel_participants.is_empty() {
1374            return None;
1375        }
1376
1377        let mut all_user_ids = Vec::new();
1378        let channel_participants = payload.channel_participants;
1379        for entry in &channel_participants {
1380            for user_id in entry.participant_user_ids.iter() {
1381                if let Err(ix) = all_user_ids.binary_search(user_id) {
1382                    all_user_ids.insert(ix, *user_id);
1383                }
1384            }
1385        }
1386
1387        let users = self
1388            .user_store
1389            .update(cx, |user_store, cx| user_store.get_users(all_user_ids, cx));
1390        Some(cx.spawn(|this, mut cx| async move {
1391            let users = users.await?;
1392
1393            this.update(&mut cx, |this, cx| {
1394                for entry in &channel_participants {
1395                    let mut participants: Vec<_> = entry
1396                        .participant_user_ids
1397                        .iter()
1398                        .filter_map(|user_id| {
1399                            users
1400                                .binary_search_by_key(&user_id, |user| &user.id)
1401                                .ok()
1402                                .map(|ix| users[ix].clone())
1403                        })
1404                        .collect();
1405
1406                    participants.sort_by_key(|u| u.id);
1407
1408                    this.channel_participants
1409                        .insert(ChannelId(entry.channel_id), participants);
1410                }
1411
1412                cx.notify();
1413            })
1414        }))
1415    }
1416}
1417
1418impl ChannelState {
1419    fn set_role(&mut self, role: ChannelRole) {
1420        self.role = Some(role);
1421    }
1422
1423    fn has_channel_buffer_changed(&self) -> bool {
1424        self.latest_notes_version.epoch > self.observed_notes_version.epoch
1425            || (self.latest_notes_version.epoch == self.observed_notes_version.epoch
1426                && self
1427                    .latest_notes_version
1428                    .version
1429                    .changed_since(&self.observed_notes_version.version))
1430    }
1431
1432    fn has_new_messages(&self) -> bool {
1433        let latest_message_id = self.latest_chat_message;
1434        let observed_message_id = self.observed_chat_message;
1435
1436        latest_message_id.is_some_and(|latest_message_id| {
1437            latest_message_id > observed_message_id.unwrap_or_default()
1438        })
1439    }
1440
1441    fn last_acknowledged_message_id(&self) -> Option<u64> {
1442        self.observed_chat_message
1443    }
1444
1445    fn acknowledge_message_id(&mut self, message_id: u64) {
1446        let observed = self.observed_chat_message.get_or_insert(message_id);
1447        *observed = (*observed).max(message_id);
1448    }
1449
1450    fn update_latest_message_id(&mut self, message_id: u64) {
1451        self.latest_chat_message =
1452            Some(message_id.max(self.latest_chat_message.unwrap_or_default()));
1453    }
1454
1455    fn acknowledge_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1456        if self.observed_notes_version.epoch == epoch {
1457            self.observed_notes_version.version.join(version);
1458        } else {
1459            self.observed_notes_version = NotesVersion {
1460                epoch,
1461                version: version.clone(),
1462            };
1463        }
1464    }
1465
1466    fn update_latest_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1467        if self.latest_notes_version.epoch == epoch {
1468            self.latest_notes_version.version.join(version);
1469        } else {
1470            self.latest_notes_version = NotesVersion {
1471                epoch,
1472                version: version.clone(),
1473            };
1474        }
1475    }
1476
1477    fn add_hosted_project(&mut self, project_id: ProjectId) {
1478        self.projects.insert(project_id);
1479    }
1480
1481    fn remove_hosted_project(&mut self, project_id: ProjectId) {
1482        self.projects.remove(&project_id);
1483    }
1484
1485    fn add_remote_project(&mut self, remote_project_id: RemoteProjectId) {
1486        self.remote_projects.insert(remote_project_id);
1487    }
1488
1489    fn remove_remote_project(&mut self, remote_project_id: RemoteProjectId) {
1490        self.remote_projects.remove(&remote_project_id);
1491    }
1492
1493    fn add_dev_server(&mut self, dev_server_id: DevServerId) {
1494        self.dev_servers.insert(dev_server_id);
1495    }
1496
1497    fn remove_dev_server(&mut self, dev_server_id: DevServerId) {
1498        self.dev_servers.remove(&dev_server_id);
1499    }
1500}