channel_store.rs

   1mod channel_index;
   2
   3use crate::{channel_buffer::ChannelBuffer, channel_chat::ChannelChat, ChannelMessage};
   4use anyhow::{anyhow, Result};
   5use channel_index::ChannelIndex;
   6use client::{ChannelId, Client, ClientSettings, ProjectId, Subscription, User, UserId, UserStore};
   7use collections::{hash_map, HashMap, HashSet};
   8use futures::{channel::mpsc, future::Shared, Future, FutureExt, StreamExt};
   9use gpui::{
  10    AppContext, AsyncAppContext, Context, EventEmitter, Global, Model, ModelContext, SharedString,
  11    Task, WeakModel,
  12};
  13use language::Capability;
  14use rpc::{
  15    proto::{self, ChannelRole, ChannelVisibility},
  16    TypedEnvelope,
  17};
  18use settings::Settings;
  19use std::{mem, sync::Arc, time::Duration};
  20use util::{maybe, ResultExt};
  21
  22pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
  23
  24pub fn init(client: &Arc<Client>, user_store: Model<UserStore>, cx: &mut AppContext) {
  25    let channel_store =
  26        cx.new_model(|cx| ChannelStore::new(client.clone(), user_store.clone(), cx));
  27    cx.set_global(GlobalChannelStore(channel_store));
  28}
  29
  30#[derive(Debug, Clone, Default, PartialEq)]
  31struct NotesVersion {
  32    epoch: u64,
  33    version: clock::Global,
  34}
  35
  36#[derive(Debug, Clone)]
  37pub struct HostedProject {
  38    project_id: ProjectId,
  39    channel_id: ChannelId,
  40    name: SharedString,
  41    _visibility: proto::ChannelVisibility,
  42}
  43impl From<proto::HostedProject> for HostedProject {
  44    fn from(project: proto::HostedProject) -> Self {
  45        Self {
  46            project_id: ProjectId(project.project_id),
  47            channel_id: ChannelId(project.channel_id),
  48            _visibility: project.visibility(),
  49            name: project.name.into(),
  50        }
  51    }
  52}
  53pub struct ChannelStore {
  54    pub channel_index: ChannelIndex,
  55    channel_invitations: Vec<Arc<Channel>>,
  56    channel_participants: HashMap<ChannelId, Vec<Arc<User>>>,
  57    channel_states: HashMap<ChannelId, ChannelState>,
  58    hosted_projects: HashMap<ProjectId, HostedProject>,
  59
  60    outgoing_invites: HashSet<(ChannelId, UserId)>,
  61    update_channels_tx: mpsc::UnboundedSender<proto::UpdateChannels>,
  62    opened_buffers: HashMap<ChannelId, OpenedModelHandle<ChannelBuffer>>,
  63    opened_chats: HashMap<ChannelId, OpenedModelHandle<ChannelChat>>,
  64    client: Arc<Client>,
  65    did_subscribe: bool,
  66    user_store: Model<UserStore>,
  67    _rpc_subscriptions: [Subscription; 2],
  68    _watch_connection_status: Task<Option<()>>,
  69    disconnect_channel_buffers_task: Option<Task<()>>,
  70    _update_channels: Task<()>,
  71}
  72
  73#[derive(Clone, Debug)]
  74pub struct Channel {
  75    pub id: ChannelId,
  76    pub name: SharedString,
  77    pub visibility: proto::ChannelVisibility,
  78    pub parent_path: Vec<ChannelId>,
  79}
  80
  81#[derive(Default, Debug)]
  82pub struct ChannelState {
  83    latest_chat_message: Option<u64>,
  84    latest_notes_version: NotesVersion,
  85    observed_notes_version: NotesVersion,
  86    observed_chat_message: Option<u64>,
  87    role: Option<ChannelRole>,
  88    projects: HashSet<ProjectId>,
  89}
  90
  91impl Channel {
  92    pub fn link(&self, cx: &AppContext) -> String {
  93        format!(
  94            "{}/channel/{}-{}",
  95            ClientSettings::get_global(cx).server_url,
  96            Self::slug(&self.name),
  97            self.id
  98        )
  99    }
 100
 101    pub fn notes_link(&self, heading: Option<String>, cx: &AppContext) -> String {
 102        self.link(cx)
 103            + "/notes"
 104            + &heading
 105                .map(|h| format!("#{}", Self::slug(&h)))
 106                .unwrap_or_default()
 107    }
 108
 109    pub fn is_root_channel(&self) -> bool {
 110        self.parent_path.is_empty()
 111    }
 112
 113    pub fn root_id(&self) -> ChannelId {
 114        self.parent_path.first().copied().unwrap_or(self.id)
 115    }
 116
 117    pub fn slug(str: &str) -> String {
 118        let slug: String = str
 119            .chars()
 120            .map(|c| if c.is_alphanumeric() { c } else { '-' })
 121            .collect();
 122
 123        slug.trim_matches(|c| c == '-').to_string()
 124    }
 125}
 126
 127#[derive(Debug)]
 128pub struct ChannelMembership {
 129    pub user: Arc<User>,
 130    pub kind: proto::channel_member::Kind,
 131    pub role: proto::ChannelRole,
 132}
 133impl ChannelMembership {
 134    pub fn sort_key(&self) -> MembershipSortKey {
 135        MembershipSortKey {
 136            role_order: match self.role {
 137                proto::ChannelRole::Admin => 0,
 138                proto::ChannelRole::Member => 1,
 139                proto::ChannelRole::Banned => 2,
 140                proto::ChannelRole::Talker => 3,
 141                proto::ChannelRole::Guest => 4,
 142            },
 143            kind_order: match self.kind {
 144                proto::channel_member::Kind::Member => 0,
 145                proto::channel_member::Kind::Invitee => 1,
 146            },
 147            username_order: self.user.github_login.as_str(),
 148        }
 149    }
 150}
 151
 152#[derive(PartialOrd, Ord, PartialEq, Eq)]
 153pub struct MembershipSortKey<'a> {
 154    role_order: u8,
 155    kind_order: u8,
 156    username_order: &'a str,
 157}
 158
 159pub enum ChannelEvent {
 160    ChannelCreated(ChannelId),
 161    ChannelRenamed(ChannelId),
 162}
 163
 164impl EventEmitter<ChannelEvent> for ChannelStore {}
 165
 166enum OpenedModelHandle<E> {
 167    Open(WeakModel<E>),
 168    Loading(Shared<Task<Result<Model<E>, Arc<anyhow::Error>>>>),
 169}
 170
 171struct GlobalChannelStore(Model<ChannelStore>);
 172
 173impl Global for GlobalChannelStore {}
 174
 175impl ChannelStore {
 176    pub fn global(cx: &AppContext) -> Model<Self> {
 177        cx.global::<GlobalChannelStore>().0.clone()
 178    }
 179
 180    pub fn new(
 181        client: Arc<Client>,
 182        user_store: Model<UserStore>,
 183        cx: &mut ModelContext<Self>,
 184    ) -> Self {
 185        let rpc_subscriptions = [
 186            client.add_message_handler(cx.weak_model(), Self::handle_update_channels),
 187            client.add_message_handler(cx.weak_model(), Self::handle_update_user_channels),
 188        ];
 189
 190        let mut connection_status = client.status();
 191        let (update_channels_tx, mut update_channels_rx) = mpsc::unbounded();
 192        let watch_connection_status = cx.spawn(|this, mut cx| async move {
 193            while let Some(status) = connection_status.next().await {
 194                let this = this.upgrade()?;
 195                match status {
 196                    client::Status::Connected { .. } => {
 197                        this.update(&mut cx, |this, cx| this.handle_connect(cx))
 198                            .ok()?
 199                            .await
 200                            .log_err()?;
 201                    }
 202                    client::Status::SignedOut | client::Status::UpgradeRequired => {
 203                        this.update(&mut cx, |this, cx| this.handle_disconnect(false, cx))
 204                            .ok();
 205                    }
 206                    _ => {
 207                        this.update(&mut cx, |this, cx| this.handle_disconnect(true, cx))
 208                            .ok();
 209                    }
 210                }
 211            }
 212            Some(())
 213        });
 214
 215        Self {
 216            channel_invitations: Vec::default(),
 217            channel_index: ChannelIndex::default(),
 218            channel_participants: Default::default(),
 219            hosted_projects: Default::default(),
 220            outgoing_invites: Default::default(),
 221            opened_buffers: Default::default(),
 222            opened_chats: Default::default(),
 223            update_channels_tx,
 224            client,
 225            user_store,
 226            _rpc_subscriptions: rpc_subscriptions,
 227            _watch_connection_status: watch_connection_status,
 228            disconnect_channel_buffers_task: None,
 229            _update_channels: cx.spawn(|this, mut cx| async move {
 230                maybe!(async move {
 231                    while let Some(update_channels) = update_channels_rx.next().await {
 232                        if let Some(this) = this.upgrade() {
 233                            let update_task = this.update(&mut cx, |this, cx| {
 234                                this.update_channels(update_channels, cx)
 235                            })?;
 236                            if let Some(update_task) = update_task {
 237                                update_task.await.log_err();
 238                            }
 239                        }
 240                    }
 241                    anyhow::Ok(())
 242                })
 243                .await
 244                .log_err();
 245            }),
 246            channel_states: Default::default(),
 247            did_subscribe: false,
 248        }
 249    }
 250
 251    pub fn initialize(&mut self) {
 252        if !self.did_subscribe
 253            && self
 254                .client
 255                .send(proto::SubscribeToChannels {})
 256                .log_err()
 257                .is_some()
 258        {
 259            self.did_subscribe = true;
 260        }
 261    }
 262
 263    pub fn client(&self) -> Arc<Client> {
 264        self.client.clone()
 265    }
 266
 267    /// Returns the number of unique channels in the store
 268    pub fn channel_count(&self) -> usize {
 269        self.channel_index.by_id().len()
 270    }
 271
 272    /// Returns the index of a channel ID in the list of unique channels
 273    pub fn index_of_channel(&self, channel_id: ChannelId) -> Option<usize> {
 274        self.channel_index
 275            .by_id()
 276            .keys()
 277            .position(|id| *id == channel_id)
 278    }
 279
 280    /// Returns an iterator over all unique channels
 281    pub fn channels(&self) -> impl '_ + Iterator<Item = &Arc<Channel>> {
 282        self.channel_index.by_id().values()
 283    }
 284
 285    /// Iterate over all entries in the channel DAG
 286    pub fn ordered_channels(&self) -> impl '_ + Iterator<Item = (usize, &Arc<Channel>)> {
 287        self.channel_index
 288            .ordered_channels()
 289            .iter()
 290            .filter_map(move |id| {
 291                let channel = self.channel_index.by_id().get(id)?;
 292                Some((channel.parent_path.len(), channel))
 293            })
 294    }
 295
 296    pub fn channel_at_index(&self, ix: usize) -> Option<&Arc<Channel>> {
 297        let channel_id = self.channel_index.ordered_channels().get(ix)?;
 298        self.channel_index.by_id().get(channel_id)
 299    }
 300
 301    pub fn channel_at(&self, ix: usize) -> Option<&Arc<Channel>> {
 302        self.channel_index.by_id().values().nth(ix)
 303    }
 304
 305    pub fn has_channel_invitation(&self, channel_id: ChannelId) -> bool {
 306        self.channel_invitations
 307            .iter()
 308            .any(|channel| channel.id == channel_id)
 309    }
 310
 311    pub fn channel_invitations(&self) -> &[Arc<Channel>] {
 312        &self.channel_invitations
 313    }
 314
 315    pub fn channel_for_id(&self, channel_id: ChannelId) -> Option<&Arc<Channel>> {
 316        self.channel_index.by_id().get(&channel_id)
 317    }
 318
 319    pub fn projects_for_id(&self, channel_id: ChannelId) -> Vec<(SharedString, ProjectId)> {
 320        let mut projects: Vec<(SharedString, ProjectId)> = self
 321            .channel_states
 322            .get(&channel_id)
 323            .map(|state| state.projects.clone())
 324            .unwrap_or_default()
 325            .into_iter()
 326            .flat_map(|id| Some((self.hosted_projects.get(&id)?.name.clone(), id)))
 327            .collect();
 328        projects.sort();
 329        projects
 330    }
 331
 332    pub fn has_open_channel_buffer(&self, channel_id: ChannelId, _cx: &AppContext) -> bool {
 333        if let Some(buffer) = self.opened_buffers.get(&channel_id) {
 334            if let OpenedModelHandle::Open(buffer) = buffer {
 335                return buffer.upgrade().is_some();
 336            }
 337        }
 338        false
 339    }
 340
 341    pub fn open_channel_buffer(
 342        &mut self,
 343        channel_id: ChannelId,
 344        cx: &mut ModelContext<Self>,
 345    ) -> Task<Result<Model<ChannelBuffer>>> {
 346        let client = self.client.clone();
 347        let user_store = self.user_store.clone();
 348        let channel_store = cx.handle();
 349        self.open_channel_resource(
 350            channel_id,
 351            |this| &mut this.opened_buffers,
 352            |channel, cx| ChannelBuffer::new(channel, client, user_store, channel_store, cx),
 353            cx,
 354        )
 355    }
 356
 357    pub fn fetch_channel_messages(
 358        &self,
 359        message_ids: Vec<u64>,
 360        cx: &mut ModelContext<Self>,
 361    ) -> Task<Result<Vec<ChannelMessage>>> {
 362        let request = if message_ids.is_empty() {
 363            None
 364        } else {
 365            Some(
 366                self.client
 367                    .request(proto::GetChannelMessagesById { message_ids }),
 368            )
 369        };
 370        cx.spawn(|this, mut cx| async move {
 371            if let Some(request) = request {
 372                let response = request.await?;
 373                let this = this
 374                    .upgrade()
 375                    .ok_or_else(|| anyhow!("channel store dropped"))?;
 376                let user_store = this.update(&mut cx, |this, _| this.user_store.clone())?;
 377                ChannelMessage::from_proto_vec(response.messages, &user_store, &mut cx).await
 378            } else {
 379                Ok(Vec::new())
 380            }
 381        })
 382    }
 383
 384    pub fn has_channel_buffer_changed(&self, channel_id: ChannelId) -> bool {
 385        self.channel_states
 386            .get(&channel_id)
 387            .is_some_and(|state| state.has_channel_buffer_changed())
 388    }
 389
 390    pub fn has_new_messages(&self, channel_id: ChannelId) -> bool {
 391        self.channel_states
 392            .get(&channel_id)
 393            .is_some_and(|state| state.has_new_messages())
 394    }
 395
 396    pub fn set_acknowledged_message_id(&mut self, channel_id: ChannelId, message_id: Option<u64>) {
 397        if let Some(state) = self.channel_states.get_mut(&channel_id) {
 398            state.latest_chat_message = message_id;
 399        }
 400    }
 401
 402    pub fn last_acknowledge_message_id(&self, channel_id: ChannelId) -> Option<u64> {
 403        self.channel_states.get(&channel_id).and_then(|state| {
 404            if let Some(last_message_id) = state.latest_chat_message {
 405                if state
 406                    .last_acknowledged_message_id()
 407                    .is_some_and(|id| id < last_message_id)
 408                {
 409                    return state.last_acknowledged_message_id();
 410                }
 411            }
 412
 413            None
 414        })
 415    }
 416
 417    pub fn acknowledge_message_id(
 418        &mut self,
 419        channel_id: ChannelId,
 420        message_id: u64,
 421        cx: &mut ModelContext<Self>,
 422    ) {
 423        self.channel_states
 424            .entry(channel_id)
 425            .or_default()
 426            .acknowledge_message_id(message_id);
 427        cx.notify();
 428    }
 429
 430    pub fn update_latest_message_id(
 431        &mut self,
 432        channel_id: ChannelId,
 433        message_id: u64,
 434        cx: &mut ModelContext<Self>,
 435    ) {
 436        self.channel_states
 437            .entry(channel_id)
 438            .or_default()
 439            .update_latest_message_id(message_id);
 440        cx.notify();
 441    }
 442
 443    pub fn acknowledge_notes_version(
 444        &mut self,
 445        channel_id: ChannelId,
 446        epoch: u64,
 447        version: &clock::Global,
 448        cx: &mut ModelContext<Self>,
 449    ) {
 450        self.channel_states
 451            .entry(channel_id)
 452            .or_default()
 453            .acknowledge_notes_version(epoch, version);
 454        cx.notify()
 455    }
 456
 457    pub fn update_latest_notes_version(
 458        &mut self,
 459        channel_id: ChannelId,
 460        epoch: u64,
 461        version: &clock::Global,
 462        cx: &mut ModelContext<Self>,
 463    ) {
 464        self.channel_states
 465            .entry(channel_id)
 466            .or_default()
 467            .update_latest_notes_version(epoch, version);
 468        cx.notify()
 469    }
 470
 471    pub fn open_channel_chat(
 472        &mut self,
 473        channel_id: ChannelId,
 474        cx: &mut ModelContext<Self>,
 475    ) -> Task<Result<Model<ChannelChat>>> {
 476        let client = self.client.clone();
 477        let user_store = self.user_store.clone();
 478        let this = cx.handle();
 479        self.open_channel_resource(
 480            channel_id,
 481            |this| &mut this.opened_chats,
 482            |channel, cx| ChannelChat::new(channel, this, user_store, client, cx),
 483            cx,
 484        )
 485    }
 486
 487    /// Asynchronously open a given resource associated with a channel.
 488    ///
 489    /// Make sure that the resource is only opened once, even if this method
 490    /// is called multiple times with the same channel id while the first task
 491    /// is still running.
 492    fn open_channel_resource<T, F, Fut>(
 493        &mut self,
 494        channel_id: ChannelId,
 495        get_map: fn(&mut Self) -> &mut HashMap<ChannelId, OpenedModelHandle<T>>,
 496        load: F,
 497        cx: &mut ModelContext<Self>,
 498    ) -> Task<Result<Model<T>>>
 499    where
 500        F: 'static + FnOnce(Arc<Channel>, AsyncAppContext) -> Fut,
 501        Fut: Future<Output = Result<Model<T>>>,
 502        T: 'static,
 503    {
 504        let task = loop {
 505            match get_map(self).entry(channel_id) {
 506                hash_map::Entry::Occupied(e) => match e.get() {
 507                    OpenedModelHandle::Open(model) => {
 508                        if let Some(model) = model.upgrade() {
 509                            break Task::ready(Ok(model)).shared();
 510                        } else {
 511                            get_map(self).remove(&channel_id);
 512                            continue;
 513                        }
 514                    }
 515                    OpenedModelHandle::Loading(task) => {
 516                        break task.clone();
 517                    }
 518                },
 519                hash_map::Entry::Vacant(e) => {
 520                    let task = cx
 521                        .spawn(move |this, mut cx| async move {
 522                            let channel = this.update(&mut cx, |this, _| {
 523                                this.channel_for_id(channel_id).cloned().ok_or_else(|| {
 524                                    Arc::new(anyhow!("no channel for id: {}", channel_id))
 525                                })
 526                            })??;
 527
 528                            load(channel, cx).await.map_err(Arc::new)
 529                        })
 530                        .shared();
 531
 532                    e.insert(OpenedModelHandle::Loading(task.clone()));
 533                    cx.spawn({
 534                        let task = task.clone();
 535                        move |this, mut cx| async move {
 536                            let result = task.await;
 537                            this.update(&mut cx, |this, _| match result {
 538                                Ok(model) => {
 539                                    get_map(this).insert(
 540                                        channel_id,
 541                                        OpenedModelHandle::Open(model.downgrade()),
 542                                    );
 543                                }
 544                                Err(_) => {
 545                                    get_map(this).remove(&channel_id);
 546                                }
 547                            })
 548                            .ok();
 549                        }
 550                    })
 551                    .detach();
 552                    break task;
 553                }
 554            }
 555        };
 556        cx.background_executor()
 557            .spawn(async move { task.await.map_err(|error| anyhow!("{}", error)) })
 558    }
 559
 560    pub fn is_channel_admin(&self, channel_id: ChannelId) -> bool {
 561        self.channel_role(channel_id) == proto::ChannelRole::Admin
 562    }
 563
 564    pub fn is_root_channel(&self, channel_id: ChannelId) -> bool {
 565        self.channel_index
 566            .by_id()
 567            .get(&channel_id)
 568            .map_or(false, |channel| channel.is_root_channel())
 569    }
 570
 571    pub fn is_public_channel(&self, channel_id: ChannelId) -> bool {
 572        self.channel_index
 573            .by_id()
 574            .get(&channel_id)
 575            .map_or(false, |channel| {
 576                channel.visibility == ChannelVisibility::Public
 577            })
 578    }
 579
 580    pub fn channel_capability(&self, channel_id: ChannelId) -> Capability {
 581        match self.channel_role(channel_id) {
 582            ChannelRole::Admin | ChannelRole::Member => Capability::ReadWrite,
 583            _ => Capability::ReadOnly,
 584        }
 585    }
 586
 587    pub fn channel_role(&self, channel_id: ChannelId) -> proto::ChannelRole {
 588        maybe!({
 589            let mut channel = self.channel_for_id(channel_id)?;
 590            if !channel.is_root_channel() {
 591                channel = self.channel_for_id(channel.root_id())?;
 592            }
 593            let root_channel_state = self.channel_states.get(&channel.id);
 594            root_channel_state?.role
 595        })
 596        .unwrap_or(proto::ChannelRole::Guest)
 597    }
 598
 599    pub fn channel_participants(&self, channel_id: ChannelId) -> &[Arc<User>] {
 600        self.channel_participants
 601            .get(&channel_id)
 602            .map_or(&[], |v| v.as_slice())
 603    }
 604
 605    pub fn create_channel(
 606        &self,
 607        name: &str,
 608        parent_id: Option<ChannelId>,
 609        cx: &mut ModelContext<Self>,
 610    ) -> Task<Result<ChannelId>> {
 611        let client = self.client.clone();
 612        let name = name.trim_start_matches('#').to_owned();
 613        cx.spawn(move |this, mut cx| async move {
 614            let response = client
 615                .request(proto::CreateChannel {
 616                    name,
 617                    parent_id: parent_id.map(|cid| cid.0),
 618                })
 619                .await?;
 620
 621            let channel = response
 622                .channel
 623                .ok_or_else(|| anyhow!("missing channel in response"))?;
 624            let channel_id = ChannelId(channel.id);
 625
 626            this.update(&mut cx, |this, cx| {
 627                let task = this.update_channels(
 628                    proto::UpdateChannels {
 629                        channels: vec![channel],
 630                        ..Default::default()
 631                    },
 632                    cx,
 633                );
 634                assert!(task.is_none());
 635
 636                // This event is emitted because the collab panel wants to clear the pending edit state
 637                // before this frame is rendered. But we can't guarantee that the collab panel's future
 638                // will resolve before this flush_effects finishes. Synchronously emitting this event
 639                // ensures that the collab panel will observe this creation before the frame completes
 640                cx.emit(ChannelEvent::ChannelCreated(channel_id));
 641            })?;
 642
 643            Ok(channel_id)
 644        })
 645    }
 646
 647    pub fn move_channel(
 648        &mut self,
 649        channel_id: ChannelId,
 650        to: ChannelId,
 651        cx: &mut ModelContext<Self>,
 652    ) -> Task<Result<()>> {
 653        let client = self.client.clone();
 654        cx.spawn(move |_, _| async move {
 655            let _ = client
 656                .request(proto::MoveChannel {
 657                    channel_id: channel_id.0,
 658                    to: to.0,
 659                })
 660                .await?;
 661
 662            Ok(())
 663        })
 664    }
 665
 666    pub fn set_channel_visibility(
 667        &mut self,
 668        channel_id: ChannelId,
 669        visibility: ChannelVisibility,
 670        cx: &mut ModelContext<Self>,
 671    ) -> Task<Result<()>> {
 672        let client = self.client.clone();
 673        cx.spawn(move |_, _| async move {
 674            let _ = client
 675                .request(proto::SetChannelVisibility {
 676                    channel_id: channel_id.0,
 677                    visibility: visibility.into(),
 678                })
 679                .await?;
 680
 681            Ok(())
 682        })
 683    }
 684
 685    pub fn invite_member(
 686        &mut self,
 687        channel_id: ChannelId,
 688        user_id: UserId,
 689        role: proto::ChannelRole,
 690        cx: &mut ModelContext<Self>,
 691    ) -> Task<Result<()>> {
 692        if !self.outgoing_invites.insert((channel_id, user_id)) {
 693            return Task::ready(Err(anyhow!("invite request already in progress")));
 694        }
 695
 696        cx.notify();
 697        let client = self.client.clone();
 698        cx.spawn(move |this, mut cx| async move {
 699            let result = client
 700                .request(proto::InviteChannelMember {
 701                    channel_id: channel_id.0,
 702                    user_id,
 703                    role: role.into(),
 704                })
 705                .await;
 706
 707            this.update(&mut cx, |this, cx| {
 708                this.outgoing_invites.remove(&(channel_id, user_id));
 709                cx.notify();
 710            })?;
 711
 712            result?;
 713
 714            Ok(())
 715        })
 716    }
 717
 718    pub fn remove_member(
 719        &mut self,
 720        channel_id: ChannelId,
 721        user_id: u64,
 722        cx: &mut ModelContext<Self>,
 723    ) -> Task<Result<()>> {
 724        if !self.outgoing_invites.insert((channel_id, user_id)) {
 725            return Task::ready(Err(anyhow!("invite request already in progress")));
 726        }
 727
 728        cx.notify();
 729        let client = self.client.clone();
 730        cx.spawn(move |this, mut cx| async move {
 731            let result = client
 732                .request(proto::RemoveChannelMember {
 733                    channel_id: channel_id.0,
 734                    user_id,
 735                })
 736                .await;
 737
 738            this.update(&mut cx, |this, cx| {
 739                this.outgoing_invites.remove(&(channel_id, user_id));
 740                cx.notify();
 741            })?;
 742            result?;
 743            Ok(())
 744        })
 745    }
 746
 747    pub fn set_member_role(
 748        &mut self,
 749        channel_id: ChannelId,
 750        user_id: UserId,
 751        role: proto::ChannelRole,
 752        cx: &mut ModelContext<Self>,
 753    ) -> Task<Result<()>> {
 754        if !self.outgoing_invites.insert((channel_id, user_id)) {
 755            return Task::ready(Err(anyhow!("member request already in progress")));
 756        }
 757
 758        cx.notify();
 759        let client = self.client.clone();
 760        cx.spawn(move |this, mut cx| async move {
 761            let result = client
 762                .request(proto::SetChannelMemberRole {
 763                    channel_id: channel_id.0,
 764                    user_id,
 765                    role: role.into(),
 766                })
 767                .await;
 768
 769            this.update(&mut cx, |this, cx| {
 770                this.outgoing_invites.remove(&(channel_id, user_id));
 771                cx.notify();
 772            })?;
 773
 774            result?;
 775            Ok(())
 776        })
 777    }
 778
 779    pub fn rename(
 780        &mut self,
 781        channel_id: ChannelId,
 782        new_name: &str,
 783        cx: &mut ModelContext<Self>,
 784    ) -> Task<Result<()>> {
 785        let client = self.client.clone();
 786        let name = new_name.to_string();
 787        cx.spawn(move |this, mut cx| async move {
 788            let channel = client
 789                .request(proto::RenameChannel {
 790                    channel_id: channel_id.0,
 791                    name,
 792                })
 793                .await?
 794                .channel
 795                .ok_or_else(|| anyhow!("missing channel in response"))?;
 796            this.update(&mut cx, |this, cx| {
 797                let task = this.update_channels(
 798                    proto::UpdateChannels {
 799                        channels: vec![channel],
 800                        ..Default::default()
 801                    },
 802                    cx,
 803                );
 804                assert!(task.is_none());
 805
 806                // This event is emitted because the collab panel wants to clear the pending edit state
 807                // before this frame is rendered. But we can't guarantee that the collab panel's future
 808                // will resolve before this flush_effects finishes. Synchronously emitting this event
 809                // ensures that the collab panel will observe this creation before the frame complete
 810                cx.emit(ChannelEvent::ChannelRenamed(channel_id))
 811            })?;
 812            Ok(())
 813        })
 814    }
 815
 816    pub fn respond_to_channel_invite(
 817        &mut self,
 818        channel_id: ChannelId,
 819        accept: bool,
 820        cx: &mut ModelContext<Self>,
 821    ) -> Task<Result<()>> {
 822        let client = self.client.clone();
 823        cx.background_executor().spawn(async move {
 824            client
 825                .request(proto::RespondToChannelInvite {
 826                    channel_id: channel_id.0,
 827                    accept,
 828                })
 829                .await?;
 830            Ok(())
 831        })
 832    }
 833    pub fn fuzzy_search_members(
 834        &self,
 835        channel_id: ChannelId,
 836        query: String,
 837        limit: u16,
 838        cx: &mut ModelContext<Self>,
 839    ) -> Task<Result<Vec<ChannelMembership>>> {
 840        let client = self.client.clone();
 841        let user_store = self.user_store.downgrade();
 842        cx.spawn(move |_, mut cx| async move {
 843            let response = client
 844                .request(proto::GetChannelMembers {
 845                    channel_id: channel_id.0,
 846                    query,
 847                    limit: limit as u64,
 848                })
 849                .await?;
 850            user_store.update(&mut cx, |user_store, _| {
 851                user_store.insert(response.users);
 852                response
 853                    .members
 854                    .into_iter()
 855                    .filter_map(|member| {
 856                        Some(ChannelMembership {
 857                            user: user_store.get_cached_user(member.user_id)?,
 858                            role: member.role(),
 859                            kind: member.kind(),
 860                        })
 861                    })
 862                    .collect()
 863            })
 864        })
 865    }
 866
 867    pub fn remove_channel(&self, channel_id: ChannelId) -> impl Future<Output = Result<()>> {
 868        let client = self.client.clone();
 869        async move {
 870            client
 871                .request(proto::DeleteChannel {
 872                    channel_id: channel_id.0,
 873                })
 874                .await?;
 875            Ok(())
 876        }
 877    }
 878
 879    pub fn has_pending_channel_invite_response(&self, _: &Arc<Channel>) -> bool {
 880        false
 881    }
 882
 883    pub fn has_pending_channel_invite(&self, channel_id: ChannelId, user_id: UserId) -> bool {
 884        self.outgoing_invites.contains(&(channel_id, user_id))
 885    }
 886
 887    async fn handle_update_channels(
 888        this: Model<Self>,
 889        message: TypedEnvelope<proto::UpdateChannels>,
 890        mut cx: AsyncAppContext,
 891    ) -> Result<()> {
 892        this.update(&mut cx, |this, _| {
 893            this.update_channels_tx
 894                .unbounded_send(message.payload)
 895                .unwrap();
 896        })?;
 897        Ok(())
 898    }
 899
 900    async fn handle_update_user_channels(
 901        this: Model<Self>,
 902        message: TypedEnvelope<proto::UpdateUserChannels>,
 903        mut cx: AsyncAppContext,
 904    ) -> Result<()> {
 905        this.update(&mut cx, |this, cx| {
 906            for buffer_version in message.payload.observed_channel_buffer_version {
 907                let version = language::proto::deserialize_version(&buffer_version.version);
 908                this.acknowledge_notes_version(
 909                    ChannelId(buffer_version.channel_id),
 910                    buffer_version.epoch,
 911                    &version,
 912                    cx,
 913                );
 914            }
 915            for message_id in message.payload.observed_channel_message_id {
 916                this.acknowledge_message_id(
 917                    ChannelId(message_id.channel_id),
 918                    message_id.message_id,
 919                    cx,
 920                );
 921            }
 922            for membership in message.payload.channel_memberships {
 923                if let Some(role) = ChannelRole::from_i32(membership.role) {
 924                    this.channel_states
 925                        .entry(ChannelId(membership.channel_id))
 926                        .or_default()
 927                        .set_role(role)
 928                }
 929            }
 930        })
 931    }
 932
 933    fn handle_connect(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
 934        self.channel_index.clear();
 935        self.channel_invitations.clear();
 936        self.channel_participants.clear();
 937        self.channel_index.clear();
 938        self.outgoing_invites.clear();
 939        self.disconnect_channel_buffers_task.take();
 940
 941        for chat in self.opened_chats.values() {
 942            if let OpenedModelHandle::Open(chat) = chat {
 943                if let Some(chat) = chat.upgrade() {
 944                    chat.update(cx, |chat, cx| {
 945                        chat.rejoin(cx);
 946                    });
 947                }
 948            }
 949        }
 950
 951        let mut buffer_versions = Vec::new();
 952        for buffer in self.opened_buffers.values() {
 953            if let OpenedModelHandle::Open(buffer) = buffer {
 954                if let Some(buffer) = buffer.upgrade() {
 955                    let channel_buffer = buffer.read(cx);
 956                    let buffer = channel_buffer.buffer().read(cx);
 957                    buffer_versions.push(proto::ChannelBufferVersion {
 958                        channel_id: channel_buffer.channel_id.0,
 959                        epoch: channel_buffer.epoch(),
 960                        version: language::proto::serialize_version(&buffer.version()),
 961                    });
 962                }
 963            }
 964        }
 965
 966        if buffer_versions.is_empty() {
 967            return Task::ready(Ok(()));
 968        }
 969
 970        let response = self.client.request(proto::RejoinChannelBuffers {
 971            buffers: buffer_versions,
 972        });
 973
 974        cx.spawn(|this, mut cx| async move {
 975            let mut response = response.await?;
 976
 977            this.update(&mut cx, |this, cx| {
 978                this.opened_buffers.retain(|_, buffer| match buffer {
 979                    OpenedModelHandle::Open(channel_buffer) => {
 980                        let Some(channel_buffer) = channel_buffer.upgrade() else {
 981                            return false;
 982                        };
 983
 984                        channel_buffer.update(cx, |channel_buffer, cx| {
 985                            let channel_id = channel_buffer.channel_id;
 986                            if let Some(remote_buffer) = response
 987                                .buffers
 988                                .iter_mut()
 989                                .find(|buffer| buffer.channel_id == channel_id.0)
 990                            {
 991                                let channel_id = channel_buffer.channel_id;
 992                                let remote_version =
 993                                    language::proto::deserialize_version(&remote_buffer.version);
 994
 995                                channel_buffer.replace_collaborators(
 996                                    mem::take(&mut remote_buffer.collaborators),
 997                                    cx,
 998                                );
 999
1000                                let operations = channel_buffer
1001                                    .buffer()
1002                                    .update(cx, |buffer, cx| {
1003                                        let outgoing_operations =
1004                                            buffer.serialize_ops(Some(remote_version), cx);
1005                                        let incoming_operations =
1006                                            mem::take(&mut remote_buffer.operations)
1007                                                .into_iter()
1008                                                .map(language::proto::deserialize_operation)
1009                                                .collect::<Result<Vec<_>>>()?;
1010                                        buffer.apply_ops(incoming_operations, cx);
1011                                        anyhow::Ok(outgoing_operations)
1012                                    })
1013                                    .log_err();
1014
1015                                if let Some(operations) = operations {
1016                                    let client = this.client.clone();
1017                                    cx.background_executor()
1018                                        .spawn(async move {
1019                                            let operations = operations.await;
1020                                            for chunk in
1021                                                language::proto::split_operations(operations)
1022                                            {
1023                                                client
1024                                                    .send(proto::UpdateChannelBuffer {
1025                                                        channel_id: channel_id.0,
1026                                                        operations: chunk,
1027                                                    })
1028                                                    .ok();
1029                                            }
1030                                        })
1031                                        .detach();
1032                                    return true;
1033                                }
1034                            }
1035
1036                            channel_buffer.disconnect(cx);
1037                            false
1038                        })
1039                    }
1040                    OpenedModelHandle::Loading(_) => true,
1041                });
1042            })
1043            .ok();
1044            anyhow::Ok(())
1045        })
1046    }
1047
1048    fn handle_disconnect(&mut self, wait_for_reconnect: bool, cx: &mut ModelContext<Self>) {
1049        cx.notify();
1050        self.did_subscribe = false;
1051        self.disconnect_channel_buffers_task.get_or_insert_with(|| {
1052            cx.spawn(move |this, mut cx| async move {
1053                if wait_for_reconnect {
1054                    cx.background_executor().timer(RECONNECT_TIMEOUT).await;
1055                }
1056
1057                if let Some(this) = this.upgrade() {
1058                    this.update(&mut cx, |this, cx| {
1059                        for (_, buffer) in this.opened_buffers.drain() {
1060                            if let OpenedModelHandle::Open(buffer) = buffer {
1061                                if let Some(buffer) = buffer.upgrade() {
1062                                    buffer.update(cx, |buffer, cx| buffer.disconnect(cx));
1063                                }
1064                            }
1065                        }
1066                    })
1067                    .ok();
1068                }
1069            })
1070        });
1071    }
1072
1073    pub(crate) fn update_channels(
1074        &mut self,
1075        payload: proto::UpdateChannels,
1076        cx: &mut ModelContext<ChannelStore>,
1077    ) -> Option<Task<Result<()>>> {
1078        if !payload.remove_channel_invitations.is_empty() {
1079            self.channel_invitations
1080                .retain(|channel| !payload.remove_channel_invitations.contains(&channel.id.0));
1081        }
1082        for channel in payload.channel_invitations {
1083            match self
1084                .channel_invitations
1085                .binary_search_by_key(&channel.id, |c| c.id.0)
1086            {
1087                Ok(ix) => {
1088                    Arc::make_mut(&mut self.channel_invitations[ix]).name = channel.name.into()
1089                }
1090                Err(ix) => self.channel_invitations.insert(
1091                    ix,
1092                    Arc::new(Channel {
1093                        id: ChannelId(channel.id),
1094                        visibility: channel.visibility(),
1095                        name: channel.name.into(),
1096                        parent_path: channel.parent_path.into_iter().map(ChannelId).collect(),
1097                    }),
1098                ),
1099            }
1100        }
1101
1102        let channels_changed = !payload.channels.is_empty()
1103            || !payload.delete_channels.is_empty()
1104            || !payload.latest_channel_message_ids.is_empty()
1105            || !payload.latest_channel_buffer_versions.is_empty()
1106            || !payload.hosted_projects.is_empty()
1107            || !payload.deleted_hosted_projects.is_empty();
1108
1109        if channels_changed {
1110            if !payload.delete_channels.is_empty() {
1111                let delete_channels: Vec<ChannelId> =
1112                    payload.delete_channels.into_iter().map(ChannelId).collect();
1113                self.channel_index.delete_channels(&delete_channels);
1114                self.channel_participants
1115                    .retain(|channel_id, _| !delete_channels.contains(channel_id));
1116
1117                for channel_id in &delete_channels {
1118                    let channel_id = *channel_id;
1119                    if payload
1120                        .channels
1121                        .iter()
1122                        .any(|channel| channel.id == channel_id.0)
1123                    {
1124                        continue;
1125                    }
1126                    if let Some(OpenedModelHandle::Open(buffer)) =
1127                        self.opened_buffers.remove(&channel_id)
1128                    {
1129                        if let Some(buffer) = buffer.upgrade() {
1130                            buffer.update(cx, ChannelBuffer::disconnect);
1131                        }
1132                    }
1133                }
1134            }
1135
1136            let mut index = self.channel_index.bulk_insert();
1137            for channel in payload.channels {
1138                let id = ChannelId(channel.id);
1139                let channel_changed = index.insert(channel);
1140
1141                if channel_changed {
1142                    if let Some(OpenedModelHandle::Open(buffer)) = self.opened_buffers.get(&id) {
1143                        if let Some(buffer) = buffer.upgrade() {
1144                            buffer.update(cx, ChannelBuffer::channel_changed);
1145                        }
1146                    }
1147                }
1148            }
1149
1150            for latest_buffer_version in payload.latest_channel_buffer_versions {
1151                let version = language::proto::deserialize_version(&latest_buffer_version.version);
1152                self.channel_states
1153                    .entry(ChannelId(latest_buffer_version.channel_id))
1154                    .or_default()
1155                    .update_latest_notes_version(latest_buffer_version.epoch, &version)
1156            }
1157
1158            for latest_channel_message in payload.latest_channel_message_ids {
1159                self.channel_states
1160                    .entry(ChannelId(latest_channel_message.channel_id))
1161                    .or_default()
1162                    .update_latest_message_id(latest_channel_message.message_id);
1163            }
1164
1165            for hosted_project in payload.hosted_projects {
1166                let hosted_project: HostedProject = hosted_project.into();
1167                if let Some(old_project) = self
1168                    .hosted_projects
1169                    .insert(hosted_project.project_id, hosted_project.clone())
1170                {
1171                    self.channel_states
1172                        .entry(old_project.channel_id)
1173                        .or_default()
1174                        .remove_hosted_project(old_project.project_id);
1175                }
1176                self.channel_states
1177                    .entry(hosted_project.channel_id)
1178                    .or_default()
1179                    .add_hosted_project(hosted_project.project_id);
1180            }
1181
1182            for hosted_project_id in payload.deleted_hosted_projects {
1183                let hosted_project_id = ProjectId(hosted_project_id);
1184
1185                if let Some(old_project) = self.hosted_projects.remove(&hosted_project_id) {
1186                    self.channel_states
1187                        .entry(old_project.channel_id)
1188                        .or_default()
1189                        .remove_hosted_project(old_project.project_id);
1190                }
1191            }
1192        }
1193
1194        cx.notify();
1195        if payload.channel_participants.is_empty() {
1196            return None;
1197        }
1198
1199        let mut all_user_ids = Vec::new();
1200        let channel_participants = payload.channel_participants;
1201        for entry in &channel_participants {
1202            for user_id in entry.participant_user_ids.iter() {
1203                if let Err(ix) = all_user_ids.binary_search(user_id) {
1204                    all_user_ids.insert(ix, *user_id);
1205                }
1206            }
1207        }
1208
1209        let users = self
1210            .user_store
1211            .update(cx, |user_store, cx| user_store.get_users(all_user_ids, cx));
1212        Some(cx.spawn(|this, mut cx| async move {
1213            let users = users.await?;
1214
1215            this.update(&mut cx, |this, cx| {
1216                for entry in &channel_participants {
1217                    let mut participants: Vec<_> = entry
1218                        .participant_user_ids
1219                        .iter()
1220                        .filter_map(|user_id| {
1221                            users
1222                                .binary_search_by_key(&user_id, |user| &user.id)
1223                                .ok()
1224                                .map(|ix| users[ix].clone())
1225                        })
1226                        .collect();
1227
1228                    participants.sort_by_key(|u| u.id);
1229
1230                    this.channel_participants
1231                        .insert(ChannelId(entry.channel_id), participants);
1232                }
1233
1234                cx.notify();
1235            })
1236        }))
1237    }
1238}
1239
1240impl ChannelState {
1241    fn set_role(&mut self, role: ChannelRole) {
1242        self.role = Some(role);
1243    }
1244
1245    fn has_channel_buffer_changed(&self) -> bool {
1246        self.latest_notes_version.epoch > self.observed_notes_version.epoch
1247            || (self.latest_notes_version.epoch == self.observed_notes_version.epoch
1248                && self
1249                    .latest_notes_version
1250                    .version
1251                    .changed_since(&self.observed_notes_version.version))
1252    }
1253
1254    fn has_new_messages(&self) -> bool {
1255        let latest_message_id = self.latest_chat_message;
1256        let observed_message_id = self.observed_chat_message;
1257
1258        latest_message_id.is_some_and(|latest_message_id| {
1259            latest_message_id > observed_message_id.unwrap_or_default()
1260        })
1261    }
1262
1263    fn last_acknowledged_message_id(&self) -> Option<u64> {
1264        self.observed_chat_message
1265    }
1266
1267    fn acknowledge_message_id(&mut self, message_id: u64) {
1268        let observed = self.observed_chat_message.get_or_insert(message_id);
1269        *observed = (*observed).max(message_id);
1270    }
1271
1272    fn update_latest_message_id(&mut self, message_id: u64) {
1273        self.latest_chat_message =
1274            Some(message_id.max(self.latest_chat_message.unwrap_or_default()));
1275    }
1276
1277    fn acknowledge_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1278        if self.observed_notes_version.epoch == epoch {
1279            self.observed_notes_version.version.join(version);
1280        } else {
1281            self.observed_notes_version = NotesVersion {
1282                epoch,
1283                version: version.clone(),
1284            };
1285        }
1286    }
1287
1288    fn update_latest_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1289        if self.latest_notes_version.epoch == epoch {
1290            self.latest_notes_version.version.join(version);
1291        } else {
1292            self.latest_notes_version = NotesVersion {
1293                epoch,
1294                version: version.clone(),
1295            };
1296        }
1297    }
1298
1299    fn add_hosted_project(&mut self, project_id: ProjectId) {
1300        self.projects.insert(project_id);
1301    }
1302
1303    fn remove_hosted_project(&mut self, project_id: ProjectId) {
1304        self.projects.remove(&project_id);
1305    }
1306}