channel_store.rs

   1mod channel_index;
   2
   3use crate::{channel_buffer::ChannelBuffer, channel_chat::ChannelChat, ChannelMessage};
   4use anyhow::{anyhow, Result};
   5use channel_index::ChannelIndex;
   6use client::{ChannelId, Client, ClientSettings, ProjectId, Subscription, User, UserId, UserStore};
   7use collections::{hash_map, HashMap, HashSet};
   8use futures::{channel::mpsc, future::Shared, Future, FutureExt, StreamExt};
   9use gpui::{
  10    AppContext, AsyncAppContext, Context, EventEmitter, Global, Model, ModelContext, SharedString,
  11    Task, WeakModel,
  12};
  13use language::Capability;
  14use rpc::{
  15    proto::{self, ChannelRole, ChannelVisibility},
  16    TypedEnvelope,
  17};
  18use settings::Settings;
  19use std::{mem, sync::Arc, time::Duration};
  20use util::{maybe, ResultExt};
  21
  22pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
  23
  24pub fn init(client: &Arc<Client>, user_store: Model<UserStore>, cx: &mut AppContext) {
  25    let channel_store =
  26        cx.new_model(|cx| ChannelStore::new(client.clone(), user_store.clone(), cx));
  27    cx.set_global(GlobalChannelStore(channel_store));
  28}
  29
  30#[derive(Debug, Clone, Default, PartialEq)]
  31struct NotesVersion {
  32    epoch: u64,
  33    version: clock::Global,
  34}
  35
  36#[derive(Debug, Clone)]
  37pub struct HostedProject {
  38    project_id: ProjectId,
  39    channel_id: ChannelId,
  40    name: SharedString,
  41    _visibility: proto::ChannelVisibility,
  42}
  43
  44impl From<proto::HostedProject> for HostedProject {
  45    fn from(project: proto::HostedProject) -> Self {
  46        Self {
  47            project_id: ProjectId(project.project_id),
  48            channel_id: ChannelId(project.channel_id),
  49            _visibility: project.visibility(),
  50            name: project.name.into(),
  51        }
  52    }
  53}
  54
  55pub struct ChannelStore {
  56    pub channel_index: ChannelIndex,
  57    channel_invitations: Vec<Arc<Channel>>,
  58    channel_participants: HashMap<ChannelId, Vec<Arc<User>>>,
  59    channel_states: HashMap<ChannelId, ChannelState>,
  60    hosted_projects: HashMap<ProjectId, HostedProject>,
  61
  62    outgoing_invites: HashSet<(ChannelId, UserId)>,
  63    update_channels_tx: mpsc::UnboundedSender<proto::UpdateChannels>,
  64    opened_buffers: HashMap<ChannelId, OpenedModelHandle<ChannelBuffer>>,
  65    opened_chats: HashMap<ChannelId, OpenedModelHandle<ChannelChat>>,
  66    client: Arc<Client>,
  67    user_store: Model<UserStore>,
  68    _rpc_subscriptions: [Subscription; 2],
  69    _watch_connection_status: Task<Option<()>>,
  70    disconnect_channel_buffers_task: Option<Task<()>>,
  71    _update_channels: Task<()>,
  72}
  73
  74#[derive(Clone, Debug)]
  75pub struct Channel {
  76    pub id: ChannelId,
  77    pub name: SharedString,
  78    pub visibility: proto::ChannelVisibility,
  79    pub parent_path: Vec<ChannelId>,
  80}
  81
  82#[derive(Default, Debug)]
  83pub struct ChannelState {
  84    latest_chat_message: Option<u64>,
  85    latest_notes_version: NotesVersion,
  86    observed_notes_version: NotesVersion,
  87    observed_chat_message: Option<u64>,
  88    role: Option<ChannelRole>,
  89    projects: HashSet<ProjectId>,
  90}
  91
  92impl Channel {
  93    pub fn link(&self, cx: &AppContext) -> String {
  94        format!(
  95            "{}/channel/{}-{}",
  96            ClientSettings::get_global(cx).server_url,
  97            Self::slug(&self.name),
  98            self.id
  99        )
 100    }
 101
 102    pub fn notes_link(&self, heading: Option<String>, cx: &AppContext) -> String {
 103        self.link(cx)
 104            + "/notes"
 105            + &heading
 106                .map(|h| format!("#{}", Self::slug(&h)))
 107                .unwrap_or_default()
 108    }
 109
 110    pub fn is_root_channel(&self) -> bool {
 111        self.parent_path.is_empty()
 112    }
 113
 114    pub fn root_id(&self) -> ChannelId {
 115        self.parent_path.first().copied().unwrap_or(self.id)
 116    }
 117
 118    pub fn slug(str: &str) -> String {
 119        let slug: String = str
 120            .chars()
 121            .map(|c| if c.is_alphanumeric() { c } else { '-' })
 122            .collect();
 123
 124        slug.trim_matches(|c| c == '-').to_string()
 125    }
 126}
 127
 128pub struct ChannelMembership {
 129    pub user: Arc<User>,
 130    pub kind: proto::channel_member::Kind,
 131    pub role: proto::ChannelRole,
 132}
 133impl ChannelMembership {
 134    pub fn sort_key(&self) -> MembershipSortKey {
 135        MembershipSortKey {
 136            role_order: match self.role {
 137                proto::ChannelRole::Admin => 0,
 138                proto::ChannelRole::Member => 1,
 139                proto::ChannelRole::Banned => 2,
 140                proto::ChannelRole::Talker => 3,
 141                proto::ChannelRole::Guest => 4,
 142            },
 143            kind_order: match self.kind {
 144                proto::channel_member::Kind::Member => 0,
 145                proto::channel_member::Kind::Invitee => 1,
 146            },
 147            username_order: self.user.github_login.as_str(),
 148        }
 149    }
 150}
 151
 152#[derive(PartialOrd, Ord, PartialEq, Eq)]
 153pub struct MembershipSortKey<'a> {
 154    role_order: u8,
 155    kind_order: u8,
 156    username_order: &'a str,
 157}
 158
 159pub enum ChannelEvent {
 160    ChannelCreated(ChannelId),
 161    ChannelRenamed(ChannelId),
 162}
 163
 164impl EventEmitter<ChannelEvent> for ChannelStore {}
 165
 166enum OpenedModelHandle<E> {
 167    Open(WeakModel<E>),
 168    Loading(Shared<Task<Result<Model<E>, Arc<anyhow::Error>>>>),
 169}
 170
 171struct GlobalChannelStore(Model<ChannelStore>);
 172
 173impl Global for GlobalChannelStore {}
 174
 175impl ChannelStore {
 176    pub fn global(cx: &AppContext) -> Model<Self> {
 177        cx.global::<GlobalChannelStore>().0.clone()
 178    }
 179
 180    pub fn new(
 181        client: Arc<Client>,
 182        user_store: Model<UserStore>,
 183        cx: &mut ModelContext<Self>,
 184    ) -> Self {
 185        let rpc_subscriptions = [
 186            client.add_message_handler(cx.weak_model(), Self::handle_update_channels),
 187            client.add_message_handler(cx.weak_model(), Self::handle_update_user_channels),
 188        ];
 189
 190        let mut connection_status = client.status();
 191        let (update_channels_tx, mut update_channels_rx) = mpsc::unbounded();
 192        let watch_connection_status = cx.spawn(|this, mut cx| async move {
 193            while let Some(status) = connection_status.next().await {
 194                let this = this.upgrade()?;
 195                match status {
 196                    client::Status::Connected { .. } => {
 197                        this.update(&mut cx, |this, cx| this.handle_connect(cx))
 198                            .ok()?
 199                            .await
 200                            .log_err()?;
 201                    }
 202                    client::Status::SignedOut | client::Status::UpgradeRequired => {
 203                        this.update(&mut cx, |this, cx| this.handle_disconnect(false, cx))
 204                            .ok();
 205                    }
 206                    _ => {
 207                        this.update(&mut cx, |this, cx| this.handle_disconnect(true, cx))
 208                            .ok();
 209                    }
 210                }
 211            }
 212            Some(())
 213        });
 214
 215        Self {
 216            channel_invitations: Vec::default(),
 217            channel_index: ChannelIndex::default(),
 218            channel_participants: Default::default(),
 219            hosted_projects: Default::default(),
 220            outgoing_invites: Default::default(),
 221            opened_buffers: Default::default(),
 222            opened_chats: Default::default(),
 223            update_channels_tx,
 224            client,
 225            user_store,
 226            _rpc_subscriptions: rpc_subscriptions,
 227            _watch_connection_status: watch_connection_status,
 228            disconnect_channel_buffers_task: None,
 229            _update_channels: cx.spawn(|this, mut cx| async move {
 230                maybe!(async move {
 231                    while let Some(update_channels) = update_channels_rx.next().await {
 232                        if let Some(this) = this.upgrade() {
 233                            let update_task = this.update(&mut cx, |this, cx| {
 234                                this.update_channels(update_channels, cx)
 235                            })?;
 236                            if let Some(update_task) = update_task {
 237                                update_task.await.log_err();
 238                            }
 239                        }
 240                    }
 241                    anyhow::Ok(())
 242                })
 243                .await
 244                .log_err();
 245            }),
 246            channel_states: Default::default(),
 247        }
 248    }
 249
 250    pub fn client(&self) -> Arc<Client> {
 251        self.client.clone()
 252    }
 253
 254    /// Returns the number of unique channels in the store
 255    pub fn channel_count(&self) -> usize {
 256        self.channel_index.by_id().len()
 257    }
 258
 259    /// Returns the index of a channel ID in the list of unique channels
 260    pub fn index_of_channel(&self, channel_id: ChannelId) -> Option<usize> {
 261        self.channel_index
 262            .by_id()
 263            .keys()
 264            .position(|id| *id == channel_id)
 265    }
 266
 267    /// Returns an iterator over all unique channels
 268    pub fn channels(&self) -> impl '_ + Iterator<Item = &Arc<Channel>> {
 269        self.channel_index.by_id().values()
 270    }
 271
 272    /// Iterate over all entries in the channel DAG
 273    pub fn ordered_channels(&self) -> impl '_ + Iterator<Item = (usize, &Arc<Channel>)> {
 274        self.channel_index
 275            .ordered_channels()
 276            .iter()
 277            .filter_map(move |id| {
 278                let channel = self.channel_index.by_id().get(id)?;
 279                Some((channel.parent_path.len(), channel))
 280            })
 281    }
 282
 283    pub fn channel_at_index(&self, ix: usize) -> Option<&Arc<Channel>> {
 284        let channel_id = self.channel_index.ordered_channels().get(ix)?;
 285        self.channel_index.by_id().get(channel_id)
 286    }
 287
 288    pub fn channel_at(&self, ix: usize) -> Option<&Arc<Channel>> {
 289        self.channel_index.by_id().values().nth(ix)
 290    }
 291
 292    pub fn has_channel_invitation(&self, channel_id: ChannelId) -> bool {
 293        self.channel_invitations
 294            .iter()
 295            .any(|channel| channel.id == channel_id)
 296    }
 297
 298    pub fn channel_invitations(&self) -> &[Arc<Channel>] {
 299        &self.channel_invitations
 300    }
 301
 302    pub fn channel_for_id(&self, channel_id: ChannelId) -> Option<&Arc<Channel>> {
 303        self.channel_index.by_id().get(&channel_id)
 304    }
 305
 306    pub fn projects_for_id(&self, channel_id: ChannelId) -> Vec<(SharedString, ProjectId)> {
 307        let mut projects: Vec<(SharedString, ProjectId)> = self
 308            .channel_states
 309            .get(&channel_id)
 310            .map(|state| state.projects.clone())
 311            .unwrap_or_default()
 312            .into_iter()
 313            .flat_map(|id| Some((self.hosted_projects.get(&id)?.name.clone(), id)))
 314            .collect();
 315        projects.sort();
 316        projects
 317    }
 318
 319    pub fn has_open_channel_buffer(&self, channel_id: ChannelId, _cx: &AppContext) -> bool {
 320        if let Some(buffer) = self.opened_buffers.get(&channel_id) {
 321            if let OpenedModelHandle::Open(buffer) = buffer {
 322                return buffer.upgrade().is_some();
 323            }
 324        }
 325        false
 326    }
 327
 328    pub fn open_channel_buffer(
 329        &mut self,
 330        channel_id: ChannelId,
 331        cx: &mut ModelContext<Self>,
 332    ) -> Task<Result<Model<ChannelBuffer>>> {
 333        let client = self.client.clone();
 334        let user_store = self.user_store.clone();
 335        let channel_store = cx.handle();
 336        self.open_channel_resource(
 337            channel_id,
 338            |this| &mut this.opened_buffers,
 339            |channel, cx| ChannelBuffer::new(channel, client, user_store, channel_store, cx),
 340            cx,
 341        )
 342    }
 343
 344    pub fn fetch_channel_messages(
 345        &self,
 346        message_ids: Vec<u64>,
 347        cx: &mut ModelContext<Self>,
 348    ) -> Task<Result<Vec<ChannelMessage>>> {
 349        let request = if message_ids.is_empty() {
 350            None
 351        } else {
 352            Some(
 353                self.client
 354                    .request(proto::GetChannelMessagesById { message_ids }),
 355            )
 356        };
 357        cx.spawn(|this, mut cx| async move {
 358            if let Some(request) = request {
 359                let response = request.await?;
 360                let this = this
 361                    .upgrade()
 362                    .ok_or_else(|| anyhow!("channel store dropped"))?;
 363                let user_store = this.update(&mut cx, |this, _| this.user_store.clone())?;
 364                ChannelMessage::from_proto_vec(response.messages, &user_store, &mut cx).await
 365            } else {
 366                Ok(Vec::new())
 367            }
 368        })
 369    }
 370
 371    pub fn has_channel_buffer_changed(&self, channel_id: ChannelId) -> bool {
 372        self.channel_states
 373            .get(&channel_id)
 374            .is_some_and(|state| state.has_channel_buffer_changed())
 375    }
 376
 377    pub fn has_new_messages(&self, channel_id: ChannelId) -> bool {
 378        self.channel_states
 379            .get(&channel_id)
 380            .is_some_and(|state| state.has_new_messages())
 381    }
 382
 383    pub fn set_acknowledged_message_id(&mut self, channel_id: ChannelId, message_id: Option<u64>) {
 384        if let Some(state) = self.channel_states.get_mut(&channel_id) {
 385            state.latest_chat_message = message_id;
 386        }
 387    }
 388
 389    pub fn last_acknowledge_message_id(&self, channel_id: ChannelId) -> Option<u64> {
 390        self.channel_states.get(&channel_id).and_then(|state| {
 391            if let Some(last_message_id) = state.latest_chat_message {
 392                if state
 393                    .last_acknowledged_message_id()
 394                    .is_some_and(|id| id < last_message_id)
 395                {
 396                    return state.last_acknowledged_message_id();
 397                }
 398            }
 399
 400            None
 401        })
 402    }
 403
 404    pub fn acknowledge_message_id(
 405        &mut self,
 406        channel_id: ChannelId,
 407        message_id: u64,
 408        cx: &mut ModelContext<Self>,
 409    ) {
 410        self.channel_states
 411            .entry(channel_id)
 412            .or_insert_with(|| Default::default())
 413            .acknowledge_message_id(message_id);
 414        cx.notify();
 415    }
 416
 417    pub fn update_latest_message_id(
 418        &mut self,
 419        channel_id: ChannelId,
 420        message_id: u64,
 421        cx: &mut ModelContext<Self>,
 422    ) {
 423        self.channel_states
 424            .entry(channel_id)
 425            .or_insert_with(|| Default::default())
 426            .update_latest_message_id(message_id);
 427        cx.notify();
 428    }
 429
 430    pub fn acknowledge_notes_version(
 431        &mut self,
 432        channel_id: ChannelId,
 433        epoch: u64,
 434        version: &clock::Global,
 435        cx: &mut ModelContext<Self>,
 436    ) {
 437        self.channel_states
 438            .entry(channel_id)
 439            .or_insert_with(|| Default::default())
 440            .acknowledge_notes_version(epoch, version);
 441        cx.notify()
 442    }
 443
 444    pub fn update_latest_notes_version(
 445        &mut self,
 446        channel_id: ChannelId,
 447        epoch: u64,
 448        version: &clock::Global,
 449        cx: &mut ModelContext<Self>,
 450    ) {
 451        self.channel_states
 452            .entry(channel_id)
 453            .or_insert_with(|| Default::default())
 454            .update_latest_notes_version(epoch, version);
 455        cx.notify()
 456    }
 457
 458    pub fn open_channel_chat(
 459        &mut self,
 460        channel_id: ChannelId,
 461        cx: &mut ModelContext<Self>,
 462    ) -> Task<Result<Model<ChannelChat>>> {
 463        let client = self.client.clone();
 464        let user_store = self.user_store.clone();
 465        let this = cx.handle();
 466        self.open_channel_resource(
 467            channel_id,
 468            |this| &mut this.opened_chats,
 469            |channel, cx| ChannelChat::new(channel, this, user_store, client, cx),
 470            cx,
 471        )
 472    }
 473
 474    /// Asynchronously open a given resource associated with a channel.
 475    ///
 476    /// Make sure that the resource is only opened once, even if this method
 477    /// is called multiple times with the same channel id while the first task
 478    /// is still running.
 479    fn open_channel_resource<T, F, Fut>(
 480        &mut self,
 481        channel_id: ChannelId,
 482        get_map: fn(&mut Self) -> &mut HashMap<ChannelId, OpenedModelHandle<T>>,
 483        load: F,
 484        cx: &mut ModelContext<Self>,
 485    ) -> Task<Result<Model<T>>>
 486    where
 487        F: 'static + FnOnce(Arc<Channel>, AsyncAppContext) -> Fut,
 488        Fut: Future<Output = Result<Model<T>>>,
 489        T: 'static,
 490    {
 491        let task = loop {
 492            match get_map(self).entry(channel_id) {
 493                hash_map::Entry::Occupied(e) => match e.get() {
 494                    OpenedModelHandle::Open(model) => {
 495                        if let Some(model) = model.upgrade() {
 496                            break Task::ready(Ok(model)).shared();
 497                        } else {
 498                            get_map(self).remove(&channel_id);
 499                            continue;
 500                        }
 501                    }
 502                    OpenedModelHandle::Loading(task) => {
 503                        break task.clone();
 504                    }
 505                },
 506                hash_map::Entry::Vacant(e) => {
 507                    let task = cx
 508                        .spawn(move |this, mut cx| async move {
 509                            let channel = this.update(&mut cx, |this, _| {
 510                                this.channel_for_id(channel_id).cloned().ok_or_else(|| {
 511                                    Arc::new(anyhow!("no channel for id: {}", channel_id))
 512                                })
 513                            })??;
 514
 515                            load(channel, cx).await.map_err(Arc::new)
 516                        })
 517                        .shared();
 518
 519                    e.insert(OpenedModelHandle::Loading(task.clone()));
 520                    cx.spawn({
 521                        let task = task.clone();
 522                        move |this, mut cx| async move {
 523                            let result = task.await;
 524                            this.update(&mut cx, |this, _| match result {
 525                                Ok(model) => {
 526                                    get_map(this).insert(
 527                                        channel_id,
 528                                        OpenedModelHandle::Open(model.downgrade()),
 529                                    );
 530                                }
 531                                Err(_) => {
 532                                    get_map(this).remove(&channel_id);
 533                                }
 534                            })
 535                            .ok();
 536                        }
 537                    })
 538                    .detach();
 539                    break task;
 540                }
 541            }
 542        };
 543        cx.background_executor()
 544            .spawn(async move { task.await.map_err(|error| anyhow!("{}", error)) })
 545    }
 546
 547    pub fn is_channel_admin(&self, channel_id: ChannelId) -> bool {
 548        self.channel_role(channel_id) == proto::ChannelRole::Admin
 549    }
 550
 551    pub fn is_root_channel(&self, channel_id: ChannelId) -> bool {
 552        self.channel_index
 553            .by_id()
 554            .get(&channel_id)
 555            .map_or(false, |channel| channel.is_root_channel())
 556    }
 557
 558    pub fn is_public_channel(&self, channel_id: ChannelId) -> bool {
 559        self.channel_index
 560            .by_id()
 561            .get(&channel_id)
 562            .map_or(false, |channel| {
 563                channel.visibility == ChannelVisibility::Public
 564            })
 565    }
 566
 567    pub fn channel_capability(&self, channel_id: ChannelId) -> Capability {
 568        match self.channel_role(channel_id) {
 569            ChannelRole::Admin | ChannelRole::Member => Capability::ReadWrite,
 570            _ => Capability::ReadOnly,
 571        }
 572    }
 573
 574    pub fn channel_role(&self, channel_id: ChannelId) -> proto::ChannelRole {
 575        maybe!({
 576            let mut channel = self.channel_for_id(channel_id)?;
 577            if !channel.is_root_channel() {
 578                channel = self.channel_for_id(channel.root_id())?;
 579            }
 580            let root_channel_state = self.channel_states.get(&channel.id);
 581            root_channel_state?.role
 582        })
 583        .unwrap_or(proto::ChannelRole::Guest)
 584    }
 585
 586    pub fn channel_participants(&self, channel_id: ChannelId) -> &[Arc<User>] {
 587        self.channel_participants
 588            .get(&channel_id)
 589            .map_or(&[], |v| v.as_slice())
 590    }
 591
 592    pub fn create_channel(
 593        &self,
 594        name: &str,
 595        parent_id: Option<ChannelId>,
 596        cx: &mut ModelContext<Self>,
 597    ) -> Task<Result<ChannelId>> {
 598        let client = self.client.clone();
 599        let name = name.trim_start_matches('#').to_owned();
 600        cx.spawn(move |this, mut cx| async move {
 601            let response = client
 602                .request(proto::CreateChannel {
 603                    name,
 604                    parent_id: parent_id.map(|cid| cid.0),
 605                })
 606                .await?;
 607
 608            let channel = response
 609                .channel
 610                .ok_or_else(|| anyhow!("missing channel in response"))?;
 611            let channel_id = ChannelId(channel.id);
 612
 613            this.update(&mut cx, |this, cx| {
 614                let task = this.update_channels(
 615                    proto::UpdateChannels {
 616                        channels: vec![channel],
 617                        ..Default::default()
 618                    },
 619                    cx,
 620                );
 621                assert!(task.is_none());
 622
 623                // This event is emitted because the collab panel wants to clear the pending edit state
 624                // before this frame is rendered. But we can't guarantee that the collab panel's future
 625                // will resolve before this flush_effects finishes. Synchronously emitting this event
 626                // ensures that the collab panel will observe this creation before the frame completes
 627                cx.emit(ChannelEvent::ChannelCreated(channel_id));
 628            })?;
 629
 630            Ok(channel_id)
 631        })
 632    }
 633
 634    pub fn move_channel(
 635        &mut self,
 636        channel_id: ChannelId,
 637        to: ChannelId,
 638        cx: &mut ModelContext<Self>,
 639    ) -> Task<Result<()>> {
 640        let client = self.client.clone();
 641        cx.spawn(move |_, _| async move {
 642            let _ = client
 643                .request(proto::MoveChannel {
 644                    channel_id: channel_id.0,
 645                    to: to.0,
 646                })
 647                .await?;
 648
 649            Ok(())
 650        })
 651    }
 652
 653    pub fn set_channel_visibility(
 654        &mut self,
 655        channel_id: ChannelId,
 656        visibility: ChannelVisibility,
 657        cx: &mut ModelContext<Self>,
 658    ) -> Task<Result<()>> {
 659        let client = self.client.clone();
 660        cx.spawn(move |_, _| async move {
 661            let _ = client
 662                .request(proto::SetChannelVisibility {
 663                    channel_id: channel_id.0,
 664                    visibility: visibility.into(),
 665                })
 666                .await?;
 667
 668            Ok(())
 669        })
 670    }
 671
 672    pub fn invite_member(
 673        &mut self,
 674        channel_id: ChannelId,
 675        user_id: UserId,
 676        role: proto::ChannelRole,
 677        cx: &mut ModelContext<Self>,
 678    ) -> Task<Result<()>> {
 679        if !self.outgoing_invites.insert((channel_id, user_id)) {
 680            return Task::ready(Err(anyhow!("invite request already in progress")));
 681        }
 682
 683        cx.notify();
 684        let client = self.client.clone();
 685        cx.spawn(move |this, mut cx| async move {
 686            let result = client
 687                .request(proto::InviteChannelMember {
 688                    channel_id: channel_id.0,
 689                    user_id,
 690                    role: role.into(),
 691                })
 692                .await;
 693
 694            this.update(&mut cx, |this, cx| {
 695                this.outgoing_invites.remove(&(channel_id, user_id));
 696                cx.notify();
 697            })?;
 698
 699            result?;
 700
 701            Ok(())
 702        })
 703    }
 704
 705    pub fn remove_member(
 706        &mut self,
 707        channel_id: ChannelId,
 708        user_id: u64,
 709        cx: &mut ModelContext<Self>,
 710    ) -> Task<Result<()>> {
 711        if !self.outgoing_invites.insert((channel_id, user_id)) {
 712            return Task::ready(Err(anyhow!("invite request already in progress")));
 713        }
 714
 715        cx.notify();
 716        let client = self.client.clone();
 717        cx.spawn(move |this, mut cx| async move {
 718            let result = client
 719                .request(proto::RemoveChannelMember {
 720                    channel_id: channel_id.0,
 721                    user_id,
 722                })
 723                .await;
 724
 725            this.update(&mut cx, |this, cx| {
 726                this.outgoing_invites.remove(&(channel_id, user_id));
 727                cx.notify();
 728            })?;
 729            result?;
 730            Ok(())
 731        })
 732    }
 733
 734    pub fn set_member_role(
 735        &mut self,
 736        channel_id: ChannelId,
 737        user_id: UserId,
 738        role: proto::ChannelRole,
 739        cx: &mut ModelContext<Self>,
 740    ) -> Task<Result<()>> {
 741        if !self.outgoing_invites.insert((channel_id, user_id)) {
 742            return Task::ready(Err(anyhow!("member request already in progress")));
 743        }
 744
 745        cx.notify();
 746        let client = self.client.clone();
 747        cx.spawn(move |this, mut cx| async move {
 748            let result = client
 749                .request(proto::SetChannelMemberRole {
 750                    channel_id: channel_id.0,
 751                    user_id,
 752                    role: role.into(),
 753                })
 754                .await;
 755
 756            this.update(&mut cx, |this, cx| {
 757                this.outgoing_invites.remove(&(channel_id, user_id));
 758                cx.notify();
 759            })?;
 760
 761            result?;
 762            Ok(())
 763        })
 764    }
 765
 766    pub fn rename(
 767        &mut self,
 768        channel_id: ChannelId,
 769        new_name: &str,
 770        cx: &mut ModelContext<Self>,
 771    ) -> Task<Result<()>> {
 772        let client = self.client.clone();
 773        let name = new_name.to_string();
 774        cx.spawn(move |this, mut cx| async move {
 775            let channel = client
 776                .request(proto::RenameChannel {
 777                    channel_id: channel_id.0,
 778                    name,
 779                })
 780                .await?
 781                .channel
 782                .ok_or_else(|| anyhow!("missing channel in response"))?;
 783            this.update(&mut cx, |this, cx| {
 784                let task = this.update_channels(
 785                    proto::UpdateChannels {
 786                        channels: vec![channel],
 787                        ..Default::default()
 788                    },
 789                    cx,
 790                );
 791                assert!(task.is_none());
 792
 793                // This event is emitted because the collab panel wants to clear the pending edit state
 794                // before this frame is rendered. But we can't guarantee that the collab panel's future
 795                // will resolve before this flush_effects finishes. Synchronously emitting this event
 796                // ensures that the collab panel will observe this creation before the frame complete
 797                cx.emit(ChannelEvent::ChannelRenamed(channel_id))
 798            })?;
 799            Ok(())
 800        })
 801    }
 802
 803    pub fn respond_to_channel_invite(
 804        &mut self,
 805        channel_id: ChannelId,
 806        accept: bool,
 807        cx: &mut ModelContext<Self>,
 808    ) -> Task<Result<()>> {
 809        let client = self.client.clone();
 810        cx.background_executor().spawn(async move {
 811            client
 812                .request(proto::RespondToChannelInvite {
 813                    channel_id: channel_id.0,
 814                    accept,
 815                })
 816                .await?;
 817            Ok(())
 818        })
 819    }
 820
 821    pub fn get_channel_member_details(
 822        &self,
 823        channel_id: ChannelId,
 824        cx: &mut ModelContext<Self>,
 825    ) -> Task<Result<Vec<ChannelMembership>>> {
 826        let client = self.client.clone();
 827        let user_store = self.user_store.downgrade();
 828        cx.spawn(move |_, mut cx| async move {
 829            let response = client
 830                .request(proto::GetChannelMembers {
 831                    channel_id: channel_id.0,
 832                })
 833                .await?;
 834
 835            let user_ids = response.members.iter().map(|m| m.user_id).collect();
 836            let user_store = user_store
 837                .upgrade()
 838                .ok_or_else(|| anyhow!("user store dropped"))?;
 839            let users = user_store
 840                .update(&mut cx, |user_store, cx| user_store.get_users(user_ids, cx))?
 841                .await?;
 842
 843            Ok(users
 844                .into_iter()
 845                .zip(response.members)
 846                .map(|(user, member)| ChannelMembership {
 847                    user,
 848                    role: member.role(),
 849                    kind: member.kind(),
 850                })
 851                .collect())
 852        })
 853    }
 854
 855    pub fn remove_channel(&self, channel_id: ChannelId) -> impl Future<Output = Result<()>> {
 856        let client = self.client.clone();
 857        async move {
 858            client
 859                .request(proto::DeleteChannel {
 860                    channel_id: channel_id.0,
 861                })
 862                .await?;
 863            Ok(())
 864        }
 865    }
 866
 867    pub fn has_pending_channel_invite_response(&self, _: &Arc<Channel>) -> bool {
 868        false
 869    }
 870
 871    pub fn has_pending_channel_invite(&self, channel_id: ChannelId, user_id: UserId) -> bool {
 872        self.outgoing_invites.contains(&(channel_id, user_id))
 873    }
 874
 875    async fn handle_update_channels(
 876        this: Model<Self>,
 877        message: TypedEnvelope<proto::UpdateChannels>,
 878        _: Arc<Client>,
 879        mut cx: AsyncAppContext,
 880    ) -> Result<()> {
 881        this.update(&mut cx, |this, _| {
 882            this.update_channels_tx
 883                .unbounded_send(message.payload)
 884                .unwrap();
 885        })?;
 886        Ok(())
 887    }
 888
 889    async fn handle_update_user_channels(
 890        this: Model<Self>,
 891        message: TypedEnvelope<proto::UpdateUserChannels>,
 892        _: Arc<Client>,
 893        mut cx: AsyncAppContext,
 894    ) -> Result<()> {
 895        this.update(&mut cx, |this, cx| {
 896            for buffer_version in message.payload.observed_channel_buffer_version {
 897                let version = language::proto::deserialize_version(&buffer_version.version);
 898                this.acknowledge_notes_version(
 899                    ChannelId(buffer_version.channel_id),
 900                    buffer_version.epoch,
 901                    &version,
 902                    cx,
 903                );
 904            }
 905            for message_id in message.payload.observed_channel_message_id {
 906                this.acknowledge_message_id(
 907                    ChannelId(message_id.channel_id),
 908                    message_id.message_id,
 909                    cx,
 910                );
 911            }
 912            for membership in message.payload.channel_memberships {
 913                if let Some(role) = ChannelRole::from_i32(membership.role) {
 914                    this.channel_states
 915                        .entry(ChannelId(membership.channel_id))
 916                        .or_insert_with(|| ChannelState::default())
 917                        .set_role(role)
 918                }
 919            }
 920        })
 921    }
 922
 923    fn handle_connect(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
 924        self.channel_index.clear();
 925        self.channel_invitations.clear();
 926        self.channel_participants.clear();
 927        self.channel_index.clear();
 928        self.outgoing_invites.clear();
 929        self.disconnect_channel_buffers_task.take();
 930
 931        for chat in self.opened_chats.values() {
 932            if let OpenedModelHandle::Open(chat) = chat {
 933                if let Some(chat) = chat.upgrade() {
 934                    chat.update(cx, |chat, cx| {
 935                        chat.rejoin(cx);
 936                    });
 937                }
 938            }
 939        }
 940
 941        let mut buffer_versions = Vec::new();
 942        for buffer in self.opened_buffers.values() {
 943            if let OpenedModelHandle::Open(buffer) = buffer {
 944                if let Some(buffer) = buffer.upgrade() {
 945                    let channel_buffer = buffer.read(cx);
 946                    let buffer = channel_buffer.buffer().read(cx);
 947                    buffer_versions.push(proto::ChannelBufferVersion {
 948                        channel_id: channel_buffer.channel_id.0,
 949                        epoch: channel_buffer.epoch(),
 950                        version: language::proto::serialize_version(&buffer.version()),
 951                    });
 952                }
 953            }
 954        }
 955
 956        if buffer_versions.is_empty() {
 957            return Task::ready(Ok(()));
 958        }
 959
 960        let response = self.client.request(proto::RejoinChannelBuffers {
 961            buffers: buffer_versions,
 962        });
 963
 964        cx.spawn(|this, mut cx| async move {
 965            let mut response = response.await?;
 966
 967            this.update(&mut cx, |this, cx| {
 968                this.opened_buffers.retain(|_, buffer| match buffer {
 969                    OpenedModelHandle::Open(channel_buffer) => {
 970                        let Some(channel_buffer) = channel_buffer.upgrade() else {
 971                            return false;
 972                        };
 973
 974                        channel_buffer.update(cx, |channel_buffer, cx| {
 975                            let channel_id = channel_buffer.channel_id;
 976                            if let Some(remote_buffer) = response
 977                                .buffers
 978                                .iter_mut()
 979                                .find(|buffer| buffer.channel_id == channel_id.0)
 980                            {
 981                                let channel_id = channel_buffer.channel_id;
 982                                let remote_version =
 983                                    language::proto::deserialize_version(&remote_buffer.version);
 984
 985                                channel_buffer.replace_collaborators(
 986                                    mem::take(&mut remote_buffer.collaborators),
 987                                    cx,
 988                                );
 989
 990                                let operations = channel_buffer
 991                                    .buffer()
 992                                    .update(cx, |buffer, cx| {
 993                                        let outgoing_operations =
 994                                            buffer.serialize_ops(Some(remote_version), cx);
 995                                        let incoming_operations =
 996                                            mem::take(&mut remote_buffer.operations)
 997                                                .into_iter()
 998                                                .map(language::proto::deserialize_operation)
 999                                                .collect::<Result<Vec<_>>>()?;
1000                                        buffer.apply_ops(incoming_operations, cx)?;
1001                                        anyhow::Ok(outgoing_operations)
1002                                    })
1003                                    .log_err();
1004
1005                                if let Some(operations) = operations {
1006                                    let client = this.client.clone();
1007                                    cx.background_executor()
1008                                        .spawn(async move {
1009                                            let operations = operations.await;
1010                                            for chunk in
1011                                                language::proto::split_operations(operations)
1012                                            {
1013                                                client
1014                                                    .send(proto::UpdateChannelBuffer {
1015                                                        channel_id: channel_id.0,
1016                                                        operations: chunk,
1017                                                    })
1018                                                    .ok();
1019                                            }
1020                                        })
1021                                        .detach();
1022                                    return true;
1023                                }
1024                            }
1025
1026                            channel_buffer.disconnect(cx);
1027                            false
1028                        })
1029                    }
1030                    OpenedModelHandle::Loading(_) => true,
1031                });
1032            })
1033            .ok();
1034            anyhow::Ok(())
1035        })
1036    }
1037
1038    fn handle_disconnect(&mut self, wait_for_reconnect: bool, cx: &mut ModelContext<Self>) {
1039        cx.notify();
1040
1041        self.disconnect_channel_buffers_task.get_or_insert_with(|| {
1042            cx.spawn(move |this, mut cx| async move {
1043                if wait_for_reconnect {
1044                    cx.background_executor().timer(RECONNECT_TIMEOUT).await;
1045                }
1046
1047                if let Some(this) = this.upgrade() {
1048                    this.update(&mut cx, |this, cx| {
1049                        for (_, buffer) in this.opened_buffers.drain() {
1050                            if let OpenedModelHandle::Open(buffer) = buffer {
1051                                if let Some(buffer) = buffer.upgrade() {
1052                                    buffer.update(cx, |buffer, cx| buffer.disconnect(cx));
1053                                }
1054                            }
1055                        }
1056                    })
1057                    .ok();
1058                }
1059            })
1060        });
1061    }
1062
1063    pub(crate) fn update_channels(
1064        &mut self,
1065        payload: proto::UpdateChannels,
1066        cx: &mut ModelContext<ChannelStore>,
1067    ) -> Option<Task<Result<()>>> {
1068        if !payload.remove_channel_invitations.is_empty() {
1069            self.channel_invitations
1070                .retain(|channel| !payload.remove_channel_invitations.contains(&channel.id.0));
1071        }
1072        for channel in payload.channel_invitations {
1073            match self
1074                .channel_invitations
1075                .binary_search_by_key(&channel.id, |c| c.id.0)
1076            {
1077                Ok(ix) => {
1078                    Arc::make_mut(&mut self.channel_invitations[ix]).name = channel.name.into()
1079                }
1080                Err(ix) => self.channel_invitations.insert(
1081                    ix,
1082                    Arc::new(Channel {
1083                        id: ChannelId(channel.id),
1084                        visibility: channel.visibility(),
1085                        name: channel.name.into(),
1086                        parent_path: channel
1087                            .parent_path
1088                            .into_iter()
1089                            .map(|cid| ChannelId(cid))
1090                            .collect(),
1091                    }),
1092                ),
1093            }
1094        }
1095
1096        let channels_changed = !payload.channels.is_empty()
1097            || !payload.delete_channels.is_empty()
1098            || !payload.latest_channel_message_ids.is_empty()
1099            || !payload.latest_channel_buffer_versions.is_empty()
1100            || !payload.hosted_projects.is_empty()
1101            || !payload.deleted_hosted_projects.is_empty();
1102
1103        if channels_changed {
1104            if !payload.delete_channels.is_empty() {
1105                let delete_channels: Vec<ChannelId> = payload
1106                    .delete_channels
1107                    .into_iter()
1108                    .map(|cid| ChannelId(cid))
1109                    .collect();
1110                self.channel_index.delete_channels(&delete_channels);
1111                self.channel_participants
1112                    .retain(|channel_id, _| !delete_channels.contains(&channel_id));
1113
1114                for channel_id in &delete_channels {
1115                    let channel_id = *channel_id;
1116                    if payload
1117                        .channels
1118                        .iter()
1119                        .any(|channel| channel.id == channel_id.0)
1120                    {
1121                        continue;
1122                    }
1123                    if let Some(OpenedModelHandle::Open(buffer)) =
1124                        self.opened_buffers.remove(&channel_id)
1125                    {
1126                        if let Some(buffer) = buffer.upgrade() {
1127                            buffer.update(cx, ChannelBuffer::disconnect);
1128                        }
1129                    }
1130                }
1131            }
1132
1133            let mut index = self.channel_index.bulk_insert();
1134            for channel in payload.channels {
1135                let id = ChannelId(channel.id);
1136                let channel_changed = index.insert(channel);
1137
1138                if channel_changed {
1139                    if let Some(OpenedModelHandle::Open(buffer)) = self.opened_buffers.get(&id) {
1140                        if let Some(buffer) = buffer.upgrade() {
1141                            buffer.update(cx, ChannelBuffer::channel_changed);
1142                        }
1143                    }
1144                }
1145            }
1146
1147            for latest_buffer_version in payload.latest_channel_buffer_versions {
1148                let version = language::proto::deserialize_version(&latest_buffer_version.version);
1149                self.channel_states
1150                    .entry(ChannelId(latest_buffer_version.channel_id))
1151                    .or_default()
1152                    .update_latest_notes_version(latest_buffer_version.epoch, &version)
1153            }
1154
1155            for latest_channel_message in payload.latest_channel_message_ids {
1156                self.channel_states
1157                    .entry(ChannelId(latest_channel_message.channel_id))
1158                    .or_default()
1159                    .update_latest_message_id(latest_channel_message.message_id);
1160            }
1161
1162            for hosted_project in payload.hosted_projects {
1163                let hosted_project: HostedProject = hosted_project.into();
1164                if let Some(old_project) = self
1165                    .hosted_projects
1166                    .insert(hosted_project.project_id, hosted_project.clone())
1167                {
1168                    self.channel_states
1169                        .entry(old_project.channel_id)
1170                        .or_default()
1171                        .remove_hosted_project(old_project.project_id);
1172                }
1173                self.channel_states
1174                    .entry(hosted_project.channel_id)
1175                    .or_default()
1176                    .add_hosted_project(hosted_project.project_id);
1177            }
1178
1179            for hosted_project_id in payload.deleted_hosted_projects {
1180                let hosted_project_id = ProjectId(hosted_project_id);
1181
1182                if let Some(old_project) = self.hosted_projects.remove(&hosted_project_id) {
1183                    self.channel_states
1184                        .entry(old_project.channel_id)
1185                        .or_default()
1186                        .remove_hosted_project(old_project.project_id);
1187                }
1188            }
1189        }
1190
1191        cx.notify();
1192        if payload.channel_participants.is_empty() {
1193            return None;
1194        }
1195
1196        let mut all_user_ids = Vec::new();
1197        let channel_participants = payload.channel_participants;
1198        for entry in &channel_participants {
1199            for user_id in entry.participant_user_ids.iter() {
1200                if let Err(ix) = all_user_ids.binary_search(user_id) {
1201                    all_user_ids.insert(ix, *user_id);
1202                }
1203            }
1204        }
1205
1206        let users = self
1207            .user_store
1208            .update(cx, |user_store, cx| user_store.get_users(all_user_ids, cx));
1209        Some(cx.spawn(|this, mut cx| async move {
1210            let users = users.await?;
1211
1212            this.update(&mut cx, |this, cx| {
1213                for entry in &channel_participants {
1214                    let mut participants: Vec<_> = entry
1215                        .participant_user_ids
1216                        .iter()
1217                        .filter_map(|user_id| {
1218                            users
1219                                .binary_search_by_key(&user_id, |user| &user.id)
1220                                .ok()
1221                                .map(|ix| users[ix].clone())
1222                        })
1223                        .collect();
1224
1225                    participants.sort_by_key(|u| u.id);
1226
1227                    this.channel_participants
1228                        .insert(ChannelId(entry.channel_id), participants);
1229                }
1230
1231                cx.notify();
1232            })
1233        }))
1234    }
1235}
1236
1237impl ChannelState {
1238    fn set_role(&mut self, role: ChannelRole) {
1239        self.role = Some(role);
1240    }
1241
1242    fn has_channel_buffer_changed(&self) -> bool {
1243        self.latest_notes_version.epoch > self.observed_notes_version.epoch
1244            || (self.latest_notes_version.epoch == self.observed_notes_version.epoch
1245                && self
1246                    .latest_notes_version
1247                    .version
1248                    .changed_since(&self.observed_notes_version.version))
1249    }
1250
1251    fn has_new_messages(&self) -> bool {
1252        let latest_message_id = self.latest_chat_message;
1253        let observed_message_id = self.observed_chat_message;
1254
1255        latest_message_id.is_some_and(|latest_message_id| {
1256            latest_message_id > observed_message_id.unwrap_or_default()
1257        })
1258    }
1259
1260    fn last_acknowledged_message_id(&self) -> Option<u64> {
1261        self.observed_chat_message
1262    }
1263
1264    fn acknowledge_message_id(&mut self, message_id: u64) {
1265        let observed = self.observed_chat_message.get_or_insert(message_id);
1266        *observed = (*observed).max(message_id);
1267    }
1268
1269    fn update_latest_message_id(&mut self, message_id: u64) {
1270        self.latest_chat_message =
1271            Some(message_id.max(self.latest_chat_message.unwrap_or_default()));
1272    }
1273
1274    fn acknowledge_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1275        if self.observed_notes_version.epoch == epoch {
1276            self.observed_notes_version.version.join(version);
1277        } else {
1278            self.observed_notes_version = NotesVersion {
1279                epoch,
1280                version: version.clone(),
1281            };
1282        }
1283    }
1284
1285    fn update_latest_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1286        if self.latest_notes_version.epoch == epoch {
1287            self.latest_notes_version.version.join(version);
1288        } else {
1289            self.latest_notes_version = NotesVersion {
1290                epoch,
1291                version: version.clone(),
1292            };
1293        }
1294    }
1295
1296    fn add_hosted_project(&mut self, project_id: ProjectId) {
1297        self.projects.insert(project_id);
1298    }
1299
1300    fn remove_hosted_project(&mut self, project_id: ProjectId) {
1301        self.projects.remove(&project_id);
1302    }
1303}