channel_store.rs

   1mod channel_index;
   2
   3use crate::{channel_buffer::ChannelBuffer, channel_chat::ChannelChat, ChannelMessage};
   4use anyhow::{anyhow, Result};
   5use channel_index::ChannelIndex;
   6use client::{Client, Subscription, User, UserId, UserStore};
   7use collections::{hash_map, HashMap, HashSet};
   8use futures::{channel::mpsc, future::Shared, Future, FutureExt, StreamExt};
   9use gpui::{
  10    AppContext, AsyncAppContext, Context, EventEmitter, Global, Model, ModelContext, SharedString,
  11    Task, WeakModel,
  12};
  13use language::Capability;
  14use release_channel::RELEASE_CHANNEL;
  15use rpc::{
  16    proto::{self, ChannelRole, ChannelVisibility},
  17    TypedEnvelope,
  18};
  19use std::{mem, sync::Arc, time::Duration};
  20use util::{async_maybe, maybe, ResultExt};
  21
  22pub fn init(client: &Arc<Client>, user_store: Model<UserStore>, cx: &mut AppContext) {
  23    let channel_store =
  24        cx.new_model(|cx| ChannelStore::new(client.clone(), user_store.clone(), cx));
  25    cx.set_global(GlobalChannelStore(channel_store));
  26}
  27
/// A 30 second timeout related to reconnecting to the server.
// NOTE(review): the code that consumes this constant is outside this section —
// confirm the exact semantics there.
pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);

/// Numeric identifier of a channel.
pub type ChannelId = u64;
  31
  32#[derive(Debug, Clone, Default)]
  33struct NotesVersion {
  34    epoch: u64,
  35    version: clock::Global,
  36}
  37
/// Client-side store of channel data: the known channels, invitations,
/// participants, per-channel state, and the channel buffers and chats that are
/// currently opened (or being opened).
pub struct ChannelStore {
    /// Index of the known channels, queried through `by_id()` and
    /// `ordered_channels()`.
    pub channel_index: ChannelIndex,
    /// Channels exposed through [`ChannelStore::channel_invitations`].
    channel_invitations: Vec<Arc<Channel>>,
    /// Users associated with each channel, exposed through
    /// [`ChannelStore::channel_participants`].
    channel_participants: HashMap<ChannelId, Vec<Arc<User>>>,
    /// Per-channel state (latest/observed chat message and notes versions, role).
    channel_states: HashMap<ChannelId, ChannelState>,

    /// `(channel, user)` pairs for which an invite, removal, or role-change
    /// request is currently in flight.
    outgoing_invites: HashSet<(ChannelId, UserId)>,
    /// Sending half of the queue of `UpdateChannels` messages processed by the
    /// `_update_channels` task.
    update_channels_tx: mpsc::UnboundedSender<proto::UpdateChannels>,
    /// Channel buffers that are open or currently loading, keyed by channel.
    opened_buffers: HashMap<ChannelId, OpenedModelHandle<ChannelBuffer>>,
    /// Channel chats that are open or currently loading, keyed by channel.
    opened_chats: HashMap<ChannelId, OpenedModelHandle<ChannelChat>>,
    /// RPC client used for all channel requests.
    client: Arc<Client>,
    /// Store used to resolve users referenced by channel data.
    user_store: Model<UserStore>,
    /// Handles for the two message handlers registered in [`ChannelStore::new`].
    _rpc_subscriptions: [Subscription; 2],
    /// Task that reacts to changes of the client's connection status.
    _watch_connection_status: Task<Option<()>>,
    /// Optional task that is taken (and thereby dropped) in `handle_connect`.
    // NOTE(review): the code that sets this task is outside this section.
    disconnect_channel_buffers_task: Option<Task<()>>,
    /// Task that drains the `UpdateChannels` queue and applies each message.
    _update_channels: Task<()>,
}
  55
  56#[derive(Clone, Debug)]
  57pub struct Channel {
  58    pub id: ChannelId,
  59    pub name: SharedString,
  60    pub visibility: proto::ChannelVisibility,
  61    pub parent_path: Vec<u64>,
  62}
  63
/// Per-channel bookkeeping: the latest and the observed chat message and notes
/// version, plus an optional role.
// NOTE(review): the methods that read and update these fields are defined
// outside this section; the field descriptions below follow the field names.
#[derive(Default)]
pub struct ChannelState {
    /// Id of the latest chat message known for the channel, if any.
    latest_chat_message: Option<u64>,
    /// Latest known version of the channel's notes, if any.
    latest_notes_versions: Option<NotesVersion>,
    /// Id of the chat message recorded as observed, if any.
    observed_chat_message: Option<u64>,
    /// Version of the notes recorded as observed, if any.
    observed_notes_versions: Option<NotesVersion>,
    /// Role associated with the channel, if known.
    role: Option<ChannelRole>,
}
  72
  73impl Channel {
  74    pub fn link(&self) -> String {
  75        RELEASE_CHANNEL.link_prefix().to_owned()
  76            + "channel/"
  77            + &Self::slug(&self.name)
  78            + "-"
  79            + &self.id.to_string()
  80    }
  81
  82    pub fn notes_link(&self, heading: Option<String>) -> String {
  83        self.link()
  84            + "/notes"
  85            + &heading
  86                .map(|h| format!("#{}", Self::slug(&h)))
  87                .unwrap_or_default()
  88    }
  89
  90    pub fn is_root_channel(&self) -> bool {
  91        self.parent_path.is_empty()
  92    }
  93
  94    pub fn root_id(&self) -> ChannelId {
  95        self.parent_path
  96            .first()
  97            .map(|id| *id as ChannelId)
  98            .unwrap_or(self.id)
  99    }
 100
 101    pub fn slug(str: &str) -> String {
 102        let slug: String = str
 103            .chars()
 104            .map(|c| if c.is_alphanumeric() { c } else { '-' })
 105            .collect();
 106
 107        slug.trim_matches(|c| c == '-').to_string()
 108    }
 109}
 110
 111pub struct ChannelMembership {
 112    pub user: Arc<User>,
 113    pub kind: proto::channel_member::Kind,
 114    pub role: proto::ChannelRole,
 115}
 116impl ChannelMembership {
 117    pub fn sort_key(&self) -> MembershipSortKey {
 118        MembershipSortKey {
 119            role_order: match self.role {
 120                proto::ChannelRole::Admin => 0,
 121                proto::ChannelRole::Member => 1,
 122                proto::ChannelRole::Banned => 2,
 123                proto::ChannelRole::Guest => 3,
 124            },
 125            kind_order: match self.kind {
 126                proto::channel_member::Kind::Member => 0,
 127                proto::channel_member::Kind::Invitee => 1,
 128            },
 129            username_order: self.user.github_login.as_str(),
 130        }
 131    }
 132}
 133
 134#[derive(PartialOrd, Ord, PartialEq, Eq)]
 135pub struct MembershipSortKey<'a> {
 136    role_order: u8,
 137    kind_order: u8,
 138    username_order: &'a str,
 139}
 140
/// Events emitted by [`ChannelStore`].
pub enum ChannelEvent {
    /// Emitted by `create_channel` once the newly created channel has been
    /// applied to the store.
    ChannelCreated(ChannelId),
    /// Emitted by `rename` once the renamed channel has been applied to the store.
    ChannelRenamed(ChannelId),
}

impl EventEmitter<ChannelEvent> for ChannelStore {}
 147
/// State of a per-channel resource (buffer or chat) tracked by the store.
enum OpenedModelHandle<E> {
    /// The resource finished loading; only a weak handle is kept, so the entry
    /// may refer to a model that has since been dropped.
    Open(WeakModel<E>),
    /// The resource is still loading; the shared task can be awaited by every
    /// caller that requests the same resource.
    Loading(Shared<Task<Result<Model<E>, Arc<anyhow::Error>>>>),
}
 152
/// Wrapper that lets the [`ChannelStore`] model be stored as a gpui global.
struct GlobalChannelStore(Model<ChannelStore>);

impl Global for GlobalChannelStore {}
 156
 157impl ChannelStore {
 158    pub fn global(cx: &AppContext) -> Model<Self> {
 159        cx.global::<GlobalChannelStore>().0.clone()
 160    }
 161
 162    pub fn new(
 163        client: Arc<Client>,
 164        user_store: Model<UserStore>,
 165        cx: &mut ModelContext<Self>,
 166    ) -> Self {
 167        let rpc_subscriptions = [
 168            client.add_message_handler(cx.weak_model(), Self::handle_update_channels),
 169            client.add_message_handler(cx.weak_model(), Self::handle_update_user_channels),
 170        ];
 171
 172        let mut connection_status = client.status();
 173        let (update_channels_tx, mut update_channels_rx) = mpsc::unbounded();
 174        let watch_connection_status = cx.spawn(|this, mut cx| async move {
 175            while let Some(status) = connection_status.next().await {
 176                let this = this.upgrade()?;
 177                match status {
 178                    client::Status::Connected { .. } => {
 179                        this.update(&mut cx, |this, cx| this.handle_connect(cx))
 180                            .ok()?
 181                            .await
 182                            .log_err()?;
 183                    }
 184                    client::Status::SignedOut | client::Status::UpgradeRequired => {
 185                        this.update(&mut cx, |this, cx| this.handle_disconnect(false, cx))
 186                            .ok();
 187                    }
 188                    _ => {
 189                        this.update(&mut cx, |this, cx| this.handle_disconnect(true, cx))
 190                            .ok();
 191                    }
 192                }
 193            }
 194            Some(())
 195        });
 196
 197        Self {
 198            channel_invitations: Vec::default(),
 199            channel_index: ChannelIndex::default(),
 200            channel_participants: Default::default(),
 201            outgoing_invites: Default::default(),
 202            opened_buffers: Default::default(),
 203            opened_chats: Default::default(),
 204            update_channels_tx,
 205            client,
 206            user_store,
 207            _rpc_subscriptions: rpc_subscriptions,
 208            _watch_connection_status: watch_connection_status,
 209            disconnect_channel_buffers_task: None,
 210            _update_channels: cx.spawn(|this, mut cx| async move {
 211                async_maybe!({
 212                    while let Some(update_channels) = update_channels_rx.next().await {
 213                        if let Some(this) = this.upgrade() {
 214                            let update_task = this.update(&mut cx, |this, cx| {
 215                                this.update_channels(update_channels, cx)
 216                            })?;
 217                            if let Some(update_task) = update_task {
 218                                update_task.await.log_err();
 219                            }
 220                        }
 221                    }
 222                    anyhow::Ok(())
 223                })
 224                .await
 225                .log_err();
 226            }),
 227            channel_states: Default::default(),
 228        }
 229    }
 230
 231    pub fn client(&self) -> Arc<Client> {
 232        self.client.clone()
 233    }
 234
 235    /// Returns the number of unique channels in the store
 236    pub fn channel_count(&self) -> usize {
 237        self.channel_index.by_id().len()
 238    }
 239
 240    /// Returns the index of a channel ID in the list of unique channels
 241    pub fn index_of_channel(&self, channel_id: ChannelId) -> Option<usize> {
 242        self.channel_index
 243            .by_id()
 244            .keys()
 245            .position(|id| *id == channel_id)
 246    }
 247
 248    /// Returns an iterator over all unique channels
 249    pub fn channels(&self) -> impl '_ + Iterator<Item = &Arc<Channel>> {
 250        self.channel_index.by_id().values()
 251    }
 252
 253    /// Iterate over all entries in the channel DAG
 254    pub fn ordered_channels(&self) -> impl '_ + Iterator<Item = (usize, &Arc<Channel>)> {
 255        self.channel_index
 256            .ordered_channels()
 257            .iter()
 258            .filter_map(move |id| {
 259                let channel = self.channel_index.by_id().get(id)?;
 260                Some((channel.parent_path.len(), channel))
 261            })
 262    }
 263
 264    pub fn channel_at_index(&self, ix: usize) -> Option<&Arc<Channel>> {
 265        let channel_id = self.channel_index.ordered_channels().get(ix)?;
 266        self.channel_index.by_id().get(channel_id)
 267    }
 268
 269    pub fn channel_at(&self, ix: usize) -> Option<&Arc<Channel>> {
 270        self.channel_index.by_id().values().nth(ix)
 271    }
 272
 273    pub fn has_channel_invitation(&self, channel_id: ChannelId) -> bool {
 274        self.channel_invitations
 275            .iter()
 276            .any(|channel| channel.id == channel_id)
 277    }
 278
 279    pub fn channel_invitations(&self) -> &[Arc<Channel>] {
 280        &self.channel_invitations
 281    }
 282
 283    pub fn channel_for_id(&self, channel_id: ChannelId) -> Option<&Arc<Channel>> {
 284        self.channel_index.by_id().get(&channel_id)
 285    }
 286
 287    pub fn has_open_channel_buffer(&self, channel_id: ChannelId, _cx: &AppContext) -> bool {
 288        if let Some(buffer) = self.opened_buffers.get(&channel_id) {
 289            if let OpenedModelHandle::Open(buffer) = buffer {
 290                return buffer.upgrade().is_some();
 291            }
 292        }
 293        false
 294    }
 295
 296    pub fn open_channel_buffer(
 297        &mut self,
 298        channel_id: ChannelId,
 299        cx: &mut ModelContext<Self>,
 300    ) -> Task<Result<Model<ChannelBuffer>>> {
 301        let client = self.client.clone();
 302        let user_store = self.user_store.clone();
 303        let channel_store = cx.handle();
 304        self.open_channel_resource(
 305            channel_id,
 306            |this| &mut this.opened_buffers,
 307            |channel, cx| ChannelBuffer::new(channel, client, user_store, channel_store, cx),
 308            cx,
 309        )
 310    }
 311
 312    pub fn fetch_channel_messages(
 313        &self,
 314        message_ids: Vec<u64>,
 315        cx: &mut ModelContext<Self>,
 316    ) -> Task<Result<Vec<ChannelMessage>>> {
 317        let request = if message_ids.is_empty() {
 318            None
 319        } else {
 320            Some(
 321                self.client
 322                    .request(proto::GetChannelMessagesById { message_ids }),
 323            )
 324        };
 325        cx.spawn(|this, mut cx| async move {
 326            if let Some(request) = request {
 327                let response = request.await?;
 328                let this = this
 329                    .upgrade()
 330                    .ok_or_else(|| anyhow!("channel store dropped"))?;
 331                let user_store = this.update(&mut cx, |this, _| this.user_store.clone())?;
 332                ChannelMessage::from_proto_vec(response.messages, &user_store, &mut cx).await
 333            } else {
 334                Ok(Vec::new())
 335            }
 336        })
 337    }
 338
 339    pub fn has_channel_buffer_changed(&self, channel_id: ChannelId) -> bool {
 340        self.channel_states
 341            .get(&channel_id)
 342            .is_some_and(|state| state.has_channel_buffer_changed())
 343    }
 344
 345    pub fn has_new_messages(&self, channel_id: ChannelId) -> bool {
 346        self.channel_states
 347            .get(&channel_id)
 348            .is_some_and(|state| state.has_new_messages())
 349    }
 350
 351    pub fn last_acknowledge_message_id(&self, channel_id: ChannelId) -> Option<u64> {
 352        self.channel_states.get(&channel_id).and_then(|state| {
 353            if let Some(last_message_id) = state.latest_chat_message {
 354                if state
 355                    .last_acknowledged_message_id()
 356                    .is_some_and(|id| id < last_message_id)
 357                {
 358                    return state.last_acknowledged_message_id();
 359                }
 360            }
 361
 362            None
 363        })
 364    }
 365
 366    pub fn acknowledge_message_id(
 367        &mut self,
 368        channel_id: ChannelId,
 369        message_id: u64,
 370        cx: &mut ModelContext<Self>,
 371    ) {
 372        self.channel_states
 373            .entry(channel_id)
 374            .or_insert_with(|| Default::default())
 375            .acknowledge_message_id(message_id);
 376        cx.notify();
 377    }
 378
 379    pub fn update_latest_message_id(
 380        &mut self,
 381        channel_id: ChannelId,
 382        message_id: u64,
 383        cx: &mut ModelContext<Self>,
 384    ) {
 385        self.channel_states
 386            .entry(channel_id)
 387            .or_insert_with(|| Default::default())
 388            .update_latest_message_id(message_id);
 389        cx.notify();
 390    }
 391
 392    pub fn acknowledge_notes_version(
 393        &mut self,
 394        channel_id: ChannelId,
 395        epoch: u64,
 396        version: &clock::Global,
 397        cx: &mut ModelContext<Self>,
 398    ) {
 399        self.channel_states
 400            .entry(channel_id)
 401            .or_insert_with(|| Default::default())
 402            .acknowledge_notes_version(epoch, version);
 403        cx.notify()
 404    }
 405
 406    pub fn update_latest_notes_version(
 407        &mut self,
 408        channel_id: ChannelId,
 409        epoch: u64,
 410        version: &clock::Global,
 411        cx: &mut ModelContext<Self>,
 412    ) {
 413        self.channel_states
 414            .entry(channel_id)
 415            .or_insert_with(|| Default::default())
 416            .update_latest_notes_version(epoch, version);
 417        cx.notify()
 418    }
 419
 420    pub fn open_channel_chat(
 421        &mut self,
 422        channel_id: ChannelId,
 423        cx: &mut ModelContext<Self>,
 424    ) -> Task<Result<Model<ChannelChat>>> {
 425        let client = self.client.clone();
 426        let user_store = self.user_store.clone();
 427        let this = cx.handle();
 428        self.open_channel_resource(
 429            channel_id,
 430            |this| &mut this.opened_chats,
 431            |channel, cx| ChannelChat::new(channel, this, user_store, client, cx),
 432            cx,
 433        )
 434    }
 435
 436    /// Asynchronously open a given resource associated with a channel.
 437    ///
 438    /// Make sure that the resource is only opened once, even if this method
 439    /// is called multiple times with the same channel id while the first task
 440    /// is still running.
 441    fn open_channel_resource<T, F, Fut>(
 442        &mut self,
 443        channel_id: ChannelId,
 444        get_map: fn(&mut Self) -> &mut HashMap<ChannelId, OpenedModelHandle<T>>,
 445        load: F,
 446        cx: &mut ModelContext<Self>,
 447    ) -> Task<Result<Model<T>>>
 448    where
 449        F: 'static + FnOnce(Arc<Channel>, AsyncAppContext) -> Fut,
 450        Fut: Future<Output = Result<Model<T>>>,
 451        T: 'static,
 452    {
 453        let task = loop {
 454            match get_map(self).entry(channel_id) {
 455                hash_map::Entry::Occupied(e) => match e.get() {
 456                    OpenedModelHandle::Open(model) => {
 457                        if let Some(model) = model.upgrade() {
 458                            break Task::ready(Ok(model)).shared();
 459                        } else {
 460                            get_map(self).remove(&channel_id);
 461                            continue;
 462                        }
 463                    }
 464                    OpenedModelHandle::Loading(task) => {
 465                        break task.clone();
 466                    }
 467                },
 468                hash_map::Entry::Vacant(e) => {
 469                    let task = cx
 470                        .spawn(move |this, mut cx| async move {
 471                            let channel = this.update(&mut cx, |this, _| {
 472                                this.channel_for_id(channel_id).cloned().ok_or_else(|| {
 473                                    Arc::new(anyhow!("no channel for id: {}", channel_id))
 474                                })
 475                            })??;
 476
 477                            load(channel, cx).await.map_err(Arc::new)
 478                        })
 479                        .shared();
 480
 481                    e.insert(OpenedModelHandle::Loading(task.clone()));
 482                    cx.spawn({
 483                        let task = task.clone();
 484                        move |this, mut cx| async move {
 485                            let result = task.await;
 486                            this.update(&mut cx, |this, _| match result {
 487                                Ok(model) => {
 488                                    get_map(this).insert(
 489                                        channel_id,
 490                                        OpenedModelHandle::Open(model.downgrade()),
 491                                    );
 492                                }
 493                                Err(_) => {
 494                                    get_map(this).remove(&channel_id);
 495                                }
 496                            })
 497                            .ok();
 498                        }
 499                    })
 500                    .detach();
 501                    break task;
 502                }
 503            }
 504        };
 505        cx.background_executor()
 506            .spawn(async move { task.await.map_err(|error| anyhow!("{}", error)) })
 507    }
 508
 509    pub fn is_channel_admin(&self, channel_id: ChannelId) -> bool {
 510        self.channel_role(channel_id) == proto::ChannelRole::Admin
 511    }
 512
 513    pub fn is_root_channel(&self, channel_id: ChannelId) -> bool {
 514        self.channel_index
 515            .by_id()
 516            .get(&channel_id)
 517            .map_or(false, |channel| channel.is_root_channel())
 518    }
 519
 520    pub fn is_public_channel(&self, channel_id: ChannelId) -> bool {
 521        self.channel_index
 522            .by_id()
 523            .get(&channel_id)
 524            .map_or(false, |channel| {
 525                channel.visibility == ChannelVisibility::Public
 526            })
 527    }
 528
 529    pub fn channel_capability(&self, channel_id: ChannelId) -> Capability {
 530        match self.channel_role(channel_id) {
 531            ChannelRole::Admin | ChannelRole::Member => Capability::ReadWrite,
 532            _ => Capability::ReadOnly,
 533        }
 534    }
 535
 536    pub fn channel_role(&self, channel_id: ChannelId) -> proto::ChannelRole {
 537        maybe!({
 538            let mut channel = self.channel_for_id(channel_id)?;
 539            if !channel.is_root_channel() {
 540                channel = self.channel_for_id(channel.root_id())?;
 541            }
 542            let root_channel_state = self.channel_states.get(&channel.id);
 543            root_channel_state?.role
 544        })
 545        .unwrap_or(proto::ChannelRole::Guest)
 546    }
 547
 548    pub fn channel_participants(&self, channel_id: ChannelId) -> &[Arc<User>] {
 549        self.channel_participants
 550            .get(&channel_id)
 551            .map_or(&[], |v| v.as_slice())
 552    }
 553
 554    pub fn create_channel(
 555        &self,
 556        name: &str,
 557        parent_id: Option<ChannelId>,
 558        cx: &mut ModelContext<Self>,
 559    ) -> Task<Result<ChannelId>> {
 560        let client = self.client.clone();
 561        let name = name.trim_start_matches("#").to_owned();
 562        cx.spawn(move |this, mut cx| async move {
 563            let response = client
 564                .request(proto::CreateChannel { name, parent_id })
 565                .await?;
 566
 567            let channel = response
 568                .channel
 569                .ok_or_else(|| anyhow!("missing channel in response"))?;
 570            let channel_id = channel.id;
 571
 572            this.update(&mut cx, |this, cx| {
 573                let task = this.update_channels(
 574                    proto::UpdateChannels {
 575                        channels: vec![channel],
 576                        ..Default::default()
 577                    },
 578                    cx,
 579                );
 580                assert!(task.is_none());
 581
 582                // This event is emitted because the collab panel wants to clear the pending edit state
 583                // before this frame is rendered. But we can't guarantee that the collab panel's future
 584                // will resolve before this flush_effects finishes. Synchronously emitting this event
 585                // ensures that the collab panel will observe this creation before the frame completes
 586                cx.emit(ChannelEvent::ChannelCreated(channel_id));
 587            })?;
 588
 589            Ok(channel_id)
 590        })
 591    }
 592
 593    pub fn move_channel(
 594        &mut self,
 595        channel_id: ChannelId,
 596        to: ChannelId,
 597        cx: &mut ModelContext<Self>,
 598    ) -> Task<Result<()>> {
 599        let client = self.client.clone();
 600        cx.spawn(move |_, _| async move {
 601            let _ = client
 602                .request(proto::MoveChannel { channel_id, to })
 603                .await?;
 604
 605            Ok(())
 606        })
 607    }
 608
 609    pub fn set_channel_visibility(
 610        &mut self,
 611        channel_id: ChannelId,
 612        visibility: ChannelVisibility,
 613        cx: &mut ModelContext<Self>,
 614    ) -> Task<Result<()>> {
 615        let client = self.client.clone();
 616        cx.spawn(move |_, _| async move {
 617            let _ = client
 618                .request(proto::SetChannelVisibility {
 619                    channel_id,
 620                    visibility: visibility.into(),
 621                })
 622                .await?;
 623
 624            Ok(())
 625        })
 626    }
 627
 628    pub fn invite_member(
 629        &mut self,
 630        channel_id: ChannelId,
 631        user_id: UserId,
 632        role: proto::ChannelRole,
 633        cx: &mut ModelContext<Self>,
 634    ) -> Task<Result<()>> {
 635        if !self.outgoing_invites.insert((channel_id, user_id)) {
 636            return Task::ready(Err(anyhow!("invite request already in progress")));
 637        }
 638
 639        cx.notify();
 640        let client = self.client.clone();
 641        cx.spawn(move |this, mut cx| async move {
 642            let result = client
 643                .request(proto::InviteChannelMember {
 644                    channel_id,
 645                    user_id,
 646                    role: role.into(),
 647                })
 648                .await;
 649
 650            this.update(&mut cx, |this, cx| {
 651                this.outgoing_invites.remove(&(channel_id, user_id));
 652                cx.notify();
 653            })?;
 654
 655            result?;
 656
 657            Ok(())
 658        })
 659    }
 660
 661    pub fn remove_member(
 662        &mut self,
 663        channel_id: ChannelId,
 664        user_id: u64,
 665        cx: &mut ModelContext<Self>,
 666    ) -> Task<Result<()>> {
 667        if !self.outgoing_invites.insert((channel_id, user_id)) {
 668            return Task::ready(Err(anyhow!("invite request already in progress")));
 669        }
 670
 671        cx.notify();
 672        let client = self.client.clone();
 673        cx.spawn(move |this, mut cx| async move {
 674            let result = client
 675                .request(proto::RemoveChannelMember {
 676                    channel_id,
 677                    user_id,
 678                })
 679                .await;
 680
 681            this.update(&mut cx, |this, cx| {
 682                this.outgoing_invites.remove(&(channel_id, user_id));
 683                cx.notify();
 684            })?;
 685            result?;
 686            Ok(())
 687        })
 688    }
 689
 690    pub fn set_member_role(
 691        &mut self,
 692        channel_id: ChannelId,
 693        user_id: UserId,
 694        role: proto::ChannelRole,
 695        cx: &mut ModelContext<Self>,
 696    ) -> Task<Result<()>> {
 697        if !self.outgoing_invites.insert((channel_id, user_id)) {
 698            return Task::ready(Err(anyhow!("member request already in progress")));
 699        }
 700
 701        cx.notify();
 702        let client = self.client.clone();
 703        cx.spawn(move |this, mut cx| async move {
 704            let result = client
 705                .request(proto::SetChannelMemberRole {
 706                    channel_id,
 707                    user_id,
 708                    role: role.into(),
 709                })
 710                .await;
 711
 712            this.update(&mut cx, |this, cx| {
 713                this.outgoing_invites.remove(&(channel_id, user_id));
 714                cx.notify();
 715            })?;
 716
 717            result?;
 718            Ok(())
 719        })
 720    }
 721
 722    pub fn rename(
 723        &mut self,
 724        channel_id: ChannelId,
 725        new_name: &str,
 726        cx: &mut ModelContext<Self>,
 727    ) -> Task<Result<()>> {
 728        let client = self.client.clone();
 729        let name = new_name.to_string();
 730        cx.spawn(move |this, mut cx| async move {
 731            let channel = client
 732                .request(proto::RenameChannel { channel_id, name })
 733                .await?
 734                .channel
 735                .ok_or_else(|| anyhow!("missing channel in response"))?;
 736            this.update(&mut cx, |this, cx| {
 737                let task = this.update_channels(
 738                    proto::UpdateChannels {
 739                        channels: vec![channel],
 740                        ..Default::default()
 741                    },
 742                    cx,
 743                );
 744                assert!(task.is_none());
 745
 746                // This event is emitted because the collab panel wants to clear the pending edit state
 747                // before this frame is rendered. But we can't guarantee that the collab panel's future
 748                // will resolve before this flush_effects finishes. Synchronously emitting this event
 749                // ensures that the collab panel will observe this creation before the frame complete
 750                cx.emit(ChannelEvent::ChannelRenamed(channel_id))
 751            })?;
 752            Ok(())
 753        })
 754    }
 755
 756    pub fn respond_to_channel_invite(
 757        &mut self,
 758        channel_id: ChannelId,
 759        accept: bool,
 760        cx: &mut ModelContext<Self>,
 761    ) -> Task<Result<()>> {
 762        let client = self.client.clone();
 763        cx.background_executor().spawn(async move {
 764            client
 765                .request(proto::RespondToChannelInvite { channel_id, accept })
 766                .await?;
 767            Ok(())
 768        })
 769    }
 770
 771    pub fn get_channel_member_details(
 772        &self,
 773        channel_id: ChannelId,
 774        cx: &mut ModelContext<Self>,
 775    ) -> Task<Result<Vec<ChannelMembership>>> {
 776        let client = self.client.clone();
 777        let user_store = self.user_store.downgrade();
 778        cx.spawn(move |_, mut cx| async move {
 779            let response = client
 780                .request(proto::GetChannelMembers { channel_id })
 781                .await?;
 782
 783            let user_ids = response.members.iter().map(|m| m.user_id).collect();
 784            let user_store = user_store
 785                .upgrade()
 786                .ok_or_else(|| anyhow!("user store dropped"))?;
 787            let users = user_store
 788                .update(&mut cx, |user_store, cx| user_store.get_users(user_ids, cx))?
 789                .await?;
 790
 791            Ok(users
 792                .into_iter()
 793                .zip(response.members)
 794                .filter_map(|(user, member)| {
 795                    Some(ChannelMembership {
 796                        user,
 797                        role: member.role(),
 798                        kind: member.kind(),
 799                    })
 800                })
 801                .collect())
 802        })
 803    }
 804
 805    pub fn remove_channel(&self, channel_id: ChannelId) -> impl Future<Output = Result<()>> {
 806        let client = self.client.clone();
 807        async move {
 808            client.request(proto::DeleteChannel { channel_id }).await?;
 809            Ok(())
 810        }
 811    }
 812
 813    pub fn has_pending_channel_invite_response(&self, _: &Arc<Channel>) -> bool {
 814        false
 815    }
 816
 817    pub fn has_pending_channel_invite(&self, channel_id: ChannelId, user_id: UserId) -> bool {
 818        self.outgoing_invites.contains(&(channel_id, user_id))
 819    }
 820
 821    async fn handle_update_channels(
 822        this: Model<Self>,
 823        message: TypedEnvelope<proto::UpdateChannels>,
 824        _: Arc<Client>,
 825        mut cx: AsyncAppContext,
 826    ) -> Result<()> {
 827        this.update(&mut cx, |this, _| {
 828            this.update_channels_tx
 829                .unbounded_send(message.payload)
 830                .unwrap();
 831        })?;
 832        Ok(())
 833    }
 834
 835    async fn handle_update_user_channels(
 836        this: Model<Self>,
 837        message: TypedEnvelope<proto::UpdateUserChannels>,
 838        _: Arc<Client>,
 839        mut cx: AsyncAppContext,
 840    ) -> Result<()> {
 841        this.update(&mut cx, |this, cx| {
 842            for buffer_version in message.payload.observed_channel_buffer_version {
 843                let version = language::proto::deserialize_version(&buffer_version.version);
 844                this.acknowledge_notes_version(
 845                    buffer_version.channel_id,
 846                    buffer_version.epoch,
 847                    &version,
 848                    cx,
 849                );
 850            }
 851            for message_id in message.payload.observed_channel_message_id {
 852                this.acknowledge_message_id(message_id.channel_id, message_id.message_id, cx);
 853            }
 854            for membership in message.payload.channel_memberships {
 855                if let Some(role) = ChannelRole::from_i32(membership.role) {
 856                    this.channel_states
 857                        .entry(membership.channel_id)
 858                        .or_insert_with(|| ChannelState::default())
 859                        .set_role(role)
 860                }
 861            }
 862        })
 863    }
 864
 865    fn handle_connect(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
 866        self.channel_index.clear();
 867        self.channel_invitations.clear();
 868        self.channel_participants.clear();
 869        self.channel_index.clear();
 870        self.outgoing_invites.clear();
 871        self.disconnect_channel_buffers_task.take();
 872
 873        for chat in self.opened_chats.values() {
 874            if let OpenedModelHandle::Open(chat) = chat {
 875                if let Some(chat) = chat.upgrade() {
 876                    chat.update(cx, |chat, cx| {
 877                        chat.rejoin(cx);
 878                    });
 879                }
 880            }
 881        }
 882
 883        let mut buffer_versions = Vec::new();
 884        for buffer in self.opened_buffers.values() {
 885            if let OpenedModelHandle::Open(buffer) = buffer {
 886                if let Some(buffer) = buffer.upgrade() {
 887                    let channel_buffer = buffer.read(cx);
 888                    let buffer = channel_buffer.buffer().read(cx);
 889                    buffer_versions.push(proto::ChannelBufferVersion {
 890                        channel_id: channel_buffer.channel_id,
 891                        epoch: channel_buffer.epoch(),
 892                        version: language::proto::serialize_version(&buffer.version()),
 893                    });
 894                }
 895            }
 896        }
 897
 898        if buffer_versions.is_empty() {
 899            return Task::ready(Ok(()));
 900        }
 901
 902        let response = self.client.request(proto::RejoinChannelBuffers {
 903            buffers: buffer_versions,
 904        });
 905
 906        cx.spawn(|this, mut cx| async move {
 907            let mut response = response.await?;
 908
 909            this.update(&mut cx, |this, cx| {
 910                this.opened_buffers.retain(|_, buffer| match buffer {
 911                    OpenedModelHandle::Open(channel_buffer) => {
 912                        let Some(channel_buffer) = channel_buffer.upgrade() else {
 913                            return false;
 914                        };
 915
 916                        channel_buffer.update(cx, |channel_buffer, cx| {
 917                            let channel_id = channel_buffer.channel_id;
 918                            if let Some(remote_buffer) = response
 919                                .buffers
 920                                .iter_mut()
 921                                .find(|buffer| buffer.channel_id == channel_id)
 922                            {
 923                                let channel_id = channel_buffer.channel_id;
 924                                let remote_version =
 925                                    language::proto::deserialize_version(&remote_buffer.version);
 926
 927                                channel_buffer.replace_collaborators(
 928                                    mem::take(&mut remote_buffer.collaborators),
 929                                    cx,
 930                                );
 931
 932                                let operations = channel_buffer
 933                                    .buffer()
 934                                    .update(cx, |buffer, cx| {
 935                                        let outgoing_operations =
 936                                            buffer.serialize_ops(Some(remote_version), cx);
 937                                        let incoming_operations =
 938                                            mem::take(&mut remote_buffer.operations)
 939                                                .into_iter()
 940                                                .map(language::proto::deserialize_operation)
 941                                                .collect::<Result<Vec<_>>>()?;
 942                                        buffer.apply_ops(incoming_operations, cx)?;
 943                                        anyhow::Ok(outgoing_operations)
 944                                    })
 945                                    .log_err();
 946
 947                                if let Some(operations) = operations {
 948                                    let client = this.client.clone();
 949                                    cx.background_executor()
 950                                        .spawn(async move {
 951                                            let operations = operations.await;
 952                                            for chunk in
 953                                                language::proto::split_operations(operations)
 954                                            {
 955                                                client
 956                                                    .send(proto::UpdateChannelBuffer {
 957                                                        channel_id,
 958                                                        operations: chunk,
 959                                                    })
 960                                                    .ok();
 961                                            }
 962                                        })
 963                                        .detach();
 964                                    return true;
 965                                }
 966                            }
 967
 968                            channel_buffer.disconnect(cx);
 969                            false
 970                        })
 971                    }
 972                    OpenedModelHandle::Loading(_) => true,
 973                });
 974            })
 975            .ok();
 976            anyhow::Ok(())
 977        })
 978    }
 979
 980    fn handle_disconnect(&mut self, wait_for_reconnect: bool, cx: &mut ModelContext<Self>) {
 981        cx.notify();
 982
 983        self.disconnect_channel_buffers_task.get_or_insert_with(|| {
 984            cx.spawn(move |this, mut cx| async move {
 985                if wait_for_reconnect {
 986                    cx.background_executor().timer(RECONNECT_TIMEOUT).await;
 987                }
 988
 989                if let Some(this) = this.upgrade() {
 990                    this.update(&mut cx, |this, cx| {
 991                        for (_, buffer) in this.opened_buffers.drain() {
 992                            if let OpenedModelHandle::Open(buffer) = buffer {
 993                                if let Some(buffer) = buffer.upgrade() {
 994                                    buffer.update(cx, |buffer, cx| buffer.disconnect(cx));
 995                                }
 996                            }
 997                        }
 998                    })
 999                    .ok();
1000                }
1001            })
1002        });
1003    }
1004
1005    pub(crate) fn update_channels(
1006        &mut self,
1007        payload: proto::UpdateChannels,
1008        cx: &mut ModelContext<ChannelStore>,
1009    ) -> Option<Task<Result<()>>> {
1010        if !payload.remove_channel_invitations.is_empty() {
1011            self.channel_invitations
1012                .retain(|channel| !payload.remove_channel_invitations.contains(&channel.id));
1013        }
1014        for channel in payload.channel_invitations {
1015            match self
1016                .channel_invitations
1017                .binary_search_by_key(&channel.id, |c| c.id)
1018            {
1019                Ok(ix) => {
1020                    Arc::make_mut(&mut self.channel_invitations[ix]).name = channel.name.into()
1021                }
1022                Err(ix) => self.channel_invitations.insert(
1023                    ix,
1024                    Arc::new(Channel {
1025                        id: channel.id,
1026                        visibility: channel.visibility(),
1027                        name: channel.name.into(),
1028                        parent_path: channel.parent_path,
1029                    }),
1030                ),
1031            }
1032        }
1033
1034        let channels_changed = !payload.channels.is_empty()
1035            || !payload.delete_channels.is_empty()
1036            || !payload.latest_channel_message_ids.is_empty()
1037            || !payload.latest_channel_buffer_versions.is_empty();
1038
1039        if channels_changed {
1040            if !payload.delete_channels.is_empty() {
1041                self.channel_index.delete_channels(&payload.delete_channels);
1042                self.channel_participants
1043                    .retain(|channel_id, _| !&payload.delete_channels.contains(channel_id));
1044
1045                for channel_id in &payload.delete_channels {
1046                    let channel_id = *channel_id;
1047                    if payload
1048                        .channels
1049                        .iter()
1050                        .any(|channel| channel.id == channel_id)
1051                    {
1052                        continue;
1053                    }
1054                    if let Some(OpenedModelHandle::Open(buffer)) =
1055                        self.opened_buffers.remove(&channel_id)
1056                    {
1057                        if let Some(buffer) = buffer.upgrade() {
1058                            buffer.update(cx, ChannelBuffer::disconnect);
1059                        }
1060                    }
1061                }
1062            }
1063
1064            let mut index = self.channel_index.bulk_insert();
1065            for channel in payload.channels {
1066                let id = channel.id;
1067                let channel_changed = index.insert(channel);
1068
1069                if channel_changed {
1070                    if let Some(OpenedModelHandle::Open(buffer)) = self.opened_buffers.get(&id) {
1071                        if let Some(buffer) = buffer.upgrade() {
1072                            buffer.update(cx, ChannelBuffer::channel_changed);
1073                        }
1074                    }
1075                }
1076            }
1077
1078            for latest_buffer_version in payload.latest_channel_buffer_versions {
1079                let version = language::proto::deserialize_version(&latest_buffer_version.version);
1080                self.channel_states
1081                    .entry(latest_buffer_version.channel_id)
1082                    .or_default()
1083                    .update_latest_notes_version(latest_buffer_version.epoch, &version)
1084            }
1085
1086            for latest_channel_message in payload.latest_channel_message_ids {
1087                self.channel_states
1088                    .entry(latest_channel_message.channel_id)
1089                    .or_default()
1090                    .update_latest_message_id(latest_channel_message.message_id);
1091            }
1092        }
1093
1094        cx.notify();
1095        if payload.channel_participants.is_empty() {
1096            return None;
1097        }
1098
1099        let mut all_user_ids = Vec::new();
1100        let channel_participants = payload.channel_participants;
1101        for entry in &channel_participants {
1102            for user_id in entry.participant_user_ids.iter() {
1103                if let Err(ix) = all_user_ids.binary_search(user_id) {
1104                    all_user_ids.insert(ix, *user_id);
1105                }
1106            }
1107        }
1108
1109        let users = self
1110            .user_store
1111            .update(cx, |user_store, cx| user_store.get_users(all_user_ids, cx));
1112        Some(cx.spawn(|this, mut cx| async move {
1113            let users = users.await?;
1114
1115            this.update(&mut cx, |this, cx| {
1116                for entry in &channel_participants {
1117                    let mut participants: Vec<_> = entry
1118                        .participant_user_ids
1119                        .iter()
1120                        .filter_map(|user_id| {
1121                            users
1122                                .binary_search_by_key(&user_id, |user| &user.id)
1123                                .ok()
1124                                .map(|ix| users[ix].clone())
1125                        })
1126                        .collect();
1127
1128                    participants.sort_by_key(|u| u.id);
1129
1130                    this.channel_participants
1131                        .insert(entry.channel_id, participants);
1132                }
1133
1134                cx.notify();
1135            })
1136        }))
1137    }
1138}
1139
1140impl ChannelState {
1141    fn set_role(&mut self, role: ChannelRole) {
1142        self.role = Some(role);
1143    }
1144
1145    fn has_channel_buffer_changed(&self) -> bool {
1146        if let Some(latest_version) = &self.latest_notes_versions {
1147            if let Some(observed_version) = &self.observed_notes_versions {
1148                latest_version.epoch > observed_version.epoch
1149                    || (latest_version.epoch == observed_version.epoch
1150                        && latest_version
1151                            .version
1152                            .changed_since(&observed_version.version))
1153            } else {
1154                true
1155            }
1156        } else {
1157            false
1158        }
1159    }
1160
1161    fn has_new_messages(&self) -> bool {
1162        let latest_message_id = self.latest_chat_message;
1163        let observed_message_id = self.observed_chat_message;
1164
1165        latest_message_id.is_some_and(|latest_message_id| {
1166            latest_message_id > observed_message_id.unwrap_or_default()
1167        })
1168    }
1169
1170    fn last_acknowledged_message_id(&self) -> Option<u64> {
1171        self.observed_chat_message
1172    }
1173
1174    fn acknowledge_message_id(&mut self, message_id: u64) {
1175        let observed = self.observed_chat_message.get_or_insert(message_id);
1176        *observed = (*observed).max(message_id);
1177    }
1178
1179    fn update_latest_message_id(&mut self, message_id: u64) {
1180        self.latest_chat_message =
1181            Some(message_id.max(self.latest_chat_message.unwrap_or_default()));
1182    }
1183
1184    fn acknowledge_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1185        if let Some(existing) = &mut self.observed_notes_versions {
1186            if existing.epoch == epoch {
1187                existing.version.join(version);
1188                return;
1189            }
1190        }
1191        self.observed_notes_versions = Some(NotesVersion {
1192            epoch,
1193            version: version.clone(),
1194        });
1195    }
1196
1197    fn update_latest_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1198        if let Some(existing) = &mut self.latest_notes_versions {
1199            if existing.epoch == epoch {
1200                existing.version.join(version);
1201                return;
1202            }
1203        }
1204        self.latest_notes_versions = Some(NotesVersion {
1205            epoch,
1206            version: version.clone(),
1207        });
1208    }
1209}