channel_store.rs

   1mod channel_index;
   2
   3use crate::{channel_buffer::ChannelBuffer, channel_chat::ChannelChat, ChannelMessage};
   4use anyhow::{anyhow, Result};
   5use channel_index::ChannelIndex;
   6use client::{ChannelId, Client, ClientSettings, ProjectId, Subscription, User, UserId, UserStore};
   7use collections::{hash_map, HashMap, HashSet};
   8use futures::{channel::mpsc, future::Shared, Future, FutureExt, StreamExt};
   9use gpui::{
  10    AppContext, AsyncAppContext, Context, EventEmitter, Global, Model, ModelContext, SharedString,
  11    Task, WeakModel,
  12};
  13use language::Capability;
  14use rpc::{
  15    proto::{self, ChannelRole, ChannelVisibility},
  16    TypedEnvelope,
  17};
  18use settings::Settings;
  19use std::{mem, sync::Arc, time::Duration};
  20use util::{maybe, ResultExt};
  21
  22pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
  23
  24pub fn init(client: &Arc<Client>, user_store: Model<UserStore>, cx: &mut AppContext) {
  25    let channel_store =
  26        cx.new_model(|cx| ChannelStore::new(client.clone(), user_store.clone(), cx));
  27    cx.set_global(GlobalChannelStore(channel_store));
  28}
  29
  30#[derive(Debug, Clone, Default, PartialEq)]
  31struct NotesVersion {
  32    epoch: u64,
  33    version: clock::Global,
  34}
  35
  36#[derive(Debug, Clone)]
  37pub struct HostedProject {
  38    project_id: ProjectId,
  39    channel_id: ChannelId,
  40    name: SharedString,
  41    _visibility: proto::ChannelVisibility,
  42}
  43impl From<proto::HostedProject> for HostedProject {
  44    fn from(project: proto::HostedProject) -> Self {
  45        Self {
  46            project_id: ProjectId(project.project_id),
  47            channel_id: ChannelId(project.channel_id),
  48            _visibility: project.visibility(),
  49            name: project.name.into(),
  50        }
  51    }
  52}
  53pub struct ChannelStore {
  54    pub channel_index: ChannelIndex,
  55    channel_invitations: Vec<Arc<Channel>>,
  56    channel_participants: HashMap<ChannelId, Vec<Arc<User>>>,
  57    channel_states: HashMap<ChannelId, ChannelState>,
  58    hosted_projects: HashMap<ProjectId, HostedProject>,
  59
  60    outgoing_invites: HashSet<(ChannelId, UserId)>,
  61    update_channels_tx: mpsc::UnboundedSender<proto::UpdateChannels>,
  62    opened_buffers: HashMap<ChannelId, OpenedModelHandle<ChannelBuffer>>,
  63    opened_chats: HashMap<ChannelId, OpenedModelHandle<ChannelChat>>,
  64    client: Arc<Client>,
  65    did_subscribe: bool,
  66    user_store: Model<UserStore>,
  67    _rpc_subscriptions: [Subscription; 2],
  68    _watch_connection_status: Task<Option<()>>,
  69    disconnect_channel_buffers_task: Option<Task<()>>,
  70    _update_channels: Task<()>,
  71}
  72
  73#[derive(Clone, Debug)]
  74pub struct Channel {
  75    pub id: ChannelId,
  76    pub name: SharedString,
  77    pub visibility: proto::ChannelVisibility,
  78    pub parent_path: Vec<ChannelId>,
  79}
  80
  81#[derive(Default, Debug)]
  82pub struct ChannelState {
  83    latest_chat_message: Option<u64>,
  84    latest_notes_version: NotesVersion,
  85    observed_notes_version: NotesVersion,
  86    observed_chat_message: Option<u64>,
  87    role: Option<ChannelRole>,
  88    projects: HashSet<ProjectId>,
  89}
  90
  91impl Channel {
  92    pub fn link(&self, cx: &AppContext) -> String {
  93        format!(
  94            "{}/channel/{}-{}",
  95            ClientSettings::get_global(cx).server_url,
  96            Self::slug(&self.name),
  97            self.id
  98        )
  99    }
 100
 101    pub fn notes_link(&self, heading: Option<String>, cx: &AppContext) -> String {
 102        self.link(cx)
 103            + "/notes"
 104            + &heading
 105                .map(|h| format!("#{}", Self::slug(&h)))
 106                .unwrap_or_default()
 107    }
 108
 109    pub fn is_root_channel(&self) -> bool {
 110        self.parent_path.is_empty()
 111    }
 112
 113    pub fn root_id(&self) -> ChannelId {
 114        self.parent_path.first().copied().unwrap_or(self.id)
 115    }
 116
 117    pub fn slug(str: &str) -> String {
 118        let slug: String = str
 119            .chars()
 120            .map(|c| if c.is_alphanumeric() { c } else { '-' })
 121            .collect();
 122
 123        slug.trim_matches(|c| c == '-').to_string()
 124    }
 125}
 126
 127#[derive(Debug)]
 128pub struct ChannelMembership {
 129    pub user: Arc<User>,
 130    pub kind: proto::channel_member::Kind,
 131    pub role: proto::ChannelRole,
 132}
 133impl ChannelMembership {
 134    pub fn sort_key(&self) -> MembershipSortKey {
 135        MembershipSortKey {
 136            role_order: match self.role {
 137                proto::ChannelRole::Admin => 0,
 138                proto::ChannelRole::Member => 1,
 139                proto::ChannelRole::Banned => 2,
 140                proto::ChannelRole::Talker => 3,
 141                proto::ChannelRole::Guest => 4,
 142            },
 143            kind_order: match self.kind {
 144                proto::channel_member::Kind::Member => 0,
 145                proto::channel_member::Kind::Invitee => 1,
 146            },
 147            username_order: self.user.github_login.as_str(),
 148        }
 149    }
 150}
 151
 152#[derive(PartialOrd, Ord, PartialEq, Eq)]
 153pub struct MembershipSortKey<'a> {
 154    role_order: u8,
 155    kind_order: u8,
 156    username_order: &'a str,
 157}
 158
 159pub enum ChannelEvent {
 160    ChannelCreated(ChannelId),
 161    ChannelRenamed(ChannelId),
 162}
 163
 164impl EventEmitter<ChannelEvent> for ChannelStore {}
 165
 166enum OpenedModelHandle<E> {
 167    Open(WeakModel<E>),
 168    Loading(Shared<Task<Result<Model<E>, Arc<anyhow::Error>>>>),
 169}
 170
 171struct GlobalChannelStore(Model<ChannelStore>);
 172
 173impl Global for GlobalChannelStore {}
 174
 175impl ChannelStore {
 176    pub fn global(cx: &AppContext) -> Model<Self> {
 177        cx.global::<GlobalChannelStore>().0.clone()
 178    }
 179
 180    pub fn new(
 181        client: Arc<Client>,
 182        user_store: Model<UserStore>,
 183        cx: &mut ModelContext<Self>,
 184    ) -> Self {
 185        let rpc_subscriptions = [
 186            client.add_message_handler(cx.weak_model(), Self::handle_update_channels),
 187            client.add_message_handler(cx.weak_model(), Self::handle_update_user_channels),
 188        ];
 189
 190        let mut connection_status = client.status();
 191        let (update_channels_tx, mut update_channels_rx) = mpsc::unbounded();
 192        let watch_connection_status = cx.spawn(|this, mut cx| async move {
 193            while let Some(status) = connection_status.next().await {
 194                let this = this.upgrade()?;
 195                match status {
 196                    client::Status::Connected { .. } => {
 197                        this.update(&mut cx, |this, cx| this.handle_connect(cx))
 198                            .ok()?
 199                            .await
 200                            .log_err()?;
 201                    }
 202                    client::Status::SignedOut | client::Status::UpgradeRequired => {
 203                        this.update(&mut cx, |this, cx| this.handle_disconnect(false, cx))
 204                            .ok();
 205                    }
 206                    _ => {
 207                        this.update(&mut cx, |this, cx| this.handle_disconnect(true, cx))
 208                            .ok();
 209                    }
 210                }
 211            }
 212            Some(())
 213        });
 214
 215        Self {
 216            channel_invitations: Vec::default(),
 217            channel_index: ChannelIndex::default(),
 218            channel_participants: Default::default(),
 219            hosted_projects: Default::default(),
 220            outgoing_invites: Default::default(),
 221            opened_buffers: Default::default(),
 222            opened_chats: Default::default(),
 223            update_channels_tx,
 224            client,
 225            user_store,
 226            _rpc_subscriptions: rpc_subscriptions,
 227            _watch_connection_status: watch_connection_status,
 228            disconnect_channel_buffers_task: None,
 229            _update_channels: cx.spawn(|this, mut cx| async move {
 230                maybe!(async move {
 231                    while let Some(update_channels) = update_channels_rx.next().await {
 232                        if let Some(this) = this.upgrade() {
 233                            let update_task = this.update(&mut cx, |this, cx| {
 234                                this.update_channels(update_channels, cx)
 235                            })?;
 236                            if let Some(update_task) = update_task {
 237                                update_task.await.log_err();
 238                            }
 239                        }
 240                    }
 241                    anyhow::Ok(())
 242                })
 243                .await
 244                .log_err();
 245            }),
 246            channel_states: Default::default(),
 247            did_subscribe: false,
 248        }
 249    }
 250
 251    pub fn initialize(&mut self) {
 252        if !self.did_subscribe {
 253            if self
 254                .client
 255                .send(proto::SubscribeToChannels {})
 256                .log_err()
 257                .is_some()
 258            {
 259                self.did_subscribe = true;
 260            }
 261        }
 262    }
 263
 264    pub fn client(&self) -> Arc<Client> {
 265        self.client.clone()
 266    }
 267
 268    /// Returns the number of unique channels in the store
 269    pub fn channel_count(&self) -> usize {
 270        self.channel_index.by_id().len()
 271    }
 272
 273    /// Returns the index of a channel ID in the list of unique channels
 274    pub fn index_of_channel(&self, channel_id: ChannelId) -> Option<usize> {
 275        self.channel_index
 276            .by_id()
 277            .keys()
 278            .position(|id| *id == channel_id)
 279    }
 280
 281    /// Returns an iterator over all unique channels
 282    pub fn channels(&self) -> impl '_ + Iterator<Item = &Arc<Channel>> {
 283        self.channel_index.by_id().values()
 284    }
 285
 286    /// Iterate over all entries in the channel DAG
 287    pub fn ordered_channels(&self) -> impl '_ + Iterator<Item = (usize, &Arc<Channel>)> {
 288        self.channel_index
 289            .ordered_channels()
 290            .iter()
 291            .filter_map(move |id| {
 292                let channel = self.channel_index.by_id().get(id)?;
 293                Some((channel.parent_path.len(), channel))
 294            })
 295    }
 296
 297    pub fn channel_at_index(&self, ix: usize) -> Option<&Arc<Channel>> {
 298        let channel_id = self.channel_index.ordered_channels().get(ix)?;
 299        self.channel_index.by_id().get(channel_id)
 300    }
 301
 302    pub fn channel_at(&self, ix: usize) -> Option<&Arc<Channel>> {
 303        self.channel_index.by_id().values().nth(ix)
 304    }
 305
 306    pub fn has_channel_invitation(&self, channel_id: ChannelId) -> bool {
 307        self.channel_invitations
 308            .iter()
 309            .any(|channel| channel.id == channel_id)
 310    }
 311
 312    pub fn channel_invitations(&self) -> &[Arc<Channel>] {
 313        &self.channel_invitations
 314    }
 315
 316    pub fn channel_for_id(&self, channel_id: ChannelId) -> Option<&Arc<Channel>> {
 317        self.channel_index.by_id().get(&channel_id)
 318    }
 319
 320    pub fn projects_for_id(&self, channel_id: ChannelId) -> Vec<(SharedString, ProjectId)> {
 321        let mut projects: Vec<(SharedString, ProjectId)> = self
 322            .channel_states
 323            .get(&channel_id)
 324            .map(|state| state.projects.clone())
 325            .unwrap_or_default()
 326            .into_iter()
 327            .flat_map(|id| Some((self.hosted_projects.get(&id)?.name.clone(), id)))
 328            .collect();
 329        projects.sort();
 330        projects
 331    }
 332
 333    pub fn has_open_channel_buffer(&self, channel_id: ChannelId, _cx: &AppContext) -> bool {
 334        if let Some(buffer) = self.opened_buffers.get(&channel_id) {
 335            if let OpenedModelHandle::Open(buffer) = buffer {
 336                return buffer.upgrade().is_some();
 337            }
 338        }
 339        false
 340    }
 341
 342    pub fn open_channel_buffer(
 343        &mut self,
 344        channel_id: ChannelId,
 345        cx: &mut ModelContext<Self>,
 346    ) -> Task<Result<Model<ChannelBuffer>>> {
 347        let client = self.client.clone();
 348        let user_store = self.user_store.clone();
 349        let channel_store = cx.handle();
 350        self.open_channel_resource(
 351            channel_id,
 352            |this| &mut this.opened_buffers,
 353            |channel, cx| ChannelBuffer::new(channel, client, user_store, channel_store, cx),
 354            cx,
 355        )
 356    }
 357
 358    pub fn fetch_channel_messages(
 359        &self,
 360        message_ids: Vec<u64>,
 361        cx: &mut ModelContext<Self>,
 362    ) -> Task<Result<Vec<ChannelMessage>>> {
 363        let request = if message_ids.is_empty() {
 364            None
 365        } else {
 366            Some(
 367                self.client
 368                    .request(proto::GetChannelMessagesById { message_ids }),
 369            )
 370        };
 371        cx.spawn(|this, mut cx| async move {
 372            if let Some(request) = request {
 373                let response = request.await?;
 374                let this = this
 375                    .upgrade()
 376                    .ok_or_else(|| anyhow!("channel store dropped"))?;
 377                let user_store = this.update(&mut cx, |this, _| this.user_store.clone())?;
 378                ChannelMessage::from_proto_vec(response.messages, &user_store, &mut cx).await
 379            } else {
 380                Ok(Vec::new())
 381            }
 382        })
 383    }
 384
 385    pub fn has_channel_buffer_changed(&self, channel_id: ChannelId) -> bool {
 386        self.channel_states
 387            .get(&channel_id)
 388            .is_some_and(|state| state.has_channel_buffer_changed())
 389    }
 390
 391    pub fn has_new_messages(&self, channel_id: ChannelId) -> bool {
 392        self.channel_states
 393            .get(&channel_id)
 394            .is_some_and(|state| state.has_new_messages())
 395    }
 396
 397    pub fn set_acknowledged_message_id(&mut self, channel_id: ChannelId, message_id: Option<u64>) {
 398        if let Some(state) = self.channel_states.get_mut(&channel_id) {
 399            state.latest_chat_message = message_id;
 400        }
 401    }
 402
 403    pub fn last_acknowledge_message_id(&self, channel_id: ChannelId) -> Option<u64> {
 404        self.channel_states.get(&channel_id).and_then(|state| {
 405            if let Some(last_message_id) = state.latest_chat_message {
 406                if state
 407                    .last_acknowledged_message_id()
 408                    .is_some_and(|id| id < last_message_id)
 409                {
 410                    return state.last_acknowledged_message_id();
 411                }
 412            }
 413
 414            None
 415        })
 416    }
 417
 418    pub fn acknowledge_message_id(
 419        &mut self,
 420        channel_id: ChannelId,
 421        message_id: u64,
 422        cx: &mut ModelContext<Self>,
 423    ) {
 424        self.channel_states
 425            .entry(channel_id)
 426            .or_insert_with(|| Default::default())
 427            .acknowledge_message_id(message_id);
 428        cx.notify();
 429    }
 430
 431    pub fn update_latest_message_id(
 432        &mut self,
 433        channel_id: ChannelId,
 434        message_id: u64,
 435        cx: &mut ModelContext<Self>,
 436    ) {
 437        self.channel_states
 438            .entry(channel_id)
 439            .or_insert_with(|| Default::default())
 440            .update_latest_message_id(message_id);
 441        cx.notify();
 442    }
 443
 444    pub fn acknowledge_notes_version(
 445        &mut self,
 446        channel_id: ChannelId,
 447        epoch: u64,
 448        version: &clock::Global,
 449        cx: &mut ModelContext<Self>,
 450    ) {
 451        self.channel_states
 452            .entry(channel_id)
 453            .or_insert_with(|| Default::default())
 454            .acknowledge_notes_version(epoch, version);
 455        cx.notify()
 456    }
 457
 458    pub fn update_latest_notes_version(
 459        &mut self,
 460        channel_id: ChannelId,
 461        epoch: u64,
 462        version: &clock::Global,
 463        cx: &mut ModelContext<Self>,
 464    ) {
 465        self.channel_states
 466            .entry(channel_id)
 467            .or_insert_with(|| Default::default())
 468            .update_latest_notes_version(epoch, version);
 469        cx.notify()
 470    }
 471
 472    pub fn open_channel_chat(
 473        &mut self,
 474        channel_id: ChannelId,
 475        cx: &mut ModelContext<Self>,
 476    ) -> Task<Result<Model<ChannelChat>>> {
 477        let client = self.client.clone();
 478        let user_store = self.user_store.clone();
 479        let this = cx.handle();
 480        self.open_channel_resource(
 481            channel_id,
 482            |this| &mut this.opened_chats,
 483            |channel, cx| ChannelChat::new(channel, this, user_store, client, cx),
 484            cx,
 485        )
 486    }
 487
 488    /// Asynchronously open a given resource associated with a channel.
 489    ///
 490    /// Make sure that the resource is only opened once, even if this method
 491    /// is called multiple times with the same channel id while the first task
 492    /// is still running.
 493    fn open_channel_resource<T, F, Fut>(
 494        &mut self,
 495        channel_id: ChannelId,
 496        get_map: fn(&mut Self) -> &mut HashMap<ChannelId, OpenedModelHandle<T>>,
 497        load: F,
 498        cx: &mut ModelContext<Self>,
 499    ) -> Task<Result<Model<T>>>
 500    where
 501        F: 'static + FnOnce(Arc<Channel>, AsyncAppContext) -> Fut,
 502        Fut: Future<Output = Result<Model<T>>>,
 503        T: 'static,
 504    {
 505        let task = loop {
 506            match get_map(self).entry(channel_id) {
 507                hash_map::Entry::Occupied(e) => match e.get() {
 508                    OpenedModelHandle::Open(model) => {
 509                        if let Some(model) = model.upgrade() {
 510                            break Task::ready(Ok(model)).shared();
 511                        } else {
 512                            get_map(self).remove(&channel_id);
 513                            continue;
 514                        }
 515                    }
 516                    OpenedModelHandle::Loading(task) => {
 517                        break task.clone();
 518                    }
 519                },
 520                hash_map::Entry::Vacant(e) => {
 521                    let task = cx
 522                        .spawn(move |this, mut cx| async move {
 523                            let channel = this.update(&mut cx, |this, _| {
 524                                this.channel_for_id(channel_id).cloned().ok_or_else(|| {
 525                                    Arc::new(anyhow!("no channel for id: {}", channel_id))
 526                                })
 527                            })??;
 528
 529                            load(channel, cx).await.map_err(Arc::new)
 530                        })
 531                        .shared();
 532
 533                    e.insert(OpenedModelHandle::Loading(task.clone()));
 534                    cx.spawn({
 535                        let task = task.clone();
 536                        move |this, mut cx| async move {
 537                            let result = task.await;
 538                            this.update(&mut cx, |this, _| match result {
 539                                Ok(model) => {
 540                                    get_map(this).insert(
 541                                        channel_id,
 542                                        OpenedModelHandle::Open(model.downgrade()),
 543                                    );
 544                                }
 545                                Err(_) => {
 546                                    get_map(this).remove(&channel_id);
 547                                }
 548                            })
 549                            .ok();
 550                        }
 551                    })
 552                    .detach();
 553                    break task;
 554                }
 555            }
 556        };
 557        cx.background_executor()
 558            .spawn(async move { task.await.map_err(|error| anyhow!("{}", error)) })
 559    }
 560
 561    pub fn is_channel_admin(&self, channel_id: ChannelId) -> bool {
 562        self.channel_role(channel_id) == proto::ChannelRole::Admin
 563    }
 564
 565    pub fn is_root_channel(&self, channel_id: ChannelId) -> bool {
 566        self.channel_index
 567            .by_id()
 568            .get(&channel_id)
 569            .map_or(false, |channel| channel.is_root_channel())
 570    }
 571
 572    pub fn is_public_channel(&self, channel_id: ChannelId) -> bool {
 573        self.channel_index
 574            .by_id()
 575            .get(&channel_id)
 576            .map_or(false, |channel| {
 577                channel.visibility == ChannelVisibility::Public
 578            })
 579    }
 580
 581    pub fn channel_capability(&self, channel_id: ChannelId) -> Capability {
 582        match self.channel_role(channel_id) {
 583            ChannelRole::Admin | ChannelRole::Member => Capability::ReadWrite,
 584            _ => Capability::ReadOnly,
 585        }
 586    }
 587
 588    pub fn channel_role(&self, channel_id: ChannelId) -> proto::ChannelRole {
 589        maybe!({
 590            let mut channel = self.channel_for_id(channel_id)?;
 591            if !channel.is_root_channel() {
 592                channel = self.channel_for_id(channel.root_id())?;
 593            }
 594            let root_channel_state = self.channel_states.get(&channel.id);
 595            root_channel_state?.role
 596        })
 597        .unwrap_or(proto::ChannelRole::Guest)
 598    }
 599
 600    pub fn channel_participants(&self, channel_id: ChannelId) -> &[Arc<User>] {
 601        self.channel_participants
 602            .get(&channel_id)
 603            .map_or(&[], |v| v.as_slice())
 604    }
 605
 606    pub fn create_channel(
 607        &self,
 608        name: &str,
 609        parent_id: Option<ChannelId>,
 610        cx: &mut ModelContext<Self>,
 611    ) -> Task<Result<ChannelId>> {
 612        let client = self.client.clone();
 613        let name = name.trim_start_matches('#').to_owned();
 614        cx.spawn(move |this, mut cx| async move {
 615            let response = client
 616                .request(proto::CreateChannel {
 617                    name,
 618                    parent_id: parent_id.map(|cid| cid.0),
 619                })
 620                .await?;
 621
 622            let channel = response
 623                .channel
 624                .ok_or_else(|| anyhow!("missing channel in response"))?;
 625            let channel_id = ChannelId(channel.id);
 626
 627            this.update(&mut cx, |this, cx| {
 628                let task = this.update_channels(
 629                    proto::UpdateChannels {
 630                        channels: vec![channel],
 631                        ..Default::default()
 632                    },
 633                    cx,
 634                );
 635                assert!(task.is_none());
 636
 637                // This event is emitted because the collab panel wants to clear the pending edit state
 638                // before this frame is rendered. But we can't guarantee that the collab panel's future
 639                // will resolve before this flush_effects finishes. Synchronously emitting this event
 640                // ensures that the collab panel will observe this creation before the frame completes
 641                cx.emit(ChannelEvent::ChannelCreated(channel_id));
 642            })?;
 643
 644            Ok(channel_id)
 645        })
 646    }
 647
 648    pub fn move_channel(
 649        &mut self,
 650        channel_id: ChannelId,
 651        to: ChannelId,
 652        cx: &mut ModelContext<Self>,
 653    ) -> Task<Result<()>> {
 654        let client = self.client.clone();
 655        cx.spawn(move |_, _| async move {
 656            let _ = client
 657                .request(proto::MoveChannel {
 658                    channel_id: channel_id.0,
 659                    to: to.0,
 660                })
 661                .await?;
 662
 663            Ok(())
 664        })
 665    }
 666
 667    pub fn set_channel_visibility(
 668        &mut self,
 669        channel_id: ChannelId,
 670        visibility: ChannelVisibility,
 671        cx: &mut ModelContext<Self>,
 672    ) -> Task<Result<()>> {
 673        let client = self.client.clone();
 674        cx.spawn(move |_, _| async move {
 675            let _ = client
 676                .request(proto::SetChannelVisibility {
 677                    channel_id: channel_id.0,
 678                    visibility: visibility.into(),
 679                })
 680                .await?;
 681
 682            Ok(())
 683        })
 684    }
 685
 686    pub fn invite_member(
 687        &mut self,
 688        channel_id: ChannelId,
 689        user_id: UserId,
 690        role: proto::ChannelRole,
 691        cx: &mut ModelContext<Self>,
 692    ) -> Task<Result<()>> {
 693        if !self.outgoing_invites.insert((channel_id, user_id)) {
 694            return Task::ready(Err(anyhow!("invite request already in progress")));
 695        }
 696
 697        cx.notify();
 698        let client = self.client.clone();
 699        cx.spawn(move |this, mut cx| async move {
 700            let result = client
 701                .request(proto::InviteChannelMember {
 702                    channel_id: channel_id.0,
 703                    user_id,
 704                    role: role.into(),
 705                })
 706                .await;
 707
 708            this.update(&mut cx, |this, cx| {
 709                this.outgoing_invites.remove(&(channel_id, user_id));
 710                cx.notify();
 711            })?;
 712
 713            result?;
 714
 715            Ok(())
 716        })
 717    }
 718
 719    pub fn remove_member(
 720        &mut self,
 721        channel_id: ChannelId,
 722        user_id: u64,
 723        cx: &mut ModelContext<Self>,
 724    ) -> Task<Result<()>> {
 725        if !self.outgoing_invites.insert((channel_id, user_id)) {
 726            return Task::ready(Err(anyhow!("invite request already in progress")));
 727        }
 728
 729        cx.notify();
 730        let client = self.client.clone();
 731        cx.spawn(move |this, mut cx| async move {
 732            let result = client
 733                .request(proto::RemoveChannelMember {
 734                    channel_id: channel_id.0,
 735                    user_id,
 736                })
 737                .await;
 738
 739            this.update(&mut cx, |this, cx| {
 740                this.outgoing_invites.remove(&(channel_id, user_id));
 741                cx.notify();
 742            })?;
 743            result?;
 744            Ok(())
 745        })
 746    }
 747
 748    pub fn set_member_role(
 749        &mut self,
 750        channel_id: ChannelId,
 751        user_id: UserId,
 752        role: proto::ChannelRole,
 753        cx: &mut ModelContext<Self>,
 754    ) -> Task<Result<()>> {
 755        if !self.outgoing_invites.insert((channel_id, user_id)) {
 756            return Task::ready(Err(anyhow!("member request already in progress")));
 757        }
 758
 759        cx.notify();
 760        let client = self.client.clone();
 761        cx.spawn(move |this, mut cx| async move {
 762            let result = client
 763                .request(proto::SetChannelMemberRole {
 764                    channel_id: channel_id.0,
 765                    user_id,
 766                    role: role.into(),
 767                })
 768                .await;
 769
 770            this.update(&mut cx, |this, cx| {
 771                this.outgoing_invites.remove(&(channel_id, user_id));
 772                cx.notify();
 773            })?;
 774
 775            result?;
 776            Ok(())
 777        })
 778    }
 779
 780    pub fn rename(
 781        &mut self,
 782        channel_id: ChannelId,
 783        new_name: &str,
 784        cx: &mut ModelContext<Self>,
 785    ) -> Task<Result<()>> {
 786        let client = self.client.clone();
 787        let name = new_name.to_string();
 788        cx.spawn(move |this, mut cx| async move {
 789            let channel = client
 790                .request(proto::RenameChannel {
 791                    channel_id: channel_id.0,
 792                    name,
 793                })
 794                .await?
 795                .channel
 796                .ok_or_else(|| anyhow!("missing channel in response"))?;
 797            this.update(&mut cx, |this, cx| {
 798                let task = this.update_channels(
 799                    proto::UpdateChannels {
 800                        channels: vec![channel],
 801                        ..Default::default()
 802                    },
 803                    cx,
 804                );
 805                assert!(task.is_none());
 806
 807                // This event is emitted because the collab panel wants to clear the pending edit state
 808                // before this frame is rendered. But we can't guarantee that the collab panel's future
 809                // will resolve before this flush_effects finishes. Synchronously emitting this event
 810                // ensures that the collab panel will observe this creation before the frame complete
 811                cx.emit(ChannelEvent::ChannelRenamed(channel_id))
 812            })?;
 813            Ok(())
 814        })
 815    }
 816
 817    pub fn respond_to_channel_invite(
 818        &mut self,
 819        channel_id: ChannelId,
 820        accept: bool,
 821        cx: &mut ModelContext<Self>,
 822    ) -> Task<Result<()>> {
 823        let client = self.client.clone();
 824        cx.background_executor().spawn(async move {
 825            client
 826                .request(proto::RespondToChannelInvite {
 827                    channel_id: channel_id.0,
 828                    accept,
 829                })
 830                .await?;
 831            Ok(())
 832        })
 833    }
 834    pub fn fuzzy_search_members(
 835        &self,
 836        channel_id: ChannelId,
 837        query: String,
 838        limit: u16,
 839        cx: &mut ModelContext<Self>,
 840    ) -> Task<Result<Vec<ChannelMembership>>> {
 841        let client = self.client.clone();
 842        let user_store = self.user_store.downgrade();
 843        cx.spawn(move |_, mut cx| async move {
 844            let response = client
 845                .request(proto::GetChannelMembers {
 846                    channel_id: channel_id.0,
 847                    query,
 848                    limit: limit as u64,
 849                })
 850                .await?;
 851            user_store.update(&mut cx, |user_store, _| {
 852                user_store.insert(response.users);
 853                response
 854                    .members
 855                    .into_iter()
 856                    .filter_map(|member| {
 857                        Some(ChannelMembership {
 858                            user: user_store.get_cached_user(member.user_id)?,
 859                            role: member.role(),
 860                            kind: member.kind(),
 861                        })
 862                    })
 863                    .collect()
 864            })
 865        })
 866    }
 867
 868    pub fn remove_channel(&self, channel_id: ChannelId) -> impl Future<Output = Result<()>> {
 869        let client = self.client.clone();
 870        async move {
 871            client
 872                .request(proto::DeleteChannel {
 873                    channel_id: channel_id.0,
 874                })
 875                .await?;
 876            Ok(())
 877        }
 878    }
 879
 880    pub fn has_pending_channel_invite_response(&self, _: &Arc<Channel>) -> bool {
 881        false
 882    }
 883
 884    pub fn has_pending_channel_invite(&self, channel_id: ChannelId, user_id: UserId) -> bool {
 885        self.outgoing_invites.contains(&(channel_id, user_id))
 886    }
 887
 888    async fn handle_update_channels(
 889        this: Model<Self>,
 890        message: TypedEnvelope<proto::UpdateChannels>,
 891        _: Arc<Client>,
 892        mut cx: AsyncAppContext,
 893    ) -> Result<()> {
 894        this.update(&mut cx, |this, _| {
 895            this.update_channels_tx
 896                .unbounded_send(message.payload)
 897                .unwrap();
 898        })?;
 899        Ok(())
 900    }
 901
 902    async fn handle_update_user_channels(
 903        this: Model<Self>,
 904        message: TypedEnvelope<proto::UpdateUserChannels>,
 905        _: Arc<Client>,
 906        mut cx: AsyncAppContext,
 907    ) -> Result<()> {
 908        this.update(&mut cx, |this, cx| {
 909            for buffer_version in message.payload.observed_channel_buffer_version {
 910                let version = language::proto::deserialize_version(&buffer_version.version);
 911                this.acknowledge_notes_version(
 912                    ChannelId(buffer_version.channel_id),
 913                    buffer_version.epoch,
 914                    &version,
 915                    cx,
 916                );
 917            }
 918            for message_id in message.payload.observed_channel_message_id {
 919                this.acknowledge_message_id(
 920                    ChannelId(message_id.channel_id),
 921                    message_id.message_id,
 922                    cx,
 923                );
 924            }
 925            for membership in message.payload.channel_memberships {
 926                if let Some(role) = ChannelRole::from_i32(membership.role) {
 927                    this.channel_states
 928                        .entry(ChannelId(membership.channel_id))
 929                        .or_insert_with(|| ChannelState::default())
 930                        .set_role(role)
 931                }
 932            }
 933        })
 934    }
 935
 936    fn handle_connect(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
 937        self.channel_index.clear();
 938        self.channel_invitations.clear();
 939        self.channel_participants.clear();
 940        self.channel_index.clear();
 941        self.outgoing_invites.clear();
 942        self.disconnect_channel_buffers_task.take();
 943
 944        for chat in self.opened_chats.values() {
 945            if let OpenedModelHandle::Open(chat) = chat {
 946                if let Some(chat) = chat.upgrade() {
 947                    chat.update(cx, |chat, cx| {
 948                        chat.rejoin(cx);
 949                    });
 950                }
 951            }
 952        }
 953
 954        let mut buffer_versions = Vec::new();
 955        for buffer in self.opened_buffers.values() {
 956            if let OpenedModelHandle::Open(buffer) = buffer {
 957                if let Some(buffer) = buffer.upgrade() {
 958                    let channel_buffer = buffer.read(cx);
 959                    let buffer = channel_buffer.buffer().read(cx);
 960                    buffer_versions.push(proto::ChannelBufferVersion {
 961                        channel_id: channel_buffer.channel_id.0,
 962                        epoch: channel_buffer.epoch(),
 963                        version: language::proto::serialize_version(&buffer.version()),
 964                    });
 965                }
 966            }
 967        }
 968
 969        if buffer_versions.is_empty() {
 970            return Task::ready(Ok(()));
 971        }
 972
 973        let response = self.client.request(proto::RejoinChannelBuffers {
 974            buffers: buffer_versions,
 975        });
 976
 977        cx.spawn(|this, mut cx| async move {
 978            let mut response = response.await?;
 979
 980            this.update(&mut cx, |this, cx| {
 981                this.opened_buffers.retain(|_, buffer| match buffer {
 982                    OpenedModelHandle::Open(channel_buffer) => {
 983                        let Some(channel_buffer) = channel_buffer.upgrade() else {
 984                            return false;
 985                        };
 986
 987                        channel_buffer.update(cx, |channel_buffer, cx| {
 988                            let channel_id = channel_buffer.channel_id;
 989                            if let Some(remote_buffer) = response
 990                                .buffers
 991                                .iter_mut()
 992                                .find(|buffer| buffer.channel_id == channel_id.0)
 993                            {
 994                                let channel_id = channel_buffer.channel_id;
 995                                let remote_version =
 996                                    language::proto::deserialize_version(&remote_buffer.version);
 997
 998                                channel_buffer.replace_collaborators(
 999                                    mem::take(&mut remote_buffer.collaborators),
1000                                    cx,
1001                                );
1002
1003                                let operations = channel_buffer
1004                                    .buffer()
1005                                    .update(cx, |buffer, cx| {
1006                                        let outgoing_operations =
1007                                            buffer.serialize_ops(Some(remote_version), cx);
1008                                        let incoming_operations =
1009                                            mem::take(&mut remote_buffer.operations)
1010                                                .into_iter()
1011                                                .map(language::proto::deserialize_operation)
1012                                                .collect::<Result<Vec<_>>>()?;
1013                                        buffer.apply_ops(incoming_operations, cx)?;
1014                                        anyhow::Ok(outgoing_operations)
1015                                    })
1016                                    .log_err();
1017
1018                                if let Some(operations) = operations {
1019                                    let client = this.client.clone();
1020                                    cx.background_executor()
1021                                        .spawn(async move {
1022                                            let operations = operations.await;
1023                                            for chunk in
1024                                                language::proto::split_operations(operations)
1025                                            {
1026                                                client
1027                                                    .send(proto::UpdateChannelBuffer {
1028                                                        channel_id: channel_id.0,
1029                                                        operations: chunk,
1030                                                    })
1031                                                    .ok();
1032                                            }
1033                                        })
1034                                        .detach();
1035                                    return true;
1036                                }
1037                            }
1038
1039                            channel_buffer.disconnect(cx);
1040                            false
1041                        })
1042                    }
1043                    OpenedModelHandle::Loading(_) => true,
1044                });
1045            })
1046            .ok();
1047            anyhow::Ok(())
1048        })
1049    }
1050
1051    fn handle_disconnect(&mut self, wait_for_reconnect: bool, cx: &mut ModelContext<Self>) {
1052        cx.notify();
1053        self.did_subscribe = false;
1054        self.disconnect_channel_buffers_task.get_or_insert_with(|| {
1055            cx.spawn(move |this, mut cx| async move {
1056                if wait_for_reconnect {
1057                    cx.background_executor().timer(RECONNECT_TIMEOUT).await;
1058                }
1059
1060                if let Some(this) = this.upgrade() {
1061                    this.update(&mut cx, |this, cx| {
1062                        for (_, buffer) in this.opened_buffers.drain() {
1063                            if let OpenedModelHandle::Open(buffer) = buffer {
1064                                if let Some(buffer) = buffer.upgrade() {
1065                                    buffer.update(cx, |buffer, cx| buffer.disconnect(cx));
1066                                }
1067                            }
1068                        }
1069                    })
1070                    .ok();
1071                }
1072            })
1073        });
1074    }
1075
1076    pub(crate) fn update_channels(
1077        &mut self,
1078        payload: proto::UpdateChannels,
1079        cx: &mut ModelContext<ChannelStore>,
1080    ) -> Option<Task<Result<()>>> {
1081        if !payload.remove_channel_invitations.is_empty() {
1082            self.channel_invitations
1083                .retain(|channel| !payload.remove_channel_invitations.contains(&channel.id.0));
1084        }
1085        for channel in payload.channel_invitations {
1086            match self
1087                .channel_invitations
1088                .binary_search_by_key(&channel.id, |c| c.id.0)
1089            {
1090                Ok(ix) => {
1091                    Arc::make_mut(&mut self.channel_invitations[ix]).name = channel.name.into()
1092                }
1093                Err(ix) => self.channel_invitations.insert(
1094                    ix,
1095                    Arc::new(Channel {
1096                        id: ChannelId(channel.id),
1097                        visibility: channel.visibility(),
1098                        name: channel.name.into(),
1099                        parent_path: channel
1100                            .parent_path
1101                            .into_iter()
1102                            .map(|cid| ChannelId(cid))
1103                            .collect(),
1104                    }),
1105                ),
1106            }
1107        }
1108
1109        let channels_changed = !payload.channels.is_empty()
1110            || !payload.delete_channels.is_empty()
1111            || !payload.latest_channel_message_ids.is_empty()
1112            || !payload.latest_channel_buffer_versions.is_empty()
1113            || !payload.hosted_projects.is_empty()
1114            || !payload.deleted_hosted_projects.is_empty();
1115
1116        if channels_changed {
1117            if !payload.delete_channels.is_empty() {
1118                let delete_channels: Vec<ChannelId> = payload
1119                    .delete_channels
1120                    .into_iter()
1121                    .map(|cid| ChannelId(cid))
1122                    .collect();
1123                self.channel_index.delete_channels(&delete_channels);
1124                self.channel_participants
1125                    .retain(|channel_id, _| !delete_channels.contains(&channel_id));
1126
1127                for channel_id in &delete_channels {
1128                    let channel_id = *channel_id;
1129                    if payload
1130                        .channels
1131                        .iter()
1132                        .any(|channel| channel.id == channel_id.0)
1133                    {
1134                        continue;
1135                    }
1136                    if let Some(OpenedModelHandle::Open(buffer)) =
1137                        self.opened_buffers.remove(&channel_id)
1138                    {
1139                        if let Some(buffer) = buffer.upgrade() {
1140                            buffer.update(cx, ChannelBuffer::disconnect);
1141                        }
1142                    }
1143                }
1144            }
1145
1146            let mut index = self.channel_index.bulk_insert();
1147            for channel in payload.channels {
1148                let id = ChannelId(channel.id);
1149                let channel_changed = index.insert(channel);
1150
1151                if channel_changed {
1152                    if let Some(OpenedModelHandle::Open(buffer)) = self.opened_buffers.get(&id) {
1153                        if let Some(buffer) = buffer.upgrade() {
1154                            buffer.update(cx, ChannelBuffer::channel_changed);
1155                        }
1156                    }
1157                }
1158            }
1159
1160            for latest_buffer_version in payload.latest_channel_buffer_versions {
1161                let version = language::proto::deserialize_version(&latest_buffer_version.version);
1162                self.channel_states
1163                    .entry(ChannelId(latest_buffer_version.channel_id))
1164                    .or_default()
1165                    .update_latest_notes_version(latest_buffer_version.epoch, &version)
1166            }
1167
1168            for latest_channel_message in payload.latest_channel_message_ids {
1169                self.channel_states
1170                    .entry(ChannelId(latest_channel_message.channel_id))
1171                    .or_default()
1172                    .update_latest_message_id(latest_channel_message.message_id);
1173            }
1174
1175            for hosted_project in payload.hosted_projects {
1176                let hosted_project: HostedProject = hosted_project.into();
1177                if let Some(old_project) = self
1178                    .hosted_projects
1179                    .insert(hosted_project.project_id, hosted_project.clone())
1180                {
1181                    self.channel_states
1182                        .entry(old_project.channel_id)
1183                        .or_default()
1184                        .remove_hosted_project(old_project.project_id);
1185                }
1186                self.channel_states
1187                    .entry(hosted_project.channel_id)
1188                    .or_default()
1189                    .add_hosted_project(hosted_project.project_id);
1190            }
1191
1192            for hosted_project_id in payload.deleted_hosted_projects {
1193                let hosted_project_id = ProjectId(hosted_project_id);
1194
1195                if let Some(old_project) = self.hosted_projects.remove(&hosted_project_id) {
1196                    self.channel_states
1197                        .entry(old_project.channel_id)
1198                        .or_default()
1199                        .remove_hosted_project(old_project.project_id);
1200                }
1201            }
1202        }
1203
1204        cx.notify();
1205        if payload.channel_participants.is_empty() {
1206            return None;
1207        }
1208
1209        let mut all_user_ids = Vec::new();
1210        let channel_participants = payload.channel_participants;
1211        for entry in &channel_participants {
1212            for user_id in entry.participant_user_ids.iter() {
1213                if let Err(ix) = all_user_ids.binary_search(user_id) {
1214                    all_user_ids.insert(ix, *user_id);
1215                }
1216            }
1217        }
1218
1219        let users = self
1220            .user_store
1221            .update(cx, |user_store, cx| user_store.get_users(all_user_ids, cx));
1222        Some(cx.spawn(|this, mut cx| async move {
1223            let users = users.await?;
1224
1225            this.update(&mut cx, |this, cx| {
1226                for entry in &channel_participants {
1227                    let mut participants: Vec<_> = entry
1228                        .participant_user_ids
1229                        .iter()
1230                        .filter_map(|user_id| {
1231                            users
1232                                .binary_search_by_key(&user_id, |user| &user.id)
1233                                .ok()
1234                                .map(|ix| users[ix].clone())
1235                        })
1236                        .collect();
1237
1238                    participants.sort_by_key(|u| u.id);
1239
1240                    this.channel_participants
1241                        .insert(ChannelId(entry.channel_id), participants);
1242                }
1243
1244                cx.notify();
1245            })
1246        }))
1247    }
1248}
1249
1250impl ChannelState {
1251    fn set_role(&mut self, role: ChannelRole) {
1252        self.role = Some(role);
1253    }
1254
1255    fn has_channel_buffer_changed(&self) -> bool {
1256        self.latest_notes_version.epoch > self.observed_notes_version.epoch
1257            || (self.latest_notes_version.epoch == self.observed_notes_version.epoch
1258                && self
1259                    .latest_notes_version
1260                    .version
1261                    .changed_since(&self.observed_notes_version.version))
1262    }
1263
1264    fn has_new_messages(&self) -> bool {
1265        let latest_message_id = self.latest_chat_message;
1266        let observed_message_id = self.observed_chat_message;
1267
1268        latest_message_id.is_some_and(|latest_message_id| {
1269            latest_message_id > observed_message_id.unwrap_or_default()
1270        })
1271    }
1272
1273    fn last_acknowledged_message_id(&self) -> Option<u64> {
1274        self.observed_chat_message
1275    }
1276
1277    fn acknowledge_message_id(&mut self, message_id: u64) {
1278        let observed = self.observed_chat_message.get_or_insert(message_id);
1279        *observed = (*observed).max(message_id);
1280    }
1281
1282    fn update_latest_message_id(&mut self, message_id: u64) {
1283        self.latest_chat_message =
1284            Some(message_id.max(self.latest_chat_message.unwrap_or_default()));
1285    }
1286
1287    fn acknowledge_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1288        if self.observed_notes_version.epoch == epoch {
1289            self.observed_notes_version.version.join(version);
1290        } else {
1291            self.observed_notes_version = NotesVersion {
1292                epoch,
1293                version: version.clone(),
1294            };
1295        }
1296    }
1297
1298    fn update_latest_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1299        if self.latest_notes_version.epoch == epoch {
1300            self.latest_notes_version.version.join(version);
1301        } else {
1302            self.latest_notes_version = NotesVersion {
1303                epoch,
1304                version: version.clone(),
1305            };
1306        }
1307    }
1308
1309    fn add_hosted_project(&mut self, project_id: ProjectId) {
1310        self.projects.insert(project_id);
1311    }
1312
1313    fn remove_hosted_project(&mut self, project_id: ProjectId) {
1314        self.projects.remove(&project_id);
1315    }
1316}