channel_store.rs

   1mod channel_index;
   2
   3use crate::{channel_buffer::ChannelBuffer, channel_chat::ChannelChat, ChannelMessage};
   4use anyhow::{anyhow, Result};
   5use channel_index::ChannelIndex;
   6use client::{ChannelId, Client, ClientSettings, ProjectId, Subscription, User, UserId, UserStore};
   7use collections::{hash_map, HashMap, HashSet};
   8use futures::{channel::mpsc, future::Shared, Future, FutureExt, StreamExt};
   9use gpui::{
  10    AppContext, AsyncAppContext, Context, EventEmitter, Global, Model, ModelContext, SharedString,
  11    Task, WeakModel,
  12};
  13use language::Capability;
  14use rpc::{
  15    proto::{self, ChannelRole, ChannelVisibility},
  16    TypedEnvelope,
  17};
  18use settings::Settings;
  19use std::{mem, sync::Arc, time::Duration};
  20use util::{maybe, ResultExt};
  21
  22pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
  23
  24pub fn init(client: &Arc<Client>, user_store: Model<UserStore>, cx: &mut AppContext) {
  25    let channel_store =
  26        cx.new_model(|cx| ChannelStore::new(client.clone(), user_store.clone(), cx));
  27    cx.set_global(GlobalChannelStore(channel_store));
  28}
  29
  30#[derive(Debug, Clone, Default, PartialEq)]
  31struct NotesVersion {
  32    epoch: u64,
  33    version: clock::Global,
  34}
  35
  36#[derive(Debug, Clone)]
  37pub struct HostedProject {
  38    project_id: ProjectId,
  39    channel_id: ChannelId,
  40    name: SharedString,
  41    _visibility: proto::ChannelVisibility,
  42}
  43impl From<proto::HostedProject> for HostedProject {
  44    fn from(project: proto::HostedProject) -> Self {
  45        Self {
  46            project_id: ProjectId(project.project_id),
  47            channel_id: ChannelId(project.channel_id),
  48            _visibility: project.visibility(),
  49            name: project.name.into(),
  50        }
  51    }
  52}
  53pub struct ChannelStore {
  54    pub channel_index: ChannelIndex,
  55    channel_invitations: Vec<Arc<Channel>>,
  56    channel_participants: HashMap<ChannelId, Vec<Arc<User>>>,
  57    channel_states: HashMap<ChannelId, ChannelState>,
  58    hosted_projects: HashMap<ProjectId, HostedProject>,
  59
  60    outgoing_invites: HashSet<(ChannelId, UserId)>,
  61    update_channels_tx: mpsc::UnboundedSender<proto::UpdateChannels>,
  62    opened_buffers: HashMap<ChannelId, OpenedModelHandle<ChannelBuffer>>,
  63    opened_chats: HashMap<ChannelId, OpenedModelHandle<ChannelChat>>,
  64    client: Arc<Client>,
  65    did_subscribe: bool,
  66    user_store: Model<UserStore>,
  67    _rpc_subscriptions: [Subscription; 2],
  68    _watch_connection_status: Task<Option<()>>,
  69    disconnect_channel_buffers_task: Option<Task<()>>,
  70    _update_channels: Task<()>,
  71}
  72
  73#[derive(Clone, Debug)]
  74pub struct Channel {
  75    pub id: ChannelId,
  76    pub name: SharedString,
  77    pub visibility: proto::ChannelVisibility,
  78    pub parent_path: Vec<ChannelId>,
  79}
  80
  81#[derive(Default, Debug)]
  82pub struct ChannelState {
  83    latest_chat_message: Option<u64>,
  84    latest_notes_version: NotesVersion,
  85    observed_notes_version: NotesVersion,
  86    observed_chat_message: Option<u64>,
  87    role: Option<ChannelRole>,
  88    projects: HashSet<ProjectId>,
  89}
  90
  91impl Channel {
  92    pub fn link(&self, cx: &AppContext) -> String {
  93        format!(
  94            "{}/channel/{}-{}",
  95            ClientSettings::get_global(cx).server_url,
  96            Self::slug(&self.name),
  97            self.id
  98        )
  99    }
 100
 101    pub fn notes_link(&self, heading: Option<String>, cx: &AppContext) -> String {
 102        self.link(cx)
 103            + "/notes"
 104            + &heading
 105                .map(|h| format!("#{}", Self::slug(&h)))
 106                .unwrap_or_default()
 107    }
 108
 109    pub fn is_root_channel(&self) -> bool {
 110        self.parent_path.is_empty()
 111    }
 112
 113    pub fn root_id(&self) -> ChannelId {
 114        self.parent_path.first().copied().unwrap_or(self.id)
 115    }
 116
 117    pub fn slug(str: &str) -> String {
 118        let slug: String = str
 119            .chars()
 120            .map(|c| if c.is_alphanumeric() { c } else { '-' })
 121            .collect();
 122
 123        slug.trim_matches(|c| c == '-').to_string()
 124    }
 125}
 126
 127#[derive(Debug)]
 128pub struct ChannelMembership {
 129    pub user: Arc<User>,
 130    pub kind: proto::channel_member::Kind,
 131    pub role: proto::ChannelRole,
 132}
 133impl ChannelMembership {
 134    pub fn sort_key(&self) -> MembershipSortKey {
 135        MembershipSortKey {
 136            role_order: match self.role {
 137                proto::ChannelRole::Admin => 0,
 138                proto::ChannelRole::Member => 1,
 139                proto::ChannelRole::Banned => 2,
 140                proto::ChannelRole::Talker => 3,
 141                proto::ChannelRole::Guest => 4,
 142            },
 143            kind_order: match self.kind {
 144                proto::channel_member::Kind::Member => 0,
 145                proto::channel_member::Kind::Invitee => 1,
 146            },
 147            username_order: self.user.github_login.as_str(),
 148        }
 149    }
 150}
 151
 152#[derive(PartialOrd, Ord, PartialEq, Eq)]
 153pub struct MembershipSortKey<'a> {
 154    role_order: u8,
 155    kind_order: u8,
 156    username_order: &'a str,
 157}
 158
 159pub enum ChannelEvent {
 160    ChannelCreated(ChannelId),
 161    ChannelRenamed(ChannelId),
 162}
 163
 164impl EventEmitter<ChannelEvent> for ChannelStore {}
 165
 166enum OpenedModelHandle<E> {
 167    Open(WeakModel<E>),
 168    Loading(Shared<Task<Result<Model<E>, Arc<anyhow::Error>>>>),
 169}
 170
 171struct GlobalChannelStore(Model<ChannelStore>);
 172
 173impl Global for GlobalChannelStore {}
 174
 175impl ChannelStore {
 176    pub fn global(cx: &AppContext) -> Model<Self> {
 177        cx.global::<GlobalChannelStore>().0.clone()
 178    }
 179
 180    pub fn new(
 181        client: Arc<Client>,
 182        user_store: Model<UserStore>,
 183        cx: &mut ModelContext<Self>,
 184    ) -> Self {
 185        let rpc_subscriptions = [
 186            client.add_message_handler(cx.weak_model(), Self::handle_update_channels),
 187            client.add_message_handler(cx.weak_model(), Self::handle_update_user_channels),
 188        ];
 189
 190        let mut connection_status = client.status();
 191        let (update_channels_tx, mut update_channels_rx) = mpsc::unbounded();
 192        let watch_connection_status = cx.spawn(|this, mut cx| async move {
 193            while let Some(status) = connection_status.next().await {
 194                let this = this.upgrade()?;
 195                match status {
 196                    client::Status::Connected { .. } => {
 197                        this.update(&mut cx, |this, cx| this.handle_connect(cx))
 198                            .ok()?
 199                            .await
 200                            .log_err()?;
 201                    }
 202                    client::Status::SignedOut | client::Status::UpgradeRequired => {
 203                        this.update(&mut cx, |this, cx| this.handle_disconnect(false, cx))
 204                            .ok();
 205                    }
 206                    _ => {
 207                        this.update(&mut cx, |this, cx| this.handle_disconnect(true, cx))
 208                            .ok();
 209                    }
 210                }
 211            }
 212            Some(())
 213        });
 214
 215        Self {
 216            channel_invitations: Vec::default(),
 217            channel_index: ChannelIndex::default(),
 218            channel_participants: Default::default(),
 219            hosted_projects: Default::default(),
 220            outgoing_invites: Default::default(),
 221            opened_buffers: Default::default(),
 222            opened_chats: Default::default(),
 223            update_channels_tx,
 224            client,
 225            user_store,
 226            _rpc_subscriptions: rpc_subscriptions,
 227            _watch_connection_status: watch_connection_status,
 228            disconnect_channel_buffers_task: None,
 229            _update_channels: cx.spawn(|this, mut cx| async move {
 230                maybe!(async move {
 231                    while let Some(update_channels) = update_channels_rx.next().await {
 232                        if let Some(this) = this.upgrade() {
 233                            let update_task = this.update(&mut cx, |this, cx| {
 234                                this.update_channels(update_channels, cx)
 235                            })?;
 236                            if let Some(update_task) = update_task {
 237                                update_task.await.log_err();
 238                            }
 239                        }
 240                    }
 241                    anyhow::Ok(())
 242                })
 243                .await
 244                .log_err();
 245            }),
 246            channel_states: Default::default(),
 247            did_subscribe: false,
 248        }
 249    }
 250
 251    pub fn initialize(&mut self) {
 252        if !self.did_subscribe {
 253            if self
 254                .client
 255                .send(proto::SubscribeToChannels {})
 256                .log_err()
 257                .is_some()
 258            {
 259                self.did_subscribe = true;
 260            }
 261        }
 262    }
 263
 264    pub fn client(&self) -> Arc<Client> {
 265        self.client.clone()
 266    }
 267
 268    /// Returns the number of unique channels in the store
 269    pub fn channel_count(&self) -> usize {
 270        self.channel_index.by_id().len()
 271    }
 272
 273    /// Returns the index of a channel ID in the list of unique channels
 274    pub fn index_of_channel(&self, channel_id: ChannelId) -> Option<usize> {
 275        self.channel_index
 276            .by_id()
 277            .keys()
 278            .position(|id| *id == channel_id)
 279    }
 280
 281    /// Returns an iterator over all unique channels
 282    pub fn channels(&self) -> impl '_ + Iterator<Item = &Arc<Channel>> {
 283        self.channel_index.by_id().values()
 284    }
 285
 286    /// Iterate over all entries in the channel DAG
 287    pub fn ordered_channels(&self) -> impl '_ + Iterator<Item = (usize, &Arc<Channel>)> {
 288        self.channel_index
 289            .ordered_channels()
 290            .iter()
 291            .filter_map(move |id| {
 292                let channel = self.channel_index.by_id().get(id)?;
 293                Some((channel.parent_path.len(), channel))
 294            })
 295    }
 296
 297    pub fn channel_at_index(&self, ix: usize) -> Option<&Arc<Channel>> {
 298        let channel_id = self.channel_index.ordered_channels().get(ix)?;
 299        self.channel_index.by_id().get(channel_id)
 300    }
 301
 302    pub fn channel_at(&self, ix: usize) -> Option<&Arc<Channel>> {
 303        self.channel_index.by_id().values().nth(ix)
 304    }
 305
 306    pub fn has_channel_invitation(&self, channel_id: ChannelId) -> bool {
 307        self.channel_invitations
 308            .iter()
 309            .any(|channel| channel.id == channel_id)
 310    }
 311
 312    pub fn channel_invitations(&self) -> &[Arc<Channel>] {
 313        &self.channel_invitations
 314    }
 315
 316    pub fn channel_for_id(&self, channel_id: ChannelId) -> Option<&Arc<Channel>> {
 317        self.channel_index.by_id().get(&channel_id)
 318    }
 319
 320    pub fn projects_for_id(&self, channel_id: ChannelId) -> Vec<(SharedString, ProjectId)> {
 321        let mut projects: Vec<(SharedString, ProjectId)> = self
 322            .channel_states
 323            .get(&channel_id)
 324            .map(|state| state.projects.clone())
 325            .unwrap_or_default()
 326            .into_iter()
 327            .flat_map(|id| Some((self.hosted_projects.get(&id)?.name.clone(), id)))
 328            .collect();
 329        projects.sort();
 330        projects
 331    }
 332
 333    pub fn has_open_channel_buffer(&self, channel_id: ChannelId, _cx: &AppContext) -> bool {
 334        if let Some(buffer) = self.opened_buffers.get(&channel_id) {
 335            if let OpenedModelHandle::Open(buffer) = buffer {
 336                return buffer.upgrade().is_some();
 337            }
 338        }
 339        false
 340    }
 341
 342    pub fn open_channel_buffer(
 343        &mut self,
 344        channel_id: ChannelId,
 345        cx: &mut ModelContext<Self>,
 346    ) -> Task<Result<Model<ChannelBuffer>>> {
 347        let client = self.client.clone();
 348        let user_store = self.user_store.clone();
 349        let channel_store = cx.handle();
 350        self.open_channel_resource(
 351            channel_id,
 352            |this| &mut this.opened_buffers,
 353            |channel, cx| ChannelBuffer::new(channel, client, user_store, channel_store, cx),
 354            cx,
 355        )
 356    }
 357
 358    pub fn fetch_channel_messages(
 359        &self,
 360        message_ids: Vec<u64>,
 361        cx: &mut ModelContext<Self>,
 362    ) -> Task<Result<Vec<ChannelMessage>>> {
 363        let request = if message_ids.is_empty() {
 364            None
 365        } else {
 366            Some(
 367                self.client
 368                    .request(proto::GetChannelMessagesById { message_ids }),
 369            )
 370        };
 371        cx.spawn(|this, mut cx| async move {
 372            if let Some(request) = request {
 373                let response = request.await?;
 374                let this = this
 375                    .upgrade()
 376                    .ok_or_else(|| anyhow!("channel store dropped"))?;
 377                let user_store = this.update(&mut cx, |this, _| this.user_store.clone())?;
 378                ChannelMessage::from_proto_vec(response.messages, &user_store, &mut cx).await
 379            } else {
 380                Ok(Vec::new())
 381            }
 382        })
 383    }
 384
 385    pub fn has_channel_buffer_changed(&self, channel_id: ChannelId) -> bool {
 386        self.channel_states
 387            .get(&channel_id)
 388            .is_some_and(|state| state.has_channel_buffer_changed())
 389    }
 390
 391    pub fn has_new_messages(&self, channel_id: ChannelId) -> bool {
 392        self.channel_states
 393            .get(&channel_id)
 394            .is_some_and(|state| state.has_new_messages())
 395    }
 396
 397    pub fn set_acknowledged_message_id(&mut self, channel_id: ChannelId, message_id: Option<u64>) {
 398        if let Some(state) = self.channel_states.get_mut(&channel_id) {
 399            state.latest_chat_message = message_id;
 400        }
 401    }
 402
 403    pub fn last_acknowledge_message_id(&self, channel_id: ChannelId) -> Option<u64> {
 404        self.channel_states.get(&channel_id).and_then(|state| {
 405            if let Some(last_message_id) = state.latest_chat_message {
 406                if state
 407                    .last_acknowledged_message_id()
 408                    .is_some_and(|id| id < last_message_id)
 409                {
 410                    return state.last_acknowledged_message_id();
 411                }
 412            }
 413
 414            None
 415        })
 416    }
 417
 418    pub fn acknowledge_message_id(
 419        &mut self,
 420        channel_id: ChannelId,
 421        message_id: u64,
 422        cx: &mut ModelContext<Self>,
 423    ) {
 424        self.channel_states
 425            .entry(channel_id)
 426            .or_insert_with(|| Default::default())
 427            .acknowledge_message_id(message_id);
 428        cx.notify();
 429    }
 430
 431    pub fn update_latest_message_id(
 432        &mut self,
 433        channel_id: ChannelId,
 434        message_id: u64,
 435        cx: &mut ModelContext<Self>,
 436    ) {
 437        self.channel_states
 438            .entry(channel_id)
 439            .or_insert_with(|| Default::default())
 440            .update_latest_message_id(message_id);
 441        cx.notify();
 442    }
 443
 444    pub fn acknowledge_notes_version(
 445        &mut self,
 446        channel_id: ChannelId,
 447        epoch: u64,
 448        version: &clock::Global,
 449        cx: &mut ModelContext<Self>,
 450    ) {
 451        self.channel_states
 452            .entry(channel_id)
 453            .or_insert_with(|| Default::default())
 454            .acknowledge_notes_version(epoch, version);
 455        cx.notify()
 456    }
 457
 458    pub fn update_latest_notes_version(
 459        &mut self,
 460        channel_id: ChannelId,
 461        epoch: u64,
 462        version: &clock::Global,
 463        cx: &mut ModelContext<Self>,
 464    ) {
 465        self.channel_states
 466            .entry(channel_id)
 467            .or_insert_with(|| Default::default())
 468            .update_latest_notes_version(epoch, version);
 469        cx.notify()
 470    }
 471
 472    pub fn open_channel_chat(
 473        &mut self,
 474        channel_id: ChannelId,
 475        cx: &mut ModelContext<Self>,
 476    ) -> Task<Result<Model<ChannelChat>>> {
 477        let client = self.client.clone();
 478        let user_store = self.user_store.clone();
 479        let this = cx.handle();
 480        self.open_channel_resource(
 481            channel_id,
 482            |this| &mut this.opened_chats,
 483            |channel, cx| ChannelChat::new(channel, this, user_store, client, cx),
 484            cx,
 485        )
 486    }
 487
 488    /// Asynchronously open a given resource associated with a channel.
 489    ///
 490    /// Make sure that the resource is only opened once, even if this method
 491    /// is called multiple times with the same channel id while the first task
 492    /// is still running.
 493    fn open_channel_resource<T, F, Fut>(
 494        &mut self,
 495        channel_id: ChannelId,
 496        get_map: fn(&mut Self) -> &mut HashMap<ChannelId, OpenedModelHandle<T>>,
 497        load: F,
 498        cx: &mut ModelContext<Self>,
 499    ) -> Task<Result<Model<T>>>
 500    where
 501        F: 'static + FnOnce(Arc<Channel>, AsyncAppContext) -> Fut,
 502        Fut: Future<Output = Result<Model<T>>>,
 503        T: 'static,
 504    {
 505        let task = loop {
 506            match get_map(self).entry(channel_id) {
 507                hash_map::Entry::Occupied(e) => match e.get() {
 508                    OpenedModelHandle::Open(model) => {
 509                        if let Some(model) = model.upgrade() {
 510                            break Task::ready(Ok(model)).shared();
 511                        } else {
 512                            get_map(self).remove(&channel_id);
 513                            continue;
 514                        }
 515                    }
 516                    OpenedModelHandle::Loading(task) => {
 517                        break task.clone();
 518                    }
 519                },
 520                hash_map::Entry::Vacant(e) => {
 521                    let task = cx
 522                        .spawn(move |this, mut cx| async move {
 523                            let channel = this.update(&mut cx, |this, _| {
 524                                this.channel_for_id(channel_id).cloned().ok_or_else(|| {
 525                                    Arc::new(anyhow!("no channel for id: {}", channel_id))
 526                                })
 527                            })??;
 528
 529                            load(channel, cx).await.map_err(Arc::new)
 530                        })
 531                        .shared();
 532
 533                    e.insert(OpenedModelHandle::Loading(task.clone()));
 534                    cx.spawn({
 535                        let task = task.clone();
 536                        move |this, mut cx| async move {
 537                            let result = task.await;
 538                            this.update(&mut cx, |this, _| match result {
 539                                Ok(model) => {
 540                                    get_map(this).insert(
 541                                        channel_id,
 542                                        OpenedModelHandle::Open(model.downgrade()),
 543                                    );
 544                                }
 545                                Err(_) => {
 546                                    get_map(this).remove(&channel_id);
 547                                }
 548                            })
 549                            .ok();
 550                        }
 551                    })
 552                    .detach();
 553                    break task;
 554                }
 555            }
 556        };
 557        cx.background_executor()
 558            .spawn(async move { task.await.map_err(|error| anyhow!("{}", error)) })
 559    }
 560
 561    pub fn is_channel_admin(&self, channel_id: ChannelId) -> bool {
 562        self.channel_role(channel_id) == proto::ChannelRole::Admin
 563    }
 564
 565    pub fn is_root_channel(&self, channel_id: ChannelId) -> bool {
 566        self.channel_index
 567            .by_id()
 568            .get(&channel_id)
 569            .map_or(false, |channel| channel.is_root_channel())
 570    }
 571
 572    pub fn is_public_channel(&self, channel_id: ChannelId) -> bool {
 573        self.channel_index
 574            .by_id()
 575            .get(&channel_id)
 576            .map_or(false, |channel| {
 577                channel.visibility == ChannelVisibility::Public
 578            })
 579    }
 580
 581    pub fn channel_capability(&self, channel_id: ChannelId) -> Capability {
 582        match self.channel_role(channel_id) {
 583            ChannelRole::Admin | ChannelRole::Member => Capability::ReadWrite,
 584            _ => Capability::ReadOnly,
 585        }
 586    }
 587
 588    pub fn channel_role(&self, channel_id: ChannelId) -> proto::ChannelRole {
 589        maybe!({
 590            let mut channel = self.channel_for_id(channel_id)?;
 591            if !channel.is_root_channel() {
 592                channel = self.channel_for_id(channel.root_id())?;
 593            }
 594            let root_channel_state = self.channel_states.get(&channel.id);
 595            root_channel_state?.role
 596        })
 597        .unwrap_or(proto::ChannelRole::Guest)
 598    }
 599
 600    pub fn channel_participants(&self, channel_id: ChannelId) -> &[Arc<User>] {
 601        self.channel_participants
 602            .get(&channel_id)
 603            .map_or(&[], |v| v.as_slice())
 604    }
 605
 606    pub fn create_channel(
 607        &self,
 608        name: &str,
 609        parent_id: Option<ChannelId>,
 610        cx: &mut ModelContext<Self>,
 611    ) -> Task<Result<ChannelId>> {
 612        let client = self.client.clone();
 613        let name = name.trim_start_matches('#').to_owned();
 614        cx.spawn(move |this, mut cx| async move {
 615            let response = client
 616                .request(proto::CreateChannel {
 617                    name,
 618                    parent_id: parent_id.map(|cid| cid.0),
 619                })
 620                .await?;
 621
 622            let channel = response
 623                .channel
 624                .ok_or_else(|| anyhow!("missing channel in response"))?;
 625            let channel_id = ChannelId(channel.id);
 626
 627            this.update(&mut cx, |this, cx| {
 628                let task = this.update_channels(
 629                    proto::UpdateChannels {
 630                        channels: vec![channel],
 631                        ..Default::default()
 632                    },
 633                    cx,
 634                );
 635                assert!(task.is_none());
 636
 637                // This event is emitted because the collab panel wants to clear the pending edit state
 638                // before this frame is rendered. But we can't guarantee that the collab panel's future
 639                // will resolve before this flush_effects finishes. Synchronously emitting this event
 640                // ensures that the collab panel will observe this creation before the frame completes
 641                cx.emit(ChannelEvent::ChannelCreated(channel_id));
 642            })?;
 643
 644            Ok(channel_id)
 645        })
 646    }
 647
 648    pub fn move_channel(
 649        &mut self,
 650        channel_id: ChannelId,
 651        to: ChannelId,
 652        cx: &mut ModelContext<Self>,
 653    ) -> Task<Result<()>> {
 654        let client = self.client.clone();
 655        cx.spawn(move |_, _| async move {
 656            let _ = client
 657                .request(proto::MoveChannel {
 658                    channel_id: channel_id.0,
 659                    to: to.0,
 660                })
 661                .await?;
 662
 663            Ok(())
 664        })
 665    }
 666
 667    pub fn set_channel_visibility(
 668        &mut self,
 669        channel_id: ChannelId,
 670        visibility: ChannelVisibility,
 671        cx: &mut ModelContext<Self>,
 672    ) -> Task<Result<()>> {
 673        let client = self.client.clone();
 674        cx.spawn(move |_, _| async move {
 675            let _ = client
 676                .request(proto::SetChannelVisibility {
 677                    channel_id: channel_id.0,
 678                    visibility: visibility.into(),
 679                })
 680                .await?;
 681
 682            Ok(())
 683        })
 684    }
 685
 686    pub fn invite_member(
 687        &mut self,
 688        channel_id: ChannelId,
 689        user_id: UserId,
 690        role: proto::ChannelRole,
 691        cx: &mut ModelContext<Self>,
 692    ) -> Task<Result<()>> {
 693        if !self.outgoing_invites.insert((channel_id, user_id)) {
 694            return Task::ready(Err(anyhow!("invite request already in progress")));
 695        }
 696
 697        cx.notify();
 698        let client = self.client.clone();
 699        cx.spawn(move |this, mut cx| async move {
 700            let result = client
 701                .request(proto::InviteChannelMember {
 702                    channel_id: channel_id.0,
 703                    user_id,
 704                    role: role.into(),
 705                })
 706                .await;
 707
 708            this.update(&mut cx, |this, cx| {
 709                this.outgoing_invites.remove(&(channel_id, user_id));
 710                cx.notify();
 711            })?;
 712
 713            result?;
 714
 715            Ok(())
 716        })
 717    }
 718
 719    pub fn remove_member(
 720        &mut self,
 721        channel_id: ChannelId,
 722        user_id: u64,
 723        cx: &mut ModelContext<Self>,
 724    ) -> Task<Result<()>> {
 725        if !self.outgoing_invites.insert((channel_id, user_id)) {
 726            return Task::ready(Err(anyhow!("invite request already in progress")));
 727        }
 728
 729        cx.notify();
 730        let client = self.client.clone();
 731        cx.spawn(move |this, mut cx| async move {
 732            let result = client
 733                .request(proto::RemoveChannelMember {
 734                    channel_id: channel_id.0,
 735                    user_id,
 736                })
 737                .await;
 738
 739            this.update(&mut cx, |this, cx| {
 740                this.outgoing_invites.remove(&(channel_id, user_id));
 741                cx.notify();
 742            })?;
 743            result?;
 744            Ok(())
 745        })
 746    }
 747
 748    pub fn set_member_role(
 749        &mut self,
 750        channel_id: ChannelId,
 751        user_id: UserId,
 752        role: proto::ChannelRole,
 753        cx: &mut ModelContext<Self>,
 754    ) -> Task<Result<()>> {
 755        if !self.outgoing_invites.insert((channel_id, user_id)) {
 756            return Task::ready(Err(anyhow!("member request already in progress")));
 757        }
 758
 759        cx.notify();
 760        let client = self.client.clone();
 761        cx.spawn(move |this, mut cx| async move {
 762            let result = client
 763                .request(proto::SetChannelMemberRole {
 764                    channel_id: channel_id.0,
 765                    user_id,
 766                    role: role.into(),
 767                })
 768                .await;
 769
 770            this.update(&mut cx, |this, cx| {
 771                this.outgoing_invites.remove(&(channel_id, user_id));
 772                cx.notify();
 773            })?;
 774
 775            result?;
 776            Ok(())
 777        })
 778    }
 779
 780    pub fn rename(
 781        &mut self,
 782        channel_id: ChannelId,
 783        new_name: &str,
 784        cx: &mut ModelContext<Self>,
 785    ) -> Task<Result<()>> {
 786        let client = self.client.clone();
 787        let name = new_name.to_string();
 788        cx.spawn(move |this, mut cx| async move {
 789            let channel = client
 790                .request(proto::RenameChannel {
 791                    channel_id: channel_id.0,
 792                    name,
 793                })
 794                .await?
 795                .channel
 796                .ok_or_else(|| anyhow!("missing channel in response"))?;
 797            this.update(&mut cx, |this, cx| {
 798                let task = this.update_channels(
 799                    proto::UpdateChannels {
 800                        channels: vec![channel],
 801                        ..Default::default()
 802                    },
 803                    cx,
 804                );
 805                assert!(task.is_none());
 806
 807                // This event is emitted because the collab panel wants to clear the pending edit state
 808                // before this frame is rendered. But we can't guarantee that the collab panel's future
 809                // will resolve before this flush_effects finishes. Synchronously emitting this event
 810                // ensures that the collab panel will observe this creation before the frame complete
 811                cx.emit(ChannelEvent::ChannelRenamed(channel_id))
 812            })?;
 813            Ok(())
 814        })
 815    }
 816
 817    pub fn respond_to_channel_invite(
 818        &mut self,
 819        channel_id: ChannelId,
 820        accept: bool,
 821        cx: &mut ModelContext<Self>,
 822    ) -> Task<Result<()>> {
 823        let client = self.client.clone();
 824        cx.background_executor().spawn(async move {
 825            client
 826                .request(proto::RespondToChannelInvite {
 827                    channel_id: channel_id.0,
 828                    accept,
 829                })
 830                .await?;
 831            Ok(())
 832        })
 833    }
 834    pub fn fuzzy_search_members(
 835        &self,
 836        channel_id: ChannelId,
 837        query: String,
 838        limit: u16,
 839        cx: &mut ModelContext<Self>,
 840    ) -> Task<Result<Vec<ChannelMembership>>> {
 841        let client = self.client.clone();
 842        let user_store = self.user_store.downgrade();
 843        cx.spawn(move |_, mut cx| async move {
 844            let response = client
 845                .request(proto::GetChannelMembers {
 846                    channel_id: channel_id.0,
 847                    query,
 848                    limit: limit as u64,
 849                })
 850                .await?;
 851            user_store.update(&mut cx, |user_store, _| {
 852                user_store.insert(response.users);
 853                response
 854                    .members
 855                    .into_iter()
 856                    .filter_map(|member| {
 857                        Some(ChannelMembership {
 858                            user: user_store.get_cached_user(member.user_id)?,
 859                            role: member.role(),
 860                            kind: member.kind(),
 861                        })
 862                    })
 863                    .collect()
 864            })
 865        })
 866    }
 867
 868    pub fn remove_channel(&self, channel_id: ChannelId) -> impl Future<Output = Result<()>> {
 869        let client = self.client.clone();
 870        async move {
 871            client
 872                .request(proto::DeleteChannel {
 873                    channel_id: channel_id.0,
 874                })
 875                .await?;
 876            Ok(())
 877        }
 878    }
 879
 880    pub fn has_pending_channel_invite_response(&self, _: &Arc<Channel>) -> bool {
 881        false
 882    }
 883
 884    pub fn has_pending_channel_invite(&self, channel_id: ChannelId, user_id: UserId) -> bool {
 885        self.outgoing_invites.contains(&(channel_id, user_id))
 886    }
 887
 888    async fn handle_update_channels(
 889        this: Model<Self>,
 890        message: TypedEnvelope<proto::UpdateChannels>,
 891        mut cx: AsyncAppContext,
 892    ) -> Result<()> {
 893        this.update(&mut cx, |this, _| {
 894            this.update_channels_tx
 895                .unbounded_send(message.payload)
 896                .unwrap();
 897        })?;
 898        Ok(())
 899    }
 900
 901    async fn handle_update_user_channels(
 902        this: Model<Self>,
 903        message: TypedEnvelope<proto::UpdateUserChannels>,
 904        mut cx: AsyncAppContext,
 905    ) -> Result<()> {
 906        this.update(&mut cx, |this, cx| {
 907            for buffer_version in message.payload.observed_channel_buffer_version {
 908                let version = language::proto::deserialize_version(&buffer_version.version);
 909                this.acknowledge_notes_version(
 910                    ChannelId(buffer_version.channel_id),
 911                    buffer_version.epoch,
 912                    &version,
 913                    cx,
 914                );
 915            }
 916            for message_id in message.payload.observed_channel_message_id {
 917                this.acknowledge_message_id(
 918                    ChannelId(message_id.channel_id),
 919                    message_id.message_id,
 920                    cx,
 921                );
 922            }
 923            for membership in message.payload.channel_memberships {
 924                if let Some(role) = ChannelRole::from_i32(membership.role) {
 925                    this.channel_states
 926                        .entry(ChannelId(membership.channel_id))
 927                        .or_insert_with(|| ChannelState::default())
 928                        .set_role(role)
 929                }
 930            }
 931        })
 932    }
 933
 934    fn handle_connect(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
 935        self.channel_index.clear();
 936        self.channel_invitations.clear();
 937        self.channel_participants.clear();
 938        self.channel_index.clear();
 939        self.outgoing_invites.clear();
 940        self.disconnect_channel_buffers_task.take();
 941
 942        for chat in self.opened_chats.values() {
 943            if let OpenedModelHandle::Open(chat) = chat {
 944                if let Some(chat) = chat.upgrade() {
 945                    chat.update(cx, |chat, cx| {
 946                        chat.rejoin(cx);
 947                    });
 948                }
 949            }
 950        }
 951
 952        let mut buffer_versions = Vec::new();
 953        for buffer in self.opened_buffers.values() {
 954            if let OpenedModelHandle::Open(buffer) = buffer {
 955                if let Some(buffer) = buffer.upgrade() {
 956                    let channel_buffer = buffer.read(cx);
 957                    let buffer = channel_buffer.buffer().read(cx);
 958                    buffer_versions.push(proto::ChannelBufferVersion {
 959                        channel_id: channel_buffer.channel_id.0,
 960                        epoch: channel_buffer.epoch(),
 961                        version: language::proto::serialize_version(&buffer.version()),
 962                    });
 963                }
 964            }
 965        }
 966
 967        if buffer_versions.is_empty() {
 968            return Task::ready(Ok(()));
 969        }
 970
 971        let response = self.client.request(proto::RejoinChannelBuffers {
 972            buffers: buffer_versions,
 973        });
 974
 975        cx.spawn(|this, mut cx| async move {
 976            let mut response = response.await?;
 977
 978            this.update(&mut cx, |this, cx| {
 979                this.opened_buffers.retain(|_, buffer| match buffer {
 980                    OpenedModelHandle::Open(channel_buffer) => {
 981                        let Some(channel_buffer) = channel_buffer.upgrade() else {
 982                            return false;
 983                        };
 984
 985                        channel_buffer.update(cx, |channel_buffer, cx| {
 986                            let channel_id = channel_buffer.channel_id;
 987                            if let Some(remote_buffer) = response
 988                                .buffers
 989                                .iter_mut()
 990                                .find(|buffer| buffer.channel_id == channel_id.0)
 991                            {
 992                                let channel_id = channel_buffer.channel_id;
 993                                let remote_version =
 994                                    language::proto::deserialize_version(&remote_buffer.version);
 995
 996                                channel_buffer.replace_collaborators(
 997                                    mem::take(&mut remote_buffer.collaborators),
 998                                    cx,
 999                                );
1000
1001                                let operations = channel_buffer
1002                                    .buffer()
1003                                    .update(cx, |buffer, cx| {
1004                                        let outgoing_operations =
1005                                            buffer.serialize_ops(Some(remote_version), cx);
1006                                        let incoming_operations =
1007                                            mem::take(&mut remote_buffer.operations)
1008                                                .into_iter()
1009                                                .map(language::proto::deserialize_operation)
1010                                                .collect::<Result<Vec<_>>>()?;
1011                                        buffer.apply_ops(incoming_operations, cx)?;
1012                                        anyhow::Ok(outgoing_operations)
1013                                    })
1014                                    .log_err();
1015
1016                                if let Some(operations) = operations {
1017                                    let client = this.client.clone();
1018                                    cx.background_executor()
1019                                        .spawn(async move {
1020                                            let operations = operations.await;
1021                                            for chunk in
1022                                                language::proto::split_operations(operations)
1023                                            {
1024                                                client
1025                                                    .send(proto::UpdateChannelBuffer {
1026                                                        channel_id: channel_id.0,
1027                                                        operations: chunk,
1028                                                    })
1029                                                    .ok();
1030                                            }
1031                                        })
1032                                        .detach();
1033                                    return true;
1034                                }
1035                            }
1036
1037                            channel_buffer.disconnect(cx);
1038                            false
1039                        })
1040                    }
1041                    OpenedModelHandle::Loading(_) => true,
1042                });
1043            })
1044            .ok();
1045            anyhow::Ok(())
1046        })
1047    }
1048
1049    fn handle_disconnect(&mut self, wait_for_reconnect: bool, cx: &mut ModelContext<Self>) {
1050        cx.notify();
1051        self.did_subscribe = false;
1052        self.disconnect_channel_buffers_task.get_or_insert_with(|| {
1053            cx.spawn(move |this, mut cx| async move {
1054                if wait_for_reconnect {
1055                    cx.background_executor().timer(RECONNECT_TIMEOUT).await;
1056                }
1057
1058                if let Some(this) = this.upgrade() {
1059                    this.update(&mut cx, |this, cx| {
1060                        for (_, buffer) in this.opened_buffers.drain() {
1061                            if let OpenedModelHandle::Open(buffer) = buffer {
1062                                if let Some(buffer) = buffer.upgrade() {
1063                                    buffer.update(cx, |buffer, cx| buffer.disconnect(cx));
1064                                }
1065                            }
1066                        }
1067                    })
1068                    .ok();
1069                }
1070            })
1071        });
1072    }
1073
    /// Applies an `UpdateChannels` payload to the store.
    ///
    /// Synchronously updates invitations, the channel index, per-channel state (latest
    /// notes version, latest message id, hosted projects) and notifies observers.
    ///
    /// Returns `None` when the payload carries no channel participants. Otherwise returns
    /// a task that fetches the participating users from the user store, replaces the
    /// participant list of each listed channel, and notifies observers again.
    pub(crate) fn update_channels(
        &mut self,
        payload: proto::UpdateChannels,
        cx: &mut ModelContext<ChannelStore>,
    ) -> Option<Task<Result<()>>> {
        // Drop invitations the payload asks to remove.
        if !payload.remove_channel_invitations.is_empty() {
            self.channel_invitations
                .retain(|channel| !payload.remove_channel_invitations.contains(&channel.id.0));
        }
        // Insert new invitations at their binary-search position by channel id; an
        // invitation that already exists only gets its name updated.
        for channel in payload.channel_invitations {
            match self
                .channel_invitations
                .binary_search_by_key(&channel.id, |c| c.id.0)
            {
                Ok(ix) => {
                    Arc::make_mut(&mut self.channel_invitations[ix]).name = channel.name.into()
                }
                Err(ix) => self.channel_invitations.insert(
                    ix,
                    Arc::new(Channel {
                        id: ChannelId(channel.id),
                        visibility: channel.visibility(),
                        name: channel.name.into(),
                        parent_path: channel
                            .parent_path
                            .into_iter()
                            .map(|cid| ChannelId(cid))
                            .collect(),
                    }),
                ),
            }
        }

        // True when the payload touches anything other than invitations and participants.
        let channels_changed = !payload.channels.is_empty()
            || !payload.delete_channels.is_empty()
            || !payload.latest_channel_message_ids.is_empty()
            || !payload.latest_channel_buffer_versions.is_empty()
            || !payload.hosted_projects.is_empty()
            || !payload.deleted_hosted_projects.is_empty();

        if channels_changed {
            // Deletions are processed before insertions.
            if !payload.delete_channels.is_empty() {
                let delete_channels: Vec<ChannelId> = payload
                    .delete_channels
                    .into_iter()
                    .map(|cid| ChannelId(cid))
                    .collect();
                self.channel_index.delete_channels(&delete_channels);
                self.channel_participants
                    .retain(|channel_id, _| !delete_channels.contains(&channel_id));

                for channel_id in &delete_channels {
                    let channel_id = *channel_id;
                    // A channel that is deleted and re-sent in the same payload keeps
                    // its open buffer.
                    if payload
                        .channels
                        .iter()
                        .any(|channel| channel.id == channel_id.0)
                    {
                        continue;
                    }
                    if let Some(OpenedModelHandle::Open(buffer)) =
                        self.opened_buffers.remove(&channel_id)
                    {
                        if let Some(buffer) = buffer.upgrade() {
                            buffer.update(cx, ChannelBuffer::disconnect);
                        }
                    }
                }
            }

            // Insert or update channels; open buffers are told when their channel changed.
            let mut index = self.channel_index.bulk_insert();
            for channel in payload.channels {
                let id = ChannelId(channel.id);
                let channel_changed = index.insert(channel);

                if channel_changed {
                    if let Some(OpenedModelHandle::Open(buffer)) = self.opened_buffers.get(&id) {
                        if let Some(buffer) = buffer.upgrade() {
                            buffer.update(cx, ChannelBuffer::channel_changed);
                        }
                    }
                }
            }

            // Record the latest known notes version for each listed channel.
            for latest_buffer_version in payload.latest_channel_buffer_versions {
                let version = language::proto::deserialize_version(&latest_buffer_version.version);
                self.channel_states
                    .entry(ChannelId(latest_buffer_version.channel_id))
                    .or_default()
                    .update_latest_notes_version(latest_buffer_version.epoch, &version)
            }

            // Record the latest known chat message id for each listed channel.
            for latest_channel_message in payload.latest_channel_message_ids {
                self.channel_states
                    .entry(ChannelId(latest_channel_message.channel_id))
                    .or_default()
                    .update_latest_message_id(latest_channel_message.message_id);
            }

            // Upsert hosted projects. If a project was already known, it is first removed
            // from the channel it was previously attached to.
            for hosted_project in payload.hosted_projects {
                let hosted_project: HostedProject = hosted_project.into();
                if let Some(old_project) = self
                    .hosted_projects
                    .insert(hosted_project.project_id, hosted_project.clone())
                {
                    self.channel_states
                        .entry(old_project.channel_id)
                        .or_default()
                        .remove_hosted_project(old_project.project_id);
                }
                self.channel_states
                    .entry(hosted_project.channel_id)
                    .or_default()
                    .add_hosted_project(hosted_project.project_id);
            }

            // Forget deleted hosted projects and detach them from their channel's state.
            for hosted_project_id in payload.deleted_hosted_projects {
                let hosted_project_id = ProjectId(hosted_project_id);

                if let Some(old_project) = self.hosted_projects.remove(&hosted_project_id) {
                    self.channel_states
                        .entry(old_project.channel_id)
                        .or_default()
                        .remove_hosted_project(old_project.project_id);
                }
            }
        }

        cx.notify();
        // Without participant updates there is no asynchronous work to do.
        if payload.channel_participants.is_empty() {
            return None;
        }

        // Build a sorted list of the distinct user ids across all participant entries.
        let mut all_user_ids = Vec::new();
        let channel_participants = payload.channel_participants;
        for entry in &channel_participants {
            for user_id in entry.participant_user_ids.iter() {
                if let Err(ix) = all_user_ids.binary_search(user_id) {
                    all_user_ids.insert(ix, *user_id);
                }
            }
        }

        let users = self
            .user_store
            .update(cx, |user_store, cx| user_store.get_users(all_user_ids, cx));
        Some(cx.spawn(|this, mut cx| async move {
            let users = users.await?;

            this.update(&mut cx, |this, cx| {
                for entry in &channel_participants {
                    // NOTE(review): the binary search assumes `users` is ordered by id;
                    // confirm against `UserStore::get_users`. Ids not found are skipped.
                    let mut participants: Vec<_> = entry
                        .participant_user_ids
                        .iter()
                        .filter_map(|user_id| {
                            users
                                .binary_search_by_key(&user_id, |user| &user.id)
                                .ok()
                                .map(|ix| users[ix].clone())
                        })
                        .collect();

                    participants.sort_by_key(|u| u.id);

                    // Replaces any previous participant list for this channel.
                    this.channel_participants
                        .insert(ChannelId(entry.channel_id), participants);
                }

                cx.notify();
            })
        }))
    }
1246}
1247
1248impl ChannelState {
1249    fn set_role(&mut self, role: ChannelRole) {
1250        self.role = Some(role);
1251    }
1252
1253    fn has_channel_buffer_changed(&self) -> bool {
1254        self.latest_notes_version.epoch > self.observed_notes_version.epoch
1255            || (self.latest_notes_version.epoch == self.observed_notes_version.epoch
1256                && self
1257                    .latest_notes_version
1258                    .version
1259                    .changed_since(&self.observed_notes_version.version))
1260    }
1261
1262    fn has_new_messages(&self) -> bool {
1263        let latest_message_id = self.latest_chat_message;
1264        let observed_message_id = self.observed_chat_message;
1265
1266        latest_message_id.is_some_and(|latest_message_id| {
1267            latest_message_id > observed_message_id.unwrap_or_default()
1268        })
1269    }
1270
1271    fn last_acknowledged_message_id(&self) -> Option<u64> {
1272        self.observed_chat_message
1273    }
1274
1275    fn acknowledge_message_id(&mut self, message_id: u64) {
1276        let observed = self.observed_chat_message.get_or_insert(message_id);
1277        *observed = (*observed).max(message_id);
1278    }
1279
1280    fn update_latest_message_id(&mut self, message_id: u64) {
1281        self.latest_chat_message =
1282            Some(message_id.max(self.latest_chat_message.unwrap_or_default()));
1283    }
1284
1285    fn acknowledge_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1286        if self.observed_notes_version.epoch == epoch {
1287            self.observed_notes_version.version.join(version);
1288        } else {
1289            self.observed_notes_version = NotesVersion {
1290                epoch,
1291                version: version.clone(),
1292            };
1293        }
1294    }
1295
1296    fn update_latest_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1297        if self.latest_notes_version.epoch == epoch {
1298            self.latest_notes_version.version.join(version);
1299        } else {
1300            self.latest_notes_version = NotesVersion {
1301                epoch,
1302                version: version.clone(),
1303            };
1304        }
1305    }
1306
1307    fn add_hosted_project(&mut self, project_id: ProjectId) {
1308        self.projects.insert(project_id);
1309    }
1310
1311    fn remove_hosted_project(&mut self, project_id: ProjectId) {
1312        self.projects.remove(&project_id);
1313    }
1314}