channel_store.rs

   1mod channel_index;
   2
   3use crate::{channel_buffer::ChannelBuffer, channel_chat::ChannelChat, ChannelMessage};
   4use anyhow::{anyhow, Result};
   5use channel_index::ChannelIndex;
   6use client::{ChannelId, Client, Subscription, User, UserId, UserStore};
   7use collections::{hash_map, HashMap, HashSet};
   8use futures::{channel::mpsc, future::Shared, Future, FutureExt, StreamExt};
   9use gpui::{
  10    AppContext, AsyncAppContext, Context, EventEmitter, Global, Model, ModelContext, SharedString,
  11    Task, WeakModel,
  12};
  13use language::Capability;
  14use release_channel::RELEASE_CHANNEL;
  15use rpc::{
  16    proto::{self, ChannelRole, ChannelVisibility},
  17    TypedEnvelope,
  18};
  19use std::{mem, sync::Arc, time::Duration};
  20use util::{async_maybe, maybe, ResultExt};
  21
  22pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
  23
  24pub fn init(client: &Arc<Client>, user_store: Model<UserStore>, cx: &mut AppContext) {
  25    let channel_store =
  26        cx.new_model(|cx| ChannelStore::new(client.clone(), user_store.clone(), cx));
  27    cx.set_global(GlobalChannelStore(channel_store));
  28}
  29
  30#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)]
  31pub struct HostedProjectId(pub u64);
  32
  33#[derive(Debug, Clone, Default)]
  34struct NotesVersion {
  35    epoch: u64,
  36    version: clock::Global,
  37}
  38
  39#[derive(Debug, Clone)]
  40pub struct HostedProject {
  41    id: HostedProjectId,
  42    channel_id: ChannelId,
  43    name: SharedString,
  44    _visibility: proto::ChannelVisibility,
  45}
  46
  47impl From<proto::HostedProject> for HostedProject {
  48    fn from(project: proto::HostedProject) -> Self {
  49        Self {
  50            id: HostedProjectId(project.id),
  51            channel_id: ChannelId(project.channel_id),
  52            _visibility: project.visibility(),
  53            name: project.name.into(),
  54        }
  55    }
  56}
  57
  58pub struct ChannelStore {
  59    pub channel_index: ChannelIndex,
  60    channel_invitations: Vec<Arc<Channel>>,
  61    channel_participants: HashMap<ChannelId, Vec<Arc<User>>>,
  62    channel_states: HashMap<ChannelId, ChannelState>,
  63    hosted_projects: HashMap<HostedProjectId, HostedProject>,
  64
  65    outgoing_invites: HashSet<(ChannelId, UserId)>,
  66    update_channels_tx: mpsc::UnboundedSender<proto::UpdateChannels>,
  67    opened_buffers: HashMap<ChannelId, OpenedModelHandle<ChannelBuffer>>,
  68    opened_chats: HashMap<ChannelId, OpenedModelHandle<ChannelChat>>,
  69    client: Arc<Client>,
  70    user_store: Model<UserStore>,
  71    _rpc_subscriptions: [Subscription; 2],
  72    _watch_connection_status: Task<Option<()>>,
  73    disconnect_channel_buffers_task: Option<Task<()>>,
  74    _update_channels: Task<()>,
  75}
  76
  77#[derive(Clone, Debug)]
  78pub struct Channel {
  79    pub id: ChannelId,
  80    pub name: SharedString,
  81    pub visibility: proto::ChannelVisibility,
  82    pub parent_path: Vec<ChannelId>,
  83}
  84
  85#[derive(Default)]
  86pub struct ChannelState {
  87    latest_chat_message: Option<u64>,
  88    latest_notes_versions: Option<NotesVersion>,
  89    observed_chat_message: Option<u64>,
  90    observed_notes_versions: Option<NotesVersion>,
  91    role: Option<ChannelRole>,
  92    projects: HashSet<HostedProjectId>,
  93}
  94
  95impl Channel {
  96    pub fn link(&self) -> String {
  97        RELEASE_CHANNEL.link_prefix().to_owned()
  98            + "channel/"
  99            + &Self::slug(&self.name)
 100            + "-"
 101            + &self.id.to_string()
 102    }
 103
 104    pub fn notes_link(&self, heading: Option<String>) -> String {
 105        self.link()
 106            + "/notes"
 107            + &heading
 108                .map(|h| format!("#{}", Self::slug(&h)))
 109                .unwrap_or_default()
 110    }
 111
 112    pub fn is_root_channel(&self) -> bool {
 113        self.parent_path.is_empty()
 114    }
 115
 116    pub fn root_id(&self) -> ChannelId {
 117        self.parent_path.first().copied().unwrap_or(self.id)
 118    }
 119
 120    pub fn slug(str: &str) -> String {
 121        let slug: String = str
 122            .chars()
 123            .map(|c| if c.is_alphanumeric() { c } else { '-' })
 124            .collect();
 125
 126        slug.trim_matches(|c| c == '-').to_string()
 127    }
 128}
 129
 130pub struct ChannelMembership {
 131    pub user: Arc<User>,
 132    pub kind: proto::channel_member::Kind,
 133    pub role: proto::ChannelRole,
 134}
 135impl ChannelMembership {
 136    pub fn sort_key(&self) -> MembershipSortKey {
 137        MembershipSortKey {
 138            role_order: match self.role {
 139                proto::ChannelRole::Admin => 0,
 140                proto::ChannelRole::Member => 1,
 141                proto::ChannelRole::Banned => 2,
 142                proto::ChannelRole::Talker => 3,
 143                proto::ChannelRole::Guest => 4,
 144            },
 145            kind_order: match self.kind {
 146                proto::channel_member::Kind::Member => 0,
 147                proto::channel_member::Kind::Invitee => 1,
 148            },
 149            username_order: self.user.github_login.as_str(),
 150        }
 151    }
 152}
 153
 154#[derive(PartialOrd, Ord, PartialEq, Eq)]
 155pub struct MembershipSortKey<'a> {
 156    role_order: u8,
 157    kind_order: u8,
 158    username_order: &'a str,
 159}
 160
 161pub enum ChannelEvent {
 162    ChannelCreated(ChannelId),
 163    ChannelRenamed(ChannelId),
 164}
 165
 166impl EventEmitter<ChannelEvent> for ChannelStore {}
 167
 168enum OpenedModelHandle<E> {
 169    Open(WeakModel<E>),
 170    Loading(Shared<Task<Result<Model<E>, Arc<anyhow::Error>>>>),
 171}
 172
 173struct GlobalChannelStore(Model<ChannelStore>);
 174
 175impl Global for GlobalChannelStore {}
 176
 177impl ChannelStore {
 178    pub fn global(cx: &AppContext) -> Model<Self> {
 179        cx.global::<GlobalChannelStore>().0.clone()
 180    }
 181
 182    pub fn new(
 183        client: Arc<Client>,
 184        user_store: Model<UserStore>,
 185        cx: &mut ModelContext<Self>,
 186    ) -> Self {
 187        let rpc_subscriptions = [
 188            client.add_message_handler(cx.weak_model(), Self::handle_update_channels),
 189            client.add_message_handler(cx.weak_model(), Self::handle_update_user_channels),
 190        ];
 191
 192        let mut connection_status = client.status();
 193        let (update_channels_tx, mut update_channels_rx) = mpsc::unbounded();
 194        let watch_connection_status = cx.spawn(|this, mut cx| async move {
 195            while let Some(status) = connection_status.next().await {
 196                let this = this.upgrade()?;
 197                match status {
 198                    client::Status::Connected { .. } => {
 199                        this.update(&mut cx, |this, cx| this.handle_connect(cx))
 200                            .ok()?
 201                            .await
 202                            .log_err()?;
 203                    }
 204                    client::Status::SignedOut | client::Status::UpgradeRequired => {
 205                        this.update(&mut cx, |this, cx| this.handle_disconnect(false, cx))
 206                            .ok();
 207                    }
 208                    _ => {
 209                        this.update(&mut cx, |this, cx| this.handle_disconnect(true, cx))
 210                            .ok();
 211                    }
 212                }
 213            }
 214            Some(())
 215        });
 216
 217        Self {
 218            channel_invitations: Vec::default(),
 219            channel_index: ChannelIndex::default(),
 220            channel_participants: Default::default(),
 221            hosted_projects: Default::default(),
 222            outgoing_invites: Default::default(),
 223            opened_buffers: Default::default(),
 224            opened_chats: Default::default(),
 225            update_channels_tx,
 226            client,
 227            user_store,
 228            _rpc_subscriptions: rpc_subscriptions,
 229            _watch_connection_status: watch_connection_status,
 230            disconnect_channel_buffers_task: None,
 231            _update_channels: cx.spawn(|this, mut cx| async move {
 232                async_maybe!({
 233                    while let Some(update_channels) = update_channels_rx.next().await {
 234                        if let Some(this) = this.upgrade() {
 235                            let update_task = this.update(&mut cx, |this, cx| {
 236                                this.update_channels(update_channels, cx)
 237                            })?;
 238                            if let Some(update_task) = update_task {
 239                                update_task.await.log_err();
 240                            }
 241                        }
 242                    }
 243                    anyhow::Ok(())
 244                })
 245                .await
 246                .log_err();
 247            }),
 248            channel_states: Default::default(),
 249        }
 250    }
 251
 252    pub fn client(&self) -> Arc<Client> {
 253        self.client.clone()
 254    }
 255
 256    /// Returns the number of unique channels in the store
 257    pub fn channel_count(&self) -> usize {
 258        self.channel_index.by_id().len()
 259    }
 260
 261    /// Returns the index of a channel ID in the list of unique channels
 262    pub fn index_of_channel(&self, channel_id: ChannelId) -> Option<usize> {
 263        self.channel_index
 264            .by_id()
 265            .keys()
 266            .position(|id| *id == channel_id)
 267    }
 268
 269    /// Returns an iterator over all unique channels
 270    pub fn channels(&self) -> impl '_ + Iterator<Item = &Arc<Channel>> {
 271        self.channel_index.by_id().values()
 272    }
 273
 274    /// Iterate over all entries in the channel DAG
 275    pub fn ordered_channels(&self) -> impl '_ + Iterator<Item = (usize, &Arc<Channel>)> {
 276        self.channel_index
 277            .ordered_channels()
 278            .iter()
 279            .filter_map(move |id| {
 280                let channel = self.channel_index.by_id().get(id)?;
 281                Some((channel.parent_path.len(), channel))
 282            })
 283    }
 284
 285    pub fn channel_at_index(&self, ix: usize) -> Option<&Arc<Channel>> {
 286        let channel_id = self.channel_index.ordered_channels().get(ix)?;
 287        self.channel_index.by_id().get(channel_id)
 288    }
 289
 290    pub fn channel_at(&self, ix: usize) -> Option<&Arc<Channel>> {
 291        self.channel_index.by_id().values().nth(ix)
 292    }
 293
 294    pub fn has_channel_invitation(&self, channel_id: ChannelId) -> bool {
 295        self.channel_invitations
 296            .iter()
 297            .any(|channel| channel.id == channel_id)
 298    }
 299
 300    pub fn channel_invitations(&self) -> &[Arc<Channel>] {
 301        &self.channel_invitations
 302    }
 303
 304    pub fn channel_for_id(&self, channel_id: ChannelId) -> Option<&Arc<Channel>> {
 305        self.channel_index.by_id().get(&channel_id)
 306    }
 307
 308    pub fn projects_for_id(&self, channel_id: ChannelId) -> Vec<(SharedString, HostedProjectId)> {
 309        let mut projects: Vec<(SharedString, HostedProjectId)> = self
 310            .channel_states
 311            .get(&channel_id)
 312            .map(|state| state.projects.clone())
 313            .unwrap_or_default()
 314            .into_iter()
 315            .flat_map(|id| Some((self.hosted_projects.get(&id)?.name.clone(), id)))
 316            .collect();
 317        projects.sort();
 318        projects
 319    }
 320
 321    pub fn has_open_channel_buffer(&self, channel_id: ChannelId, _cx: &AppContext) -> bool {
 322        if let Some(buffer) = self.opened_buffers.get(&channel_id) {
 323            if let OpenedModelHandle::Open(buffer) = buffer {
 324                return buffer.upgrade().is_some();
 325            }
 326        }
 327        false
 328    }
 329
 330    pub fn open_channel_buffer(
 331        &mut self,
 332        channel_id: ChannelId,
 333        cx: &mut ModelContext<Self>,
 334    ) -> Task<Result<Model<ChannelBuffer>>> {
 335        let client = self.client.clone();
 336        let user_store = self.user_store.clone();
 337        let channel_store = cx.handle();
 338        self.open_channel_resource(
 339            channel_id,
 340            |this| &mut this.opened_buffers,
 341            |channel, cx| ChannelBuffer::new(channel, client, user_store, channel_store, cx),
 342            cx,
 343        )
 344    }
 345
 346    pub fn fetch_channel_messages(
 347        &self,
 348        message_ids: Vec<u64>,
 349        cx: &mut ModelContext<Self>,
 350    ) -> Task<Result<Vec<ChannelMessage>>> {
 351        let request = if message_ids.is_empty() {
 352            None
 353        } else {
 354            Some(
 355                self.client
 356                    .request(proto::GetChannelMessagesById { message_ids }),
 357            )
 358        };
 359        cx.spawn(|this, mut cx| async move {
 360            if let Some(request) = request {
 361                let response = request.await?;
 362                let this = this
 363                    .upgrade()
 364                    .ok_or_else(|| anyhow!("channel store dropped"))?;
 365                let user_store = this.update(&mut cx, |this, _| this.user_store.clone())?;
 366                ChannelMessage::from_proto_vec(response.messages, &user_store, &mut cx).await
 367            } else {
 368                Ok(Vec::new())
 369            }
 370        })
 371    }
 372
 373    pub fn has_channel_buffer_changed(&self, channel_id: ChannelId) -> bool {
 374        self.channel_states
 375            .get(&channel_id)
 376            .is_some_and(|state| state.has_channel_buffer_changed())
 377    }
 378
 379    pub fn has_new_messages(&self, channel_id: ChannelId) -> bool {
 380        self.channel_states
 381            .get(&channel_id)
 382            .is_some_and(|state| state.has_new_messages())
 383    }
 384
 385    pub fn last_acknowledge_message_id(&self, channel_id: ChannelId) -> Option<u64> {
 386        self.channel_states.get(&channel_id).and_then(|state| {
 387            if let Some(last_message_id) = state.latest_chat_message {
 388                if state
 389                    .last_acknowledged_message_id()
 390                    .is_some_and(|id| id < last_message_id)
 391                {
 392                    return state.last_acknowledged_message_id();
 393                }
 394            }
 395
 396            None
 397        })
 398    }
 399
 400    pub fn acknowledge_message_id(
 401        &mut self,
 402        channel_id: ChannelId,
 403        message_id: u64,
 404        cx: &mut ModelContext<Self>,
 405    ) {
 406        self.channel_states
 407            .entry(channel_id)
 408            .or_insert_with(|| Default::default())
 409            .acknowledge_message_id(message_id);
 410        cx.notify();
 411    }
 412
 413    pub fn update_latest_message_id(
 414        &mut self,
 415        channel_id: ChannelId,
 416        message_id: u64,
 417        cx: &mut ModelContext<Self>,
 418    ) {
 419        self.channel_states
 420            .entry(channel_id)
 421            .or_insert_with(|| Default::default())
 422            .update_latest_message_id(message_id);
 423        cx.notify();
 424    }
 425
 426    pub fn acknowledge_notes_version(
 427        &mut self,
 428        channel_id: ChannelId,
 429        epoch: u64,
 430        version: &clock::Global,
 431        cx: &mut ModelContext<Self>,
 432    ) {
 433        self.channel_states
 434            .entry(channel_id)
 435            .or_insert_with(|| Default::default())
 436            .acknowledge_notes_version(epoch, version);
 437        cx.notify()
 438    }
 439
 440    pub fn update_latest_notes_version(
 441        &mut self,
 442        channel_id: ChannelId,
 443        epoch: u64,
 444        version: &clock::Global,
 445        cx: &mut ModelContext<Self>,
 446    ) {
 447        self.channel_states
 448            .entry(channel_id)
 449            .or_insert_with(|| Default::default())
 450            .update_latest_notes_version(epoch, version);
 451        cx.notify()
 452    }
 453
 454    pub fn open_channel_chat(
 455        &mut self,
 456        channel_id: ChannelId,
 457        cx: &mut ModelContext<Self>,
 458    ) -> Task<Result<Model<ChannelChat>>> {
 459        let client = self.client.clone();
 460        let user_store = self.user_store.clone();
 461        let this = cx.handle();
 462        self.open_channel_resource(
 463            channel_id,
 464            |this| &mut this.opened_chats,
 465            |channel, cx| ChannelChat::new(channel, this, user_store, client, cx),
 466            cx,
 467        )
 468    }
 469
    /// Asynchronously opens a resource (buffer or chat) associated with a channel.
    ///
    /// The resource is only opened once, even if this method is called
    /// multiple times with the same channel id while the first load is still
    /// running: later callers receive a clone of the same shared task.
    ///
    /// * `get_map` selects which of the store's maps tracks this kind of resource.
    /// * `load` performs the actual load once the channel has been found.
    ///
    /// The returned task fails if no channel with `channel_id` is known when
    /// the load starts, or if `load` itself fails.
    fn open_channel_resource<T, F, Fut>(
        &mut self,
        channel_id: ChannelId,
        get_map: fn(&mut Self) -> &mut HashMap<ChannelId, OpenedModelHandle<T>>,
        load: F,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<Model<T>>>
    where
        F: 'static + FnOnce(Arc<Channel>, AsyncAppContext) -> Fut,
        Fut: Future<Output = Result<Model<T>>>,
        T: 'static,
    {
        // Loop so that a stale entry can be removed and the lookup retried.
        let task = loop {
            match get_map(self).entry(channel_id) {
                hash_map::Entry::Occupied(e) => match e.get() {
                    OpenedModelHandle::Open(model) => {
                        if let Some(model) = model.upgrade() {
                            // Already open and still alive: hand it back directly.
                            break Task::ready(Ok(model)).shared();
                        } else {
                            // The weak handle is dead; drop the entry and retry,
                            // which takes the vacant path below.
                            get_map(self).remove(&channel_id);
                            continue;
                        }
                    }
                    OpenedModelHandle::Loading(task) => {
                        // A load is already in flight: join it.
                        break task.clone();
                    }
                },
                hash_map::Entry::Vacant(e) => {
                    // Errors are wrapped in `Arc` so the task's output can be
                    // cloned for every holder of the shared task.
                    let task = cx
                        .spawn(move |this, mut cx| async move {
                            let channel = this.update(&mut cx, |this, _| {
                                this.channel_for_id(channel_id).cloned().ok_or_else(|| {
                                    Arc::new(anyhow!("no channel for id: {}", channel_id))
                                })
                            })??;

                            load(channel, cx).await.map_err(Arc::new)
                        })
                        .shared();

                    // Record the in-flight load before anything can await it.
                    e.insert(OpenedModelHandle::Loading(task.clone()));
                    // Detached follow-up: once the load settles, swap the entry
                    // for a weak handle on success or remove it on failure.
                    cx.spawn({
                        let task = task.clone();
                        move |this, mut cx| async move {
                            let result = task.await;
                            this.update(&mut cx, |this, _| match result {
                                Ok(model) => {
                                    get_map(this).insert(
                                        channel_id,
                                        OpenedModelHandle::Open(model.downgrade()),
                                    );
                                }
                                Err(_) => {
                                    get_map(this).remove(&channel_id);
                                }
                            })
                            .ok();
                        }
                    })
                    .detach();
                    break task;
                }
            }
        };
        // Convert the shared `Arc<anyhow::Error>` back into a plain error by
        // formatting it into a new one.
        cx.background_executor()
            .spawn(async move { task.await.map_err(|error| anyhow!("{}", error)) })
    }
 542
 543    pub fn is_channel_admin(&self, channel_id: ChannelId) -> bool {
 544        self.channel_role(channel_id) == proto::ChannelRole::Admin
 545    }
 546
 547    pub fn is_root_channel(&self, channel_id: ChannelId) -> bool {
 548        self.channel_index
 549            .by_id()
 550            .get(&channel_id)
 551            .map_or(false, |channel| channel.is_root_channel())
 552    }
 553
 554    pub fn is_public_channel(&self, channel_id: ChannelId) -> bool {
 555        self.channel_index
 556            .by_id()
 557            .get(&channel_id)
 558            .map_or(false, |channel| {
 559                channel.visibility == ChannelVisibility::Public
 560            })
 561    }
 562
 563    pub fn channel_capability(&self, channel_id: ChannelId) -> Capability {
 564        match self.channel_role(channel_id) {
 565            ChannelRole::Admin | ChannelRole::Member => Capability::ReadWrite,
 566            _ => Capability::ReadOnly,
 567        }
 568    }
 569
 570    pub fn channel_role(&self, channel_id: ChannelId) -> proto::ChannelRole {
 571        maybe!({
 572            let mut channel = self.channel_for_id(channel_id)?;
 573            if !channel.is_root_channel() {
 574                channel = self.channel_for_id(channel.root_id())?;
 575            }
 576            let root_channel_state = self.channel_states.get(&channel.id);
 577            root_channel_state?.role
 578        })
 579        .unwrap_or(proto::ChannelRole::Guest)
 580    }
 581
 582    pub fn channel_participants(&self, channel_id: ChannelId) -> &[Arc<User>] {
 583        self.channel_participants
 584            .get(&channel_id)
 585            .map_or(&[], |v| v.as_slice())
 586    }
 587
 588    pub fn create_channel(
 589        &self,
 590        name: &str,
 591        parent_id: Option<ChannelId>,
 592        cx: &mut ModelContext<Self>,
 593    ) -> Task<Result<ChannelId>> {
 594        let client = self.client.clone();
 595        let name = name.trim_start_matches("#").to_owned();
 596        cx.spawn(move |this, mut cx| async move {
 597            let response = client
 598                .request(proto::CreateChannel {
 599                    name,
 600                    parent_id: parent_id.map(|cid| cid.0),
 601                })
 602                .await?;
 603
 604            let channel = response
 605                .channel
 606                .ok_or_else(|| anyhow!("missing channel in response"))?;
 607            let channel_id = ChannelId(channel.id);
 608
 609            this.update(&mut cx, |this, cx| {
 610                let task = this.update_channels(
 611                    proto::UpdateChannels {
 612                        channels: vec![channel],
 613                        ..Default::default()
 614                    },
 615                    cx,
 616                );
 617                assert!(task.is_none());
 618
 619                // This event is emitted because the collab panel wants to clear the pending edit state
 620                // before this frame is rendered. But we can't guarantee that the collab panel's future
 621                // will resolve before this flush_effects finishes. Synchronously emitting this event
 622                // ensures that the collab panel will observe this creation before the frame completes
 623                cx.emit(ChannelEvent::ChannelCreated(channel_id));
 624            })?;
 625
 626            Ok(channel_id)
 627        })
 628    }
 629
 630    pub fn move_channel(
 631        &mut self,
 632        channel_id: ChannelId,
 633        to: ChannelId,
 634        cx: &mut ModelContext<Self>,
 635    ) -> Task<Result<()>> {
 636        let client = self.client.clone();
 637        cx.spawn(move |_, _| async move {
 638            let _ = client
 639                .request(proto::MoveChannel {
 640                    channel_id: channel_id.0,
 641                    to: to.0,
 642                })
 643                .await?;
 644
 645            Ok(())
 646        })
 647    }
 648
 649    pub fn set_channel_visibility(
 650        &mut self,
 651        channel_id: ChannelId,
 652        visibility: ChannelVisibility,
 653        cx: &mut ModelContext<Self>,
 654    ) -> Task<Result<()>> {
 655        let client = self.client.clone();
 656        cx.spawn(move |_, _| async move {
 657            let _ = client
 658                .request(proto::SetChannelVisibility {
 659                    channel_id: channel_id.0,
 660                    visibility: visibility.into(),
 661                })
 662                .await?;
 663
 664            Ok(())
 665        })
 666    }
 667
 668    pub fn invite_member(
 669        &mut self,
 670        channel_id: ChannelId,
 671        user_id: UserId,
 672        role: proto::ChannelRole,
 673        cx: &mut ModelContext<Self>,
 674    ) -> Task<Result<()>> {
 675        if !self.outgoing_invites.insert((channel_id, user_id)) {
 676            return Task::ready(Err(anyhow!("invite request already in progress")));
 677        }
 678
 679        cx.notify();
 680        let client = self.client.clone();
 681        cx.spawn(move |this, mut cx| async move {
 682            let result = client
 683                .request(proto::InviteChannelMember {
 684                    channel_id: channel_id.0,
 685                    user_id,
 686                    role: role.into(),
 687                })
 688                .await;
 689
 690            this.update(&mut cx, |this, cx| {
 691                this.outgoing_invites.remove(&(channel_id, user_id));
 692                cx.notify();
 693            })?;
 694
 695            result?;
 696
 697            Ok(())
 698        })
 699    }
 700
 701    pub fn remove_member(
 702        &mut self,
 703        channel_id: ChannelId,
 704        user_id: u64,
 705        cx: &mut ModelContext<Self>,
 706    ) -> Task<Result<()>> {
 707        if !self.outgoing_invites.insert((channel_id, user_id)) {
 708            return Task::ready(Err(anyhow!("invite request already in progress")));
 709        }
 710
 711        cx.notify();
 712        let client = self.client.clone();
 713        cx.spawn(move |this, mut cx| async move {
 714            let result = client
 715                .request(proto::RemoveChannelMember {
 716                    channel_id: channel_id.0,
 717                    user_id,
 718                })
 719                .await;
 720
 721            this.update(&mut cx, |this, cx| {
 722                this.outgoing_invites.remove(&(channel_id, user_id));
 723                cx.notify();
 724            })?;
 725            result?;
 726            Ok(())
 727        })
 728    }
 729
 730    pub fn set_member_role(
 731        &mut self,
 732        channel_id: ChannelId,
 733        user_id: UserId,
 734        role: proto::ChannelRole,
 735        cx: &mut ModelContext<Self>,
 736    ) -> Task<Result<()>> {
 737        if !self.outgoing_invites.insert((channel_id, user_id)) {
 738            return Task::ready(Err(anyhow!("member request already in progress")));
 739        }
 740
 741        cx.notify();
 742        let client = self.client.clone();
 743        cx.spawn(move |this, mut cx| async move {
 744            let result = client
 745                .request(proto::SetChannelMemberRole {
 746                    channel_id: channel_id.0,
 747                    user_id,
 748                    role: role.into(),
 749                })
 750                .await;
 751
 752            this.update(&mut cx, |this, cx| {
 753                this.outgoing_invites.remove(&(channel_id, user_id));
 754                cx.notify();
 755            })?;
 756
 757            result?;
 758            Ok(())
 759        })
 760    }
 761
 762    pub fn rename(
 763        &mut self,
 764        channel_id: ChannelId,
 765        new_name: &str,
 766        cx: &mut ModelContext<Self>,
 767    ) -> Task<Result<()>> {
 768        let client = self.client.clone();
 769        let name = new_name.to_string();
 770        cx.spawn(move |this, mut cx| async move {
 771            let channel = client
 772                .request(proto::RenameChannel {
 773                    channel_id: channel_id.0,
 774                    name,
 775                })
 776                .await?
 777                .channel
 778                .ok_or_else(|| anyhow!("missing channel in response"))?;
 779            this.update(&mut cx, |this, cx| {
 780                let task = this.update_channels(
 781                    proto::UpdateChannels {
 782                        channels: vec![channel],
 783                        ..Default::default()
 784                    },
 785                    cx,
 786                );
 787                assert!(task.is_none());
 788
 789                // This event is emitted because the collab panel wants to clear the pending edit state
 790                // before this frame is rendered. But we can't guarantee that the collab panel's future
 791                // will resolve before this flush_effects finishes. Synchronously emitting this event
 792                // ensures that the collab panel will observe this creation before the frame complete
 793                cx.emit(ChannelEvent::ChannelRenamed(channel_id))
 794            })?;
 795            Ok(())
 796        })
 797    }
 798
 799    pub fn respond_to_channel_invite(
 800        &mut self,
 801        channel_id: ChannelId,
 802        accept: bool,
 803        cx: &mut ModelContext<Self>,
 804    ) -> Task<Result<()>> {
 805        let client = self.client.clone();
 806        cx.background_executor().spawn(async move {
 807            client
 808                .request(proto::RespondToChannelInvite {
 809                    channel_id: channel_id.0,
 810                    accept,
 811                })
 812                .await?;
 813            Ok(())
 814        })
 815    }
 816
 817    pub fn get_channel_member_details(
 818        &self,
 819        channel_id: ChannelId,
 820        cx: &mut ModelContext<Self>,
 821    ) -> Task<Result<Vec<ChannelMembership>>> {
 822        let client = self.client.clone();
 823        let user_store = self.user_store.downgrade();
 824        cx.spawn(move |_, mut cx| async move {
 825            let response = client
 826                .request(proto::GetChannelMembers {
 827                    channel_id: channel_id.0,
 828                })
 829                .await?;
 830
 831            let user_ids = response.members.iter().map(|m| m.user_id).collect();
 832            let user_store = user_store
 833                .upgrade()
 834                .ok_or_else(|| anyhow!("user store dropped"))?;
 835            let users = user_store
 836                .update(&mut cx, |user_store, cx| user_store.get_users(user_ids, cx))?
 837                .await?;
 838
 839            Ok(users
 840                .into_iter()
 841                .zip(response.members)
 842                .filter_map(|(user, member)| {
 843                    Some(ChannelMembership {
 844                        user,
 845                        role: member.role(),
 846                        kind: member.kind(),
 847                    })
 848                })
 849                .collect())
 850        })
 851    }
 852
 853    pub fn remove_channel(&self, channel_id: ChannelId) -> impl Future<Output = Result<()>> {
 854        let client = self.client.clone();
 855        async move {
 856            client
 857                .request(proto::DeleteChannel {
 858                    channel_id: channel_id.0,
 859                })
 860                .await?;
 861            Ok(())
 862        }
 863    }
 864
 865    pub fn has_pending_channel_invite_response(&self, _: &Arc<Channel>) -> bool {
 866        false
 867    }
 868
 869    pub fn has_pending_channel_invite(&self, channel_id: ChannelId, user_id: UserId) -> bool {
 870        self.outgoing_invites.contains(&(channel_id, user_id))
 871    }
 872
 873    async fn handle_update_channels(
 874        this: Model<Self>,
 875        message: TypedEnvelope<proto::UpdateChannels>,
 876        _: Arc<Client>,
 877        mut cx: AsyncAppContext,
 878    ) -> Result<()> {
 879        this.update(&mut cx, |this, _| {
 880            this.update_channels_tx
 881                .unbounded_send(message.payload)
 882                .unwrap();
 883        })?;
 884        Ok(())
 885    }
 886
 887    async fn handle_update_user_channels(
 888        this: Model<Self>,
 889        message: TypedEnvelope<proto::UpdateUserChannels>,
 890        _: Arc<Client>,
 891        mut cx: AsyncAppContext,
 892    ) -> Result<()> {
 893        this.update(&mut cx, |this, cx| {
 894            for buffer_version in message.payload.observed_channel_buffer_version {
 895                let version = language::proto::deserialize_version(&buffer_version.version);
 896                this.acknowledge_notes_version(
 897                    ChannelId(buffer_version.channel_id),
 898                    buffer_version.epoch,
 899                    &version,
 900                    cx,
 901                );
 902            }
 903            for message_id in message.payload.observed_channel_message_id {
 904                this.acknowledge_message_id(
 905                    ChannelId(message_id.channel_id),
 906                    message_id.message_id,
 907                    cx,
 908                );
 909            }
 910            for membership in message.payload.channel_memberships {
 911                if let Some(role) = ChannelRole::from_i32(membership.role) {
 912                    this.channel_states
 913                        .entry(ChannelId(membership.channel_id))
 914                        .or_insert_with(|| ChannelState::default())
 915                        .set_role(role)
 916                }
 917            }
 918        })
 919    }
 920
 921    fn handle_connect(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
 922        self.channel_index.clear();
 923        self.channel_invitations.clear();
 924        self.channel_participants.clear();
 925        self.channel_index.clear();
 926        self.outgoing_invites.clear();
 927        self.disconnect_channel_buffers_task.take();
 928
 929        for chat in self.opened_chats.values() {
 930            if let OpenedModelHandle::Open(chat) = chat {
 931                if let Some(chat) = chat.upgrade() {
 932                    chat.update(cx, |chat, cx| {
 933                        chat.rejoin(cx);
 934                    });
 935                }
 936            }
 937        }
 938
 939        let mut buffer_versions = Vec::new();
 940        for buffer in self.opened_buffers.values() {
 941            if let OpenedModelHandle::Open(buffer) = buffer {
 942                if let Some(buffer) = buffer.upgrade() {
 943                    let channel_buffer = buffer.read(cx);
 944                    let buffer = channel_buffer.buffer().read(cx);
 945                    buffer_versions.push(proto::ChannelBufferVersion {
 946                        channel_id: channel_buffer.channel_id.0,
 947                        epoch: channel_buffer.epoch(),
 948                        version: language::proto::serialize_version(&buffer.version()),
 949                    });
 950                }
 951            }
 952        }
 953
 954        if buffer_versions.is_empty() {
 955            return Task::ready(Ok(()));
 956        }
 957
 958        let response = self.client.request(proto::RejoinChannelBuffers {
 959            buffers: buffer_versions,
 960        });
 961
 962        cx.spawn(|this, mut cx| async move {
 963            let mut response = response.await?;
 964
 965            this.update(&mut cx, |this, cx| {
 966                this.opened_buffers.retain(|_, buffer| match buffer {
 967                    OpenedModelHandle::Open(channel_buffer) => {
 968                        let Some(channel_buffer) = channel_buffer.upgrade() else {
 969                            return false;
 970                        };
 971
 972                        channel_buffer.update(cx, |channel_buffer, cx| {
 973                            let channel_id = channel_buffer.channel_id;
 974                            if let Some(remote_buffer) = response
 975                                .buffers
 976                                .iter_mut()
 977                                .find(|buffer| buffer.channel_id == channel_id.0)
 978                            {
 979                                let channel_id = channel_buffer.channel_id;
 980                                let remote_version =
 981                                    language::proto::deserialize_version(&remote_buffer.version);
 982
 983                                channel_buffer.replace_collaborators(
 984                                    mem::take(&mut remote_buffer.collaborators),
 985                                    cx,
 986                                );
 987
 988                                let operations = channel_buffer
 989                                    .buffer()
 990                                    .update(cx, |buffer, cx| {
 991                                        let outgoing_operations =
 992                                            buffer.serialize_ops(Some(remote_version), cx);
 993                                        let incoming_operations =
 994                                            mem::take(&mut remote_buffer.operations)
 995                                                .into_iter()
 996                                                .map(language::proto::deserialize_operation)
 997                                                .collect::<Result<Vec<_>>>()?;
 998                                        buffer.apply_ops(incoming_operations, cx)?;
 999                                        anyhow::Ok(outgoing_operations)
1000                                    })
1001                                    .log_err();
1002
1003                                if let Some(operations) = operations {
1004                                    let client = this.client.clone();
1005                                    cx.background_executor()
1006                                        .spawn(async move {
1007                                            let operations = operations.await;
1008                                            for chunk in
1009                                                language::proto::split_operations(operations)
1010                                            {
1011                                                client
1012                                                    .send(proto::UpdateChannelBuffer {
1013                                                        channel_id: channel_id.0,
1014                                                        operations: chunk,
1015                                                    })
1016                                                    .ok();
1017                                            }
1018                                        })
1019                                        .detach();
1020                                    return true;
1021                                }
1022                            }
1023
1024                            channel_buffer.disconnect(cx);
1025                            false
1026                        })
1027                    }
1028                    OpenedModelHandle::Loading(_) => true,
1029                });
1030            })
1031            .ok();
1032            anyhow::Ok(())
1033        })
1034    }
1035
1036    fn handle_disconnect(&mut self, wait_for_reconnect: bool, cx: &mut ModelContext<Self>) {
1037        cx.notify();
1038
1039        self.disconnect_channel_buffers_task.get_or_insert_with(|| {
1040            cx.spawn(move |this, mut cx| async move {
1041                if wait_for_reconnect {
1042                    cx.background_executor().timer(RECONNECT_TIMEOUT).await;
1043                }
1044
1045                if let Some(this) = this.upgrade() {
1046                    this.update(&mut cx, |this, cx| {
1047                        for (_, buffer) in this.opened_buffers.drain() {
1048                            if let OpenedModelHandle::Open(buffer) = buffer {
1049                                if let Some(buffer) = buffer.upgrade() {
1050                                    buffer.update(cx, |buffer, cx| buffer.disconnect(cx));
1051                                }
1052                            }
1053                        }
1054                    })
1055                    .ok();
1056                }
1057            })
1058        });
1059    }
1060
1061    pub(crate) fn update_channels(
1062        &mut self,
1063        payload: proto::UpdateChannels,
1064        cx: &mut ModelContext<ChannelStore>,
1065    ) -> Option<Task<Result<()>>> {
1066        if !payload.remove_channel_invitations.is_empty() {
1067            self.channel_invitations
1068                .retain(|channel| !payload.remove_channel_invitations.contains(&channel.id.0));
1069        }
1070        for channel in payload.channel_invitations {
1071            match self
1072                .channel_invitations
1073                .binary_search_by_key(&channel.id, |c| c.id.0)
1074            {
1075                Ok(ix) => {
1076                    Arc::make_mut(&mut self.channel_invitations[ix]).name = channel.name.into()
1077                }
1078                Err(ix) => self.channel_invitations.insert(
1079                    ix,
1080                    Arc::new(Channel {
1081                        id: ChannelId(channel.id),
1082                        visibility: channel.visibility(),
1083                        name: channel.name.into(),
1084                        parent_path: channel
1085                            .parent_path
1086                            .into_iter()
1087                            .map(|cid| ChannelId(cid))
1088                            .collect(),
1089                    }),
1090                ),
1091            }
1092        }
1093
1094        let channels_changed = !payload.channels.is_empty()
1095            || !payload.delete_channels.is_empty()
1096            || !payload.latest_channel_message_ids.is_empty()
1097            || !payload.latest_channel_buffer_versions.is_empty()
1098            || !payload.hosted_projects.is_empty()
1099            || !payload.deleted_hosted_projects.is_empty();
1100
1101        if channels_changed {
1102            if !payload.delete_channels.is_empty() {
1103                let delete_channels: Vec<ChannelId> = payload
1104                    .delete_channels
1105                    .into_iter()
1106                    .map(|cid| ChannelId(cid))
1107                    .collect();
1108                self.channel_index.delete_channels(&delete_channels);
1109                self.channel_participants
1110                    .retain(|channel_id, _| !delete_channels.contains(&channel_id));
1111
1112                for channel_id in &delete_channels {
1113                    let channel_id = *channel_id;
1114                    if payload
1115                        .channels
1116                        .iter()
1117                        .any(|channel| channel.id == channel_id.0)
1118                    {
1119                        continue;
1120                    }
1121                    if let Some(OpenedModelHandle::Open(buffer)) =
1122                        self.opened_buffers.remove(&channel_id)
1123                    {
1124                        if let Some(buffer) = buffer.upgrade() {
1125                            buffer.update(cx, ChannelBuffer::disconnect);
1126                        }
1127                    }
1128                }
1129            }
1130
1131            let mut index = self.channel_index.bulk_insert();
1132            for channel in payload.channels {
1133                let id = ChannelId(channel.id);
1134                let channel_changed = index.insert(channel);
1135
1136                if channel_changed {
1137                    if let Some(OpenedModelHandle::Open(buffer)) = self.opened_buffers.get(&id) {
1138                        if let Some(buffer) = buffer.upgrade() {
1139                            buffer.update(cx, ChannelBuffer::channel_changed);
1140                        }
1141                    }
1142                }
1143            }
1144
1145            for latest_buffer_version in payload.latest_channel_buffer_versions {
1146                let version = language::proto::deserialize_version(&latest_buffer_version.version);
1147                self.channel_states
1148                    .entry(ChannelId(latest_buffer_version.channel_id))
1149                    .or_default()
1150                    .update_latest_notes_version(latest_buffer_version.epoch, &version)
1151            }
1152
1153            for latest_channel_message in payload.latest_channel_message_ids {
1154                self.channel_states
1155                    .entry(ChannelId(latest_channel_message.channel_id))
1156                    .or_default()
1157                    .update_latest_message_id(latest_channel_message.message_id);
1158            }
1159
1160            for hosted_project in payload.hosted_projects {
1161                let hosted_project: HostedProject = hosted_project.into();
1162                if let Some(old_project) = self
1163                    .hosted_projects
1164                    .insert(hosted_project.id, hosted_project.clone())
1165                {
1166                    self.channel_states
1167                        .entry(old_project.channel_id)
1168                        .or_default()
1169                        .remove_hosted_project(old_project.id);
1170                }
1171                self.channel_states
1172                    .entry(hosted_project.channel_id)
1173                    .or_default()
1174                    .add_hosted_project(hosted_project.id);
1175            }
1176
1177            for hosted_project_id in payload.deleted_hosted_projects {
1178                let hosted_project_id = HostedProjectId(hosted_project_id);
1179
1180                if let Some(old_project) = self.hosted_projects.remove(&hosted_project_id) {
1181                    self.channel_states
1182                        .entry(old_project.channel_id)
1183                        .or_default()
1184                        .remove_hosted_project(old_project.id);
1185                }
1186            }
1187        }
1188
1189        cx.notify();
1190        if payload.channel_participants.is_empty() {
1191            return None;
1192        }
1193
1194        let mut all_user_ids = Vec::new();
1195        let channel_participants = payload.channel_participants;
1196        for entry in &channel_participants {
1197            for user_id in entry.participant_user_ids.iter() {
1198                if let Err(ix) = all_user_ids.binary_search(user_id) {
1199                    all_user_ids.insert(ix, *user_id);
1200                }
1201            }
1202        }
1203
1204        let users = self
1205            .user_store
1206            .update(cx, |user_store, cx| user_store.get_users(all_user_ids, cx));
1207        Some(cx.spawn(|this, mut cx| async move {
1208            let users = users.await?;
1209
1210            this.update(&mut cx, |this, cx| {
1211                for entry in &channel_participants {
1212                    let mut participants: Vec<_> = entry
1213                        .participant_user_ids
1214                        .iter()
1215                        .filter_map(|user_id| {
1216                            users
1217                                .binary_search_by_key(&user_id, |user| &user.id)
1218                                .ok()
1219                                .map(|ix| users[ix].clone())
1220                        })
1221                        .collect();
1222
1223                    participants.sort_by_key(|u| u.id);
1224
1225                    this.channel_participants
1226                        .insert(ChannelId(entry.channel_id), participants);
1227                }
1228
1229                cx.notify();
1230            })
1231        }))
1232    }
1233}
1234
1235impl ChannelState {
1236    fn set_role(&mut self, role: ChannelRole) {
1237        self.role = Some(role);
1238    }
1239
1240    fn has_channel_buffer_changed(&self) -> bool {
1241        if let Some(latest_version) = &self.latest_notes_versions {
1242            if let Some(observed_version) = &self.observed_notes_versions {
1243                latest_version.epoch > observed_version.epoch
1244                    || (latest_version.epoch == observed_version.epoch
1245                        && latest_version
1246                            .version
1247                            .changed_since(&observed_version.version))
1248            } else {
1249                true
1250            }
1251        } else {
1252            false
1253        }
1254    }
1255
1256    fn has_new_messages(&self) -> bool {
1257        let latest_message_id = self.latest_chat_message;
1258        let observed_message_id = self.observed_chat_message;
1259
1260        latest_message_id.is_some_and(|latest_message_id| {
1261            latest_message_id > observed_message_id.unwrap_or_default()
1262        })
1263    }
1264
1265    fn last_acknowledged_message_id(&self) -> Option<u64> {
1266        self.observed_chat_message
1267    }
1268
1269    fn acknowledge_message_id(&mut self, message_id: u64) {
1270        let observed = self.observed_chat_message.get_or_insert(message_id);
1271        *observed = (*observed).max(message_id);
1272    }
1273
1274    fn update_latest_message_id(&mut self, message_id: u64) {
1275        self.latest_chat_message =
1276            Some(message_id.max(self.latest_chat_message.unwrap_or_default()));
1277    }
1278
1279    fn acknowledge_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1280        if let Some(existing) = &mut self.observed_notes_versions {
1281            if existing.epoch == epoch {
1282                existing.version.join(version);
1283                return;
1284            }
1285        }
1286        self.observed_notes_versions = Some(NotesVersion {
1287            epoch,
1288            version: version.clone(),
1289        });
1290    }
1291
1292    fn update_latest_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1293        if let Some(existing) = &mut self.latest_notes_versions {
1294            if existing.epoch == epoch {
1295                existing.version.join(version);
1296                return;
1297            }
1298        }
1299        self.latest_notes_versions = Some(NotesVersion {
1300            epoch,
1301            version: version.clone(),
1302        });
1303    }
1304
1305    fn add_hosted_project(&mut self, project_id: HostedProjectId) {
1306        self.projects.insert(project_id);
1307    }
1308
1309    fn remove_hosted_project(&mut self, project_id: HostedProjectId) {
1310        self.projects.remove(&project_id);
1311    }
1312}