channel_store.rs

   1mod channel_index;
   2
   3use crate::{channel_buffer::ChannelBuffer, channel_chat::ChannelChat, ChannelMessage};
   4use anyhow::{anyhow, Result};
   5use channel_index::ChannelIndex;
   6use client::{ChannelId, Client, ClientSettings, ProjectId, Subscription, User, UserId, UserStore};
   7use collections::{hash_map, HashMap, HashSet};
   8use futures::{channel::mpsc, future::Shared, Future, FutureExt, StreamExt};
   9use gpui::{
  10    AppContext, AsyncAppContext, Context, EventEmitter, Global, Model, ModelContext, SharedString,
  11    Task, WeakModel,
  12};
  13use language::Capability;
  14use rpc::{
  15    proto::{self, ChannelRole, ChannelVisibility},
  16    TypedEnvelope,
  17};
  18use settings::Settings;
  19use std::{mem, sync::Arc, time::Duration};
  20use util::{maybe, ResultExt};
  21
  22pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
  23
  24pub fn init(client: &Arc<Client>, user_store: Model<UserStore>, cx: &mut AppContext) {
  25    let channel_store =
  26        cx.new_model(|cx| ChannelStore::new(client.clone(), user_store.clone(), cx));
  27    cx.set_global(GlobalChannelStore(channel_store));
  28}
  29
  30#[derive(Debug, Clone, Default, PartialEq)]
  31struct NotesVersion {
  32    epoch: u64,
  33    version: clock::Global,
  34}
  35
  36#[derive(Debug, Clone)]
  37pub struct HostedProject {
  38    project_id: ProjectId,
  39    channel_id: ChannelId,
  40    name: SharedString,
  41    _visibility: proto::ChannelVisibility,
  42}
  43
  44impl From<proto::HostedProject> for HostedProject {
  45    fn from(project: proto::HostedProject) -> Self {
  46        Self {
  47            project_id: ProjectId(project.project_id),
  48            channel_id: ChannelId(project.channel_id),
  49            _visibility: project.visibility(),
  50            name: project.name.into(),
  51        }
  52    }
  53}
  54
  55pub struct ChannelStore {
  56    pub channel_index: ChannelIndex,
  57    channel_invitations: Vec<Arc<Channel>>,
  58    channel_participants: HashMap<ChannelId, Vec<Arc<User>>>,
  59    channel_states: HashMap<ChannelId, ChannelState>,
  60    hosted_projects: HashMap<ProjectId, HostedProject>,
  61
  62    outgoing_invites: HashSet<(ChannelId, UserId)>,
  63    update_channels_tx: mpsc::UnboundedSender<proto::UpdateChannels>,
  64    opened_buffers: HashMap<ChannelId, OpenedModelHandle<ChannelBuffer>>,
  65    opened_chats: HashMap<ChannelId, OpenedModelHandle<ChannelChat>>,
  66    client: Arc<Client>,
  67    user_store: Model<UserStore>,
  68    _rpc_subscriptions: [Subscription; 2],
  69    _watch_connection_status: Task<Option<()>>,
  70    disconnect_channel_buffers_task: Option<Task<()>>,
  71    _update_channels: Task<()>,
  72}
  73
  74#[derive(Clone, Debug)]
  75pub struct Channel {
  76    pub id: ChannelId,
  77    pub name: SharedString,
  78    pub visibility: proto::ChannelVisibility,
  79    pub parent_path: Vec<ChannelId>,
  80}
  81
  82#[derive(Default, Debug)]
  83pub struct ChannelState {
  84    latest_chat_message: Option<u64>,
  85    latest_notes_version: NotesVersion,
  86    observed_notes_version: NotesVersion,
  87    observed_chat_message: Option<u64>,
  88    role: Option<ChannelRole>,
  89    projects: HashSet<ProjectId>,
  90}
  91
  92impl Channel {
  93    pub fn link(&self, cx: &AppContext) -> String {
  94        format!(
  95            "{}/channel/{}-{}",
  96            ClientSettings::get_global(cx).server_url,
  97            Self::slug(&self.name),
  98            self.id
  99        )
 100    }
 101
 102    pub fn notes_link(&self, heading: Option<String>, cx: &AppContext) -> String {
 103        self.link(cx)
 104            + "/notes"
 105            + &heading
 106                .map(|h| format!("#{}", Self::slug(&h)))
 107                .unwrap_or_default()
 108    }
 109
 110    pub fn is_root_channel(&self) -> bool {
 111        self.parent_path.is_empty()
 112    }
 113
 114    pub fn root_id(&self) -> ChannelId {
 115        self.parent_path.first().copied().unwrap_or(self.id)
 116    }
 117
 118    pub fn slug(str: &str) -> String {
 119        let slug: String = str
 120            .chars()
 121            .map(|c| if c.is_alphanumeric() { c } else { '-' })
 122            .collect();
 123
 124        slug.trim_matches(|c| c == '-').to_string()
 125    }
 126}
 127
 128pub struct ChannelMembership {
 129    pub user: Arc<User>,
 130    pub kind: proto::channel_member::Kind,
 131    pub role: proto::ChannelRole,
 132}
 133impl ChannelMembership {
 134    pub fn sort_key(&self) -> MembershipSortKey {
 135        MembershipSortKey {
 136            role_order: match self.role {
 137                proto::ChannelRole::Admin => 0,
 138                proto::ChannelRole::Member => 1,
 139                proto::ChannelRole::Banned => 2,
 140                proto::ChannelRole::Talker => 3,
 141                proto::ChannelRole::Guest => 4,
 142            },
 143            kind_order: match self.kind {
 144                proto::channel_member::Kind::Member => 0,
 145                proto::channel_member::Kind::Invitee => 1,
 146            },
 147            username_order: self.user.github_login.as_str(),
 148        }
 149    }
 150}
 151
 152#[derive(PartialOrd, Ord, PartialEq, Eq)]
 153pub struct MembershipSortKey<'a> {
 154    role_order: u8,
 155    kind_order: u8,
 156    username_order: &'a str,
 157}
 158
 159pub enum ChannelEvent {
 160    ChannelCreated(ChannelId),
 161    ChannelRenamed(ChannelId),
 162}
 163
 164impl EventEmitter<ChannelEvent> for ChannelStore {}
 165
 166enum OpenedModelHandle<E> {
 167    Open(WeakModel<E>),
 168    Loading(Shared<Task<Result<Model<E>, Arc<anyhow::Error>>>>),
 169}
 170
 171struct GlobalChannelStore(Model<ChannelStore>);
 172
 173impl Global for GlobalChannelStore {}
 174
 175impl ChannelStore {
 176    pub fn global(cx: &AppContext) -> Model<Self> {
 177        cx.global::<GlobalChannelStore>().0.clone()
 178    }
 179
 180    pub fn new(
 181        client: Arc<Client>,
 182        user_store: Model<UserStore>,
 183        cx: &mut ModelContext<Self>,
 184    ) -> Self {
 185        let rpc_subscriptions = [
 186            client.add_message_handler(cx.weak_model(), Self::handle_update_channels),
 187            client.add_message_handler(cx.weak_model(), Self::handle_update_user_channels),
 188        ];
 189
 190        let mut connection_status = client.status();
 191        let (update_channels_tx, mut update_channels_rx) = mpsc::unbounded();
 192        let watch_connection_status = cx.spawn(|this, mut cx| async move {
 193            while let Some(status) = connection_status.next().await {
 194                let this = this.upgrade()?;
 195                match status {
 196                    client::Status::Connected { .. } => {
 197                        this.update(&mut cx, |this, cx| this.handle_connect(cx))
 198                            .ok()?
 199                            .await
 200                            .log_err()?;
 201                    }
 202                    client::Status::SignedOut | client::Status::UpgradeRequired => {
 203                        this.update(&mut cx, |this, cx| this.handle_disconnect(false, cx))
 204                            .ok();
 205                    }
 206                    _ => {
 207                        this.update(&mut cx, |this, cx| this.handle_disconnect(true, cx))
 208                            .ok();
 209                    }
 210                }
 211            }
 212            Some(())
 213        });
 214
 215        Self {
 216            channel_invitations: Vec::default(),
 217            channel_index: ChannelIndex::default(),
 218            channel_participants: Default::default(),
 219            hosted_projects: Default::default(),
 220            outgoing_invites: Default::default(),
 221            opened_buffers: Default::default(),
 222            opened_chats: Default::default(),
 223            update_channels_tx,
 224            client,
 225            user_store,
 226            _rpc_subscriptions: rpc_subscriptions,
 227            _watch_connection_status: watch_connection_status,
 228            disconnect_channel_buffers_task: None,
 229            _update_channels: cx.spawn(|this, mut cx| async move {
 230                maybe!(async move {
 231                    while let Some(update_channels) = update_channels_rx.next().await {
 232                        if let Some(this) = this.upgrade() {
 233                            let update_task = this.update(&mut cx, |this, cx| {
 234                                this.update_channels(update_channels, cx)
 235                            })?;
 236                            if let Some(update_task) = update_task {
 237                                update_task.await.log_err();
 238                            }
 239                        }
 240                    }
 241                    anyhow::Ok(())
 242                })
 243                .await
 244                .log_err();
 245            }),
 246            channel_states: Default::default(),
 247        }
 248    }
 249
 250    pub fn client(&self) -> Arc<Client> {
 251        self.client.clone()
 252    }
 253
 254    /// Returns the number of unique channels in the store
 255    pub fn channel_count(&self) -> usize {
 256        self.channel_index.by_id().len()
 257    }
 258
 259    /// Returns the index of a channel ID in the list of unique channels
 260    pub fn index_of_channel(&self, channel_id: ChannelId) -> Option<usize> {
 261        self.channel_index
 262            .by_id()
 263            .keys()
 264            .position(|id| *id == channel_id)
 265    }
 266
 267    /// Returns an iterator over all unique channels
 268    pub fn channels(&self) -> impl '_ + Iterator<Item = &Arc<Channel>> {
 269        self.channel_index.by_id().values()
 270    }
 271
 272    /// Iterate over all entries in the channel DAG
 273    pub fn ordered_channels(&self) -> impl '_ + Iterator<Item = (usize, &Arc<Channel>)> {
 274        self.channel_index
 275            .ordered_channels()
 276            .iter()
 277            .filter_map(move |id| {
 278                let channel = self.channel_index.by_id().get(id)?;
 279                Some((channel.parent_path.len(), channel))
 280            })
 281    }
 282
 283    pub fn channel_at_index(&self, ix: usize) -> Option<&Arc<Channel>> {
 284        let channel_id = self.channel_index.ordered_channels().get(ix)?;
 285        self.channel_index.by_id().get(channel_id)
 286    }
 287
 288    pub fn channel_at(&self, ix: usize) -> Option<&Arc<Channel>> {
 289        self.channel_index.by_id().values().nth(ix)
 290    }
 291
 292    pub fn has_channel_invitation(&self, channel_id: ChannelId) -> bool {
 293        self.channel_invitations
 294            .iter()
 295            .any(|channel| channel.id == channel_id)
 296    }
 297
 298    pub fn channel_invitations(&self) -> &[Arc<Channel>] {
 299        &self.channel_invitations
 300    }
 301
 302    pub fn channel_for_id(&self, channel_id: ChannelId) -> Option<&Arc<Channel>> {
 303        self.channel_index.by_id().get(&channel_id)
 304    }
 305
 306    pub fn projects_for_id(&self, channel_id: ChannelId) -> Vec<(SharedString, ProjectId)> {
 307        let mut projects: Vec<(SharedString, ProjectId)> = self
 308            .channel_states
 309            .get(&channel_id)
 310            .map(|state| state.projects.clone())
 311            .unwrap_or_default()
 312            .into_iter()
 313            .flat_map(|id| Some((self.hosted_projects.get(&id)?.name.clone(), id)))
 314            .collect();
 315        projects.sort();
 316        projects
 317    }
 318
 319    pub fn has_open_channel_buffer(&self, channel_id: ChannelId, _cx: &AppContext) -> bool {
 320        if let Some(buffer) = self.opened_buffers.get(&channel_id) {
 321            if let OpenedModelHandle::Open(buffer) = buffer {
 322                return buffer.upgrade().is_some();
 323            }
 324        }
 325        false
 326    }
 327
 328    pub fn open_channel_buffer(
 329        &mut self,
 330        channel_id: ChannelId,
 331        cx: &mut ModelContext<Self>,
 332    ) -> Task<Result<Model<ChannelBuffer>>> {
 333        let client = self.client.clone();
 334        let user_store = self.user_store.clone();
 335        let channel_store = cx.handle();
 336        self.open_channel_resource(
 337            channel_id,
 338            |this| &mut this.opened_buffers,
 339            |channel, cx| ChannelBuffer::new(channel, client, user_store, channel_store, cx),
 340            cx,
 341        )
 342    }
 343
 344    pub fn fetch_channel_messages(
 345        &self,
 346        message_ids: Vec<u64>,
 347        cx: &mut ModelContext<Self>,
 348    ) -> Task<Result<Vec<ChannelMessage>>> {
 349        let request = if message_ids.is_empty() {
 350            None
 351        } else {
 352            Some(
 353                self.client
 354                    .request(proto::GetChannelMessagesById { message_ids }),
 355            )
 356        };
 357        cx.spawn(|this, mut cx| async move {
 358            if let Some(request) = request {
 359                let response = request.await?;
 360                let this = this
 361                    .upgrade()
 362                    .ok_or_else(|| anyhow!("channel store dropped"))?;
 363                let user_store = this.update(&mut cx, |this, _| this.user_store.clone())?;
 364                ChannelMessage::from_proto_vec(response.messages, &user_store, &mut cx).await
 365            } else {
 366                Ok(Vec::new())
 367            }
 368        })
 369    }
 370
 371    pub fn has_channel_buffer_changed(&self, channel_id: ChannelId) -> bool {
 372        self.channel_states
 373            .get(&channel_id)
 374            .is_some_and(|state| state.has_channel_buffer_changed())
 375    }
 376
 377    pub fn has_new_messages(&self, channel_id: ChannelId) -> bool {
 378        self.channel_states
 379            .get(&channel_id)
 380            .is_some_and(|state| state.has_new_messages())
 381    }
 382
 383    pub fn last_acknowledge_message_id(&self, channel_id: ChannelId) -> Option<u64> {
 384        self.channel_states.get(&channel_id).and_then(|state| {
 385            if let Some(last_message_id) = state.latest_chat_message {
 386                if state
 387                    .last_acknowledged_message_id()
 388                    .is_some_and(|id| id < last_message_id)
 389                {
 390                    return state.last_acknowledged_message_id();
 391                }
 392            }
 393
 394            None
 395        })
 396    }
 397
 398    pub fn acknowledge_message_id(
 399        &mut self,
 400        channel_id: ChannelId,
 401        message_id: u64,
 402        cx: &mut ModelContext<Self>,
 403    ) {
 404        self.channel_states
 405            .entry(channel_id)
 406            .or_insert_with(|| Default::default())
 407            .acknowledge_message_id(message_id);
 408        cx.notify();
 409    }
 410
 411    pub fn update_latest_message_id(
 412        &mut self,
 413        channel_id: ChannelId,
 414        message_id: u64,
 415        cx: &mut ModelContext<Self>,
 416    ) {
 417        self.channel_states
 418            .entry(channel_id)
 419            .or_insert_with(|| Default::default())
 420            .update_latest_message_id(message_id);
 421        cx.notify();
 422    }
 423
 424    pub fn acknowledge_notes_version(
 425        &mut self,
 426        channel_id: ChannelId,
 427        epoch: u64,
 428        version: &clock::Global,
 429        cx: &mut ModelContext<Self>,
 430    ) {
 431        self.channel_states
 432            .entry(channel_id)
 433            .or_insert_with(|| Default::default())
 434            .acknowledge_notes_version(epoch, version);
 435        cx.notify()
 436    }
 437
 438    pub fn update_latest_notes_version(
 439        &mut self,
 440        channel_id: ChannelId,
 441        epoch: u64,
 442        version: &clock::Global,
 443        cx: &mut ModelContext<Self>,
 444    ) {
 445        self.channel_states
 446            .entry(channel_id)
 447            .or_insert_with(|| Default::default())
 448            .update_latest_notes_version(epoch, version);
 449        cx.notify()
 450    }
 451
 452    pub fn open_channel_chat(
 453        &mut self,
 454        channel_id: ChannelId,
 455        cx: &mut ModelContext<Self>,
 456    ) -> Task<Result<Model<ChannelChat>>> {
 457        let client = self.client.clone();
 458        let user_store = self.user_store.clone();
 459        let this = cx.handle();
 460        self.open_channel_resource(
 461            channel_id,
 462            |this| &mut this.opened_chats,
 463            |channel, cx| ChannelChat::new(channel, this, user_store, client, cx),
 464            cx,
 465        )
 466    }
 467
 468    /// Asynchronously open a given resource associated with a channel.
 469    ///
 470    /// Make sure that the resource is only opened once, even if this method
 471    /// is called multiple times with the same channel id while the first task
 472    /// is still running.
 473    fn open_channel_resource<T, F, Fut>(
 474        &mut self,
 475        channel_id: ChannelId,
 476        get_map: fn(&mut Self) -> &mut HashMap<ChannelId, OpenedModelHandle<T>>,
 477        load: F,
 478        cx: &mut ModelContext<Self>,
 479    ) -> Task<Result<Model<T>>>
 480    where
 481        F: 'static + FnOnce(Arc<Channel>, AsyncAppContext) -> Fut,
 482        Fut: Future<Output = Result<Model<T>>>,
 483        T: 'static,
 484    {
 485        let task = loop {
 486            match get_map(self).entry(channel_id) {
 487                hash_map::Entry::Occupied(e) => match e.get() {
 488                    OpenedModelHandle::Open(model) => {
 489                        if let Some(model) = model.upgrade() {
 490                            break Task::ready(Ok(model)).shared();
 491                        } else {
 492                            get_map(self).remove(&channel_id);
 493                            continue;
 494                        }
 495                    }
 496                    OpenedModelHandle::Loading(task) => {
 497                        break task.clone();
 498                    }
 499                },
 500                hash_map::Entry::Vacant(e) => {
 501                    let task = cx
 502                        .spawn(move |this, mut cx| async move {
 503                            let channel = this.update(&mut cx, |this, _| {
 504                                this.channel_for_id(channel_id).cloned().ok_or_else(|| {
 505                                    Arc::new(anyhow!("no channel for id: {}", channel_id))
 506                                })
 507                            })??;
 508
 509                            load(channel, cx).await.map_err(Arc::new)
 510                        })
 511                        .shared();
 512
 513                    e.insert(OpenedModelHandle::Loading(task.clone()));
 514                    cx.spawn({
 515                        let task = task.clone();
 516                        move |this, mut cx| async move {
 517                            let result = task.await;
 518                            this.update(&mut cx, |this, _| match result {
 519                                Ok(model) => {
 520                                    get_map(this).insert(
 521                                        channel_id,
 522                                        OpenedModelHandle::Open(model.downgrade()),
 523                                    );
 524                                }
 525                                Err(_) => {
 526                                    get_map(this).remove(&channel_id);
 527                                }
 528                            })
 529                            .ok();
 530                        }
 531                    })
 532                    .detach();
 533                    break task;
 534                }
 535            }
 536        };
 537        cx.background_executor()
 538            .spawn(async move { task.await.map_err(|error| anyhow!("{}", error)) })
 539    }
 540
 541    pub fn is_channel_admin(&self, channel_id: ChannelId) -> bool {
 542        self.channel_role(channel_id) == proto::ChannelRole::Admin
 543    }
 544
 545    pub fn is_root_channel(&self, channel_id: ChannelId) -> bool {
 546        self.channel_index
 547            .by_id()
 548            .get(&channel_id)
 549            .map_or(false, |channel| channel.is_root_channel())
 550    }
 551
 552    pub fn is_public_channel(&self, channel_id: ChannelId) -> bool {
 553        self.channel_index
 554            .by_id()
 555            .get(&channel_id)
 556            .map_or(false, |channel| {
 557                channel.visibility == ChannelVisibility::Public
 558            })
 559    }
 560
 561    pub fn channel_capability(&self, channel_id: ChannelId) -> Capability {
 562        match self.channel_role(channel_id) {
 563            ChannelRole::Admin | ChannelRole::Member => Capability::ReadWrite,
 564            _ => Capability::ReadOnly,
 565        }
 566    }
 567
 568    pub fn channel_role(&self, channel_id: ChannelId) -> proto::ChannelRole {
 569        maybe!({
 570            let mut channel = self.channel_for_id(channel_id)?;
 571            if !channel.is_root_channel() {
 572                channel = self.channel_for_id(channel.root_id())?;
 573            }
 574            let root_channel_state = self.channel_states.get(&channel.id);
 575            root_channel_state?.role
 576        })
 577        .unwrap_or(proto::ChannelRole::Guest)
 578    }
 579
 580    pub fn channel_participants(&self, channel_id: ChannelId) -> &[Arc<User>] {
 581        self.channel_participants
 582            .get(&channel_id)
 583            .map_or(&[], |v| v.as_slice())
 584    }
 585
 586    pub fn create_channel(
 587        &self,
 588        name: &str,
 589        parent_id: Option<ChannelId>,
 590        cx: &mut ModelContext<Self>,
 591    ) -> Task<Result<ChannelId>> {
 592        let client = self.client.clone();
 593        let name = name.trim_start_matches('#').to_owned();
 594        cx.spawn(move |this, mut cx| async move {
 595            let response = client
 596                .request(proto::CreateChannel {
 597                    name,
 598                    parent_id: parent_id.map(|cid| cid.0),
 599                })
 600                .await?;
 601
 602            let channel = response
 603                .channel
 604                .ok_or_else(|| anyhow!("missing channel in response"))?;
 605            let channel_id = ChannelId(channel.id);
 606
 607            this.update(&mut cx, |this, cx| {
 608                let task = this.update_channels(
 609                    proto::UpdateChannels {
 610                        channels: vec![channel],
 611                        ..Default::default()
 612                    },
 613                    cx,
 614                );
 615                assert!(task.is_none());
 616
 617                // This event is emitted because the collab panel wants to clear the pending edit state
 618                // before this frame is rendered. But we can't guarantee that the collab panel's future
 619                // will resolve before this flush_effects finishes. Synchronously emitting this event
 620                // ensures that the collab panel will observe this creation before the frame completes
 621                cx.emit(ChannelEvent::ChannelCreated(channel_id));
 622            })?;
 623
 624            Ok(channel_id)
 625        })
 626    }
 627
 628    pub fn move_channel(
 629        &mut self,
 630        channel_id: ChannelId,
 631        to: ChannelId,
 632        cx: &mut ModelContext<Self>,
 633    ) -> Task<Result<()>> {
 634        let client = self.client.clone();
 635        cx.spawn(move |_, _| async move {
 636            let _ = client
 637                .request(proto::MoveChannel {
 638                    channel_id: channel_id.0,
 639                    to: to.0,
 640                })
 641                .await?;
 642
 643            Ok(())
 644        })
 645    }
 646
 647    pub fn set_channel_visibility(
 648        &mut self,
 649        channel_id: ChannelId,
 650        visibility: ChannelVisibility,
 651        cx: &mut ModelContext<Self>,
 652    ) -> Task<Result<()>> {
 653        let client = self.client.clone();
 654        cx.spawn(move |_, _| async move {
 655            let _ = client
 656                .request(proto::SetChannelVisibility {
 657                    channel_id: channel_id.0,
 658                    visibility: visibility.into(),
 659                })
 660                .await?;
 661
 662            Ok(())
 663        })
 664    }
 665
 666    pub fn invite_member(
 667        &mut self,
 668        channel_id: ChannelId,
 669        user_id: UserId,
 670        role: proto::ChannelRole,
 671        cx: &mut ModelContext<Self>,
 672    ) -> Task<Result<()>> {
 673        if !self.outgoing_invites.insert((channel_id, user_id)) {
 674            return Task::ready(Err(anyhow!("invite request already in progress")));
 675        }
 676
 677        cx.notify();
 678        let client = self.client.clone();
 679        cx.spawn(move |this, mut cx| async move {
 680            let result = client
 681                .request(proto::InviteChannelMember {
 682                    channel_id: channel_id.0,
 683                    user_id,
 684                    role: role.into(),
 685                })
 686                .await;
 687
 688            this.update(&mut cx, |this, cx| {
 689                this.outgoing_invites.remove(&(channel_id, user_id));
 690                cx.notify();
 691            })?;
 692
 693            result?;
 694
 695            Ok(())
 696        })
 697    }
 698
 699    pub fn remove_member(
 700        &mut self,
 701        channel_id: ChannelId,
 702        user_id: u64,
 703        cx: &mut ModelContext<Self>,
 704    ) -> Task<Result<()>> {
 705        if !self.outgoing_invites.insert((channel_id, user_id)) {
 706            return Task::ready(Err(anyhow!("invite request already in progress")));
 707        }
 708
 709        cx.notify();
 710        let client = self.client.clone();
 711        cx.spawn(move |this, mut cx| async move {
 712            let result = client
 713                .request(proto::RemoveChannelMember {
 714                    channel_id: channel_id.0,
 715                    user_id,
 716                })
 717                .await;
 718
 719            this.update(&mut cx, |this, cx| {
 720                this.outgoing_invites.remove(&(channel_id, user_id));
 721                cx.notify();
 722            })?;
 723            result?;
 724            Ok(())
 725        })
 726    }
 727
 728    pub fn set_member_role(
 729        &mut self,
 730        channel_id: ChannelId,
 731        user_id: UserId,
 732        role: proto::ChannelRole,
 733        cx: &mut ModelContext<Self>,
 734    ) -> Task<Result<()>> {
 735        if !self.outgoing_invites.insert((channel_id, user_id)) {
 736            return Task::ready(Err(anyhow!("member request already in progress")));
 737        }
 738
 739        cx.notify();
 740        let client = self.client.clone();
 741        cx.spawn(move |this, mut cx| async move {
 742            let result = client
 743                .request(proto::SetChannelMemberRole {
 744                    channel_id: channel_id.0,
 745                    user_id,
 746                    role: role.into(),
 747                })
 748                .await;
 749
 750            this.update(&mut cx, |this, cx| {
 751                this.outgoing_invites.remove(&(channel_id, user_id));
 752                cx.notify();
 753            })?;
 754
 755            result?;
 756            Ok(())
 757        })
 758    }
 759
 760    pub fn rename(
 761        &mut self,
 762        channel_id: ChannelId,
 763        new_name: &str,
 764        cx: &mut ModelContext<Self>,
 765    ) -> Task<Result<()>> {
 766        let client = self.client.clone();
 767        let name = new_name.to_string();
 768        cx.spawn(move |this, mut cx| async move {
 769            let channel = client
 770                .request(proto::RenameChannel {
 771                    channel_id: channel_id.0,
 772                    name,
 773                })
 774                .await?
 775                .channel
 776                .ok_or_else(|| anyhow!("missing channel in response"))?;
 777            this.update(&mut cx, |this, cx| {
 778                let task = this.update_channels(
 779                    proto::UpdateChannels {
 780                        channels: vec![channel],
 781                        ..Default::default()
 782                    },
 783                    cx,
 784                );
 785                assert!(task.is_none());
 786
 787                // This event is emitted because the collab panel wants to clear the pending edit state
 788                // before this frame is rendered. But we can't guarantee that the collab panel's future
 789                // will resolve before this flush_effects finishes. Synchronously emitting this event
 790                // ensures that the collab panel will observe this creation before the frame complete
 791                cx.emit(ChannelEvent::ChannelRenamed(channel_id))
 792            })?;
 793            Ok(())
 794        })
 795    }
 796
 797    pub fn respond_to_channel_invite(
 798        &mut self,
 799        channel_id: ChannelId,
 800        accept: bool,
 801        cx: &mut ModelContext<Self>,
 802    ) -> Task<Result<()>> {
 803        let client = self.client.clone();
 804        cx.background_executor().spawn(async move {
 805            client
 806                .request(proto::RespondToChannelInvite {
 807                    channel_id: channel_id.0,
 808                    accept,
 809                })
 810                .await?;
 811            Ok(())
 812        })
 813    }
 814
 815    pub fn get_channel_member_details(
 816        &self,
 817        channel_id: ChannelId,
 818        cx: &mut ModelContext<Self>,
 819    ) -> Task<Result<Vec<ChannelMembership>>> {
 820        let client = self.client.clone();
 821        let user_store = self.user_store.downgrade();
 822        cx.spawn(move |_, mut cx| async move {
 823            let response = client
 824                .request(proto::GetChannelMembers {
 825                    channel_id: channel_id.0,
 826                })
 827                .await?;
 828
 829            let user_ids = response.members.iter().map(|m| m.user_id).collect();
 830            let user_store = user_store
 831                .upgrade()
 832                .ok_or_else(|| anyhow!("user store dropped"))?;
 833            let users = user_store
 834                .update(&mut cx, |user_store, cx| user_store.get_users(user_ids, cx))?
 835                .await?;
 836
 837            Ok(users
 838                .into_iter()
 839                .zip(response.members)
 840                .map(|(user, member)| ChannelMembership {
 841                    user,
 842                    role: member.role(),
 843                    kind: member.kind(),
 844                })
 845                .collect())
 846        })
 847    }
 848
 849    pub fn remove_channel(&self, channel_id: ChannelId) -> impl Future<Output = Result<()>> {
 850        let client = self.client.clone();
 851        async move {
 852            client
 853                .request(proto::DeleteChannel {
 854                    channel_id: channel_id.0,
 855                })
 856                .await?;
 857            Ok(())
 858        }
 859    }
 860
 861    pub fn has_pending_channel_invite_response(&self, _: &Arc<Channel>) -> bool {
 862        false
 863    }
 864
 865    pub fn has_pending_channel_invite(&self, channel_id: ChannelId, user_id: UserId) -> bool {
 866        self.outgoing_invites.contains(&(channel_id, user_id))
 867    }
 868
 869    async fn handle_update_channels(
 870        this: Model<Self>,
 871        message: TypedEnvelope<proto::UpdateChannels>,
 872        _: Arc<Client>,
 873        mut cx: AsyncAppContext,
 874    ) -> Result<()> {
 875        this.update(&mut cx, |this, _| {
 876            this.update_channels_tx
 877                .unbounded_send(message.payload)
 878                .unwrap();
 879        })?;
 880        Ok(())
 881    }
 882
 883    async fn handle_update_user_channels(
 884        this: Model<Self>,
 885        message: TypedEnvelope<proto::UpdateUserChannels>,
 886        _: Arc<Client>,
 887        mut cx: AsyncAppContext,
 888    ) -> Result<()> {
 889        this.update(&mut cx, |this, cx| {
 890            for buffer_version in message.payload.observed_channel_buffer_version {
 891                let version = language::proto::deserialize_version(&buffer_version.version);
 892                this.acknowledge_notes_version(
 893                    ChannelId(buffer_version.channel_id),
 894                    buffer_version.epoch,
 895                    &version,
 896                    cx,
 897                );
 898            }
 899            for message_id in message.payload.observed_channel_message_id {
 900                this.acknowledge_message_id(
 901                    ChannelId(message_id.channel_id),
 902                    message_id.message_id,
 903                    cx,
 904                );
 905            }
 906            for membership in message.payload.channel_memberships {
 907                if let Some(role) = ChannelRole::from_i32(membership.role) {
 908                    this.channel_states
 909                        .entry(ChannelId(membership.channel_id))
 910                        .or_insert_with(|| ChannelState::default())
 911                        .set_role(role)
 912                }
 913            }
 914        })
 915    }
 916
 917    fn handle_connect(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
 918        self.channel_index.clear();
 919        self.channel_invitations.clear();
 920        self.channel_participants.clear();
 921        self.channel_index.clear();
 922        self.outgoing_invites.clear();
 923        self.disconnect_channel_buffers_task.take();
 924
 925        for chat in self.opened_chats.values() {
 926            if let OpenedModelHandle::Open(chat) = chat {
 927                if let Some(chat) = chat.upgrade() {
 928                    chat.update(cx, |chat, cx| {
 929                        chat.rejoin(cx);
 930                    });
 931                }
 932            }
 933        }
 934
 935        let mut buffer_versions = Vec::new();
 936        for buffer in self.opened_buffers.values() {
 937            if let OpenedModelHandle::Open(buffer) = buffer {
 938                if let Some(buffer) = buffer.upgrade() {
 939                    let channel_buffer = buffer.read(cx);
 940                    let buffer = channel_buffer.buffer().read(cx);
 941                    buffer_versions.push(proto::ChannelBufferVersion {
 942                        channel_id: channel_buffer.channel_id.0,
 943                        epoch: channel_buffer.epoch(),
 944                        version: language::proto::serialize_version(&buffer.version()),
 945                    });
 946                }
 947            }
 948        }
 949
 950        if buffer_versions.is_empty() {
 951            return Task::ready(Ok(()));
 952        }
 953
 954        let response = self.client.request(proto::RejoinChannelBuffers {
 955            buffers: buffer_versions,
 956        });
 957
 958        cx.spawn(|this, mut cx| async move {
 959            let mut response = response.await?;
 960
 961            this.update(&mut cx, |this, cx| {
 962                this.opened_buffers.retain(|_, buffer| match buffer {
 963                    OpenedModelHandle::Open(channel_buffer) => {
 964                        let Some(channel_buffer) = channel_buffer.upgrade() else {
 965                            return false;
 966                        };
 967
 968                        channel_buffer.update(cx, |channel_buffer, cx| {
 969                            let channel_id = channel_buffer.channel_id;
 970                            if let Some(remote_buffer) = response
 971                                .buffers
 972                                .iter_mut()
 973                                .find(|buffer| buffer.channel_id == channel_id.0)
 974                            {
 975                                let channel_id = channel_buffer.channel_id;
 976                                let remote_version =
 977                                    language::proto::deserialize_version(&remote_buffer.version);
 978
 979                                channel_buffer.replace_collaborators(
 980                                    mem::take(&mut remote_buffer.collaborators),
 981                                    cx,
 982                                );
 983
 984                                let operations = channel_buffer
 985                                    .buffer()
 986                                    .update(cx, |buffer, cx| {
 987                                        let outgoing_operations =
 988                                            buffer.serialize_ops(Some(remote_version), cx);
 989                                        let incoming_operations =
 990                                            mem::take(&mut remote_buffer.operations)
 991                                                .into_iter()
 992                                                .map(language::proto::deserialize_operation)
 993                                                .collect::<Result<Vec<_>>>()?;
 994                                        buffer.apply_ops(incoming_operations, cx)?;
 995                                        anyhow::Ok(outgoing_operations)
 996                                    })
 997                                    .log_err();
 998
 999                                if let Some(operations) = operations {
1000                                    let client = this.client.clone();
1001                                    cx.background_executor()
1002                                        .spawn(async move {
1003                                            let operations = operations.await;
1004                                            for chunk in
1005                                                language::proto::split_operations(operations)
1006                                            {
1007                                                client
1008                                                    .send(proto::UpdateChannelBuffer {
1009                                                        channel_id: channel_id.0,
1010                                                        operations: chunk,
1011                                                    })
1012                                                    .ok();
1013                                            }
1014                                        })
1015                                        .detach();
1016                                    return true;
1017                                }
1018                            }
1019
1020                            channel_buffer.disconnect(cx);
1021                            false
1022                        })
1023                    }
1024                    OpenedModelHandle::Loading(_) => true,
1025                });
1026            })
1027            .ok();
1028            anyhow::Ok(())
1029        })
1030    }
1031
1032    fn handle_disconnect(&mut self, wait_for_reconnect: bool, cx: &mut ModelContext<Self>) {
1033        cx.notify();
1034
1035        self.disconnect_channel_buffers_task.get_or_insert_with(|| {
1036            cx.spawn(move |this, mut cx| async move {
1037                if wait_for_reconnect {
1038                    cx.background_executor().timer(RECONNECT_TIMEOUT).await;
1039                }
1040
1041                if let Some(this) = this.upgrade() {
1042                    this.update(&mut cx, |this, cx| {
1043                        for (_, buffer) in this.opened_buffers.drain() {
1044                            if let OpenedModelHandle::Open(buffer) = buffer {
1045                                if let Some(buffer) = buffer.upgrade() {
1046                                    buffer.update(cx, |buffer, cx| buffer.disconnect(cx));
1047                                }
1048                            }
1049                        }
1050                    })
1051                    .ok();
1052                }
1053            })
1054        });
1055    }
1056
1057    pub(crate) fn update_channels(
1058        &mut self,
1059        payload: proto::UpdateChannels,
1060        cx: &mut ModelContext<ChannelStore>,
1061    ) -> Option<Task<Result<()>>> {
1062        if !payload.remove_channel_invitations.is_empty() {
1063            self.channel_invitations
1064                .retain(|channel| !payload.remove_channel_invitations.contains(&channel.id.0));
1065        }
1066        for channel in payload.channel_invitations {
1067            match self
1068                .channel_invitations
1069                .binary_search_by_key(&channel.id, |c| c.id.0)
1070            {
1071                Ok(ix) => {
1072                    Arc::make_mut(&mut self.channel_invitations[ix]).name = channel.name.into()
1073                }
1074                Err(ix) => self.channel_invitations.insert(
1075                    ix,
1076                    Arc::new(Channel {
1077                        id: ChannelId(channel.id),
1078                        visibility: channel.visibility(),
1079                        name: channel.name.into(),
1080                        parent_path: channel
1081                            .parent_path
1082                            .into_iter()
1083                            .map(|cid| ChannelId(cid))
1084                            .collect(),
1085                    }),
1086                ),
1087            }
1088        }
1089
1090        let channels_changed = !payload.channels.is_empty()
1091            || !payload.delete_channels.is_empty()
1092            || !payload.latest_channel_message_ids.is_empty()
1093            || !payload.latest_channel_buffer_versions.is_empty()
1094            || !payload.hosted_projects.is_empty()
1095            || !payload.deleted_hosted_projects.is_empty();
1096
1097        if channels_changed {
1098            if !payload.delete_channels.is_empty() {
1099                let delete_channels: Vec<ChannelId> = payload
1100                    .delete_channels
1101                    .into_iter()
1102                    .map(|cid| ChannelId(cid))
1103                    .collect();
1104                self.channel_index.delete_channels(&delete_channels);
1105                self.channel_participants
1106                    .retain(|channel_id, _| !delete_channels.contains(&channel_id));
1107
1108                for channel_id in &delete_channels {
1109                    let channel_id = *channel_id;
1110                    if payload
1111                        .channels
1112                        .iter()
1113                        .any(|channel| channel.id == channel_id.0)
1114                    {
1115                        continue;
1116                    }
1117                    if let Some(OpenedModelHandle::Open(buffer)) =
1118                        self.opened_buffers.remove(&channel_id)
1119                    {
1120                        if let Some(buffer) = buffer.upgrade() {
1121                            buffer.update(cx, ChannelBuffer::disconnect);
1122                        }
1123                    }
1124                }
1125            }
1126
1127            let mut index = self.channel_index.bulk_insert();
1128            for channel in payload.channels {
1129                let id = ChannelId(channel.id);
1130                let channel_changed = index.insert(channel);
1131
1132                if channel_changed {
1133                    if let Some(OpenedModelHandle::Open(buffer)) = self.opened_buffers.get(&id) {
1134                        if let Some(buffer) = buffer.upgrade() {
1135                            buffer.update(cx, ChannelBuffer::channel_changed);
1136                        }
1137                    }
1138                }
1139            }
1140
1141            for latest_buffer_version in payload.latest_channel_buffer_versions {
1142                let version = language::proto::deserialize_version(&latest_buffer_version.version);
1143                self.channel_states
1144                    .entry(ChannelId(latest_buffer_version.channel_id))
1145                    .or_default()
1146                    .update_latest_notes_version(latest_buffer_version.epoch, &version)
1147            }
1148
1149            for latest_channel_message in payload.latest_channel_message_ids {
1150                self.channel_states
1151                    .entry(ChannelId(latest_channel_message.channel_id))
1152                    .or_default()
1153                    .update_latest_message_id(latest_channel_message.message_id);
1154            }
1155
1156            for hosted_project in payload.hosted_projects {
1157                let hosted_project: HostedProject = hosted_project.into();
1158                if let Some(old_project) = self
1159                    .hosted_projects
1160                    .insert(hosted_project.project_id, hosted_project.clone())
1161                {
1162                    self.channel_states
1163                        .entry(old_project.channel_id)
1164                        .or_default()
1165                        .remove_hosted_project(old_project.project_id);
1166                }
1167                self.channel_states
1168                    .entry(hosted_project.channel_id)
1169                    .or_default()
1170                    .add_hosted_project(hosted_project.project_id);
1171            }
1172
1173            for hosted_project_id in payload.deleted_hosted_projects {
1174                let hosted_project_id = ProjectId(hosted_project_id);
1175
1176                if let Some(old_project) = self.hosted_projects.remove(&hosted_project_id) {
1177                    self.channel_states
1178                        .entry(old_project.channel_id)
1179                        .or_default()
1180                        .remove_hosted_project(old_project.project_id);
1181                }
1182            }
1183        }
1184
1185        cx.notify();
1186        if payload.channel_participants.is_empty() {
1187            return None;
1188        }
1189
1190        let mut all_user_ids = Vec::new();
1191        let channel_participants = payload.channel_participants;
1192        for entry in &channel_participants {
1193            for user_id in entry.participant_user_ids.iter() {
1194                if let Err(ix) = all_user_ids.binary_search(user_id) {
1195                    all_user_ids.insert(ix, *user_id);
1196                }
1197            }
1198        }
1199
1200        let users = self
1201            .user_store
1202            .update(cx, |user_store, cx| user_store.get_users(all_user_ids, cx));
1203        Some(cx.spawn(|this, mut cx| async move {
1204            let users = users.await?;
1205
1206            this.update(&mut cx, |this, cx| {
1207                for entry in &channel_participants {
1208                    let mut participants: Vec<_> = entry
1209                        .participant_user_ids
1210                        .iter()
1211                        .filter_map(|user_id| {
1212                            users
1213                                .binary_search_by_key(&user_id, |user| &user.id)
1214                                .ok()
1215                                .map(|ix| users[ix].clone())
1216                        })
1217                        .collect();
1218
1219                    participants.sort_by_key(|u| u.id);
1220
1221                    this.channel_participants
1222                        .insert(ChannelId(entry.channel_id), participants);
1223                }
1224
1225                cx.notify();
1226            })
1227        }))
1228    }
1229}
1230
1231impl ChannelState {
1232    fn set_role(&mut self, role: ChannelRole) {
1233        self.role = Some(role);
1234    }
1235
1236    fn has_channel_buffer_changed(&self) -> bool {
1237        self.latest_notes_version.epoch > self.observed_notes_version.epoch
1238            || (self.latest_notes_version.epoch == self.observed_notes_version.epoch
1239                && self
1240                    .latest_notes_version
1241                    .version
1242                    .changed_since(&self.observed_notes_version.version))
1243    }
1244
1245    fn has_new_messages(&self) -> bool {
1246        let latest_message_id = self.latest_chat_message;
1247        let observed_message_id = self.observed_chat_message;
1248
1249        latest_message_id.is_some_and(|latest_message_id| {
1250            latest_message_id > observed_message_id.unwrap_or_default()
1251        })
1252    }
1253
1254    fn last_acknowledged_message_id(&self) -> Option<u64> {
1255        self.observed_chat_message
1256    }
1257
1258    fn acknowledge_message_id(&mut self, message_id: u64) {
1259        let observed = self.observed_chat_message.get_or_insert(message_id);
1260        *observed = (*observed).max(message_id);
1261    }
1262
1263    fn update_latest_message_id(&mut self, message_id: u64) {
1264        self.latest_chat_message =
1265            Some(message_id.max(self.latest_chat_message.unwrap_or_default()));
1266    }
1267
1268    fn acknowledge_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1269        if self.observed_notes_version.epoch == epoch {
1270            self.observed_notes_version.version.join(version);
1271        } else {
1272            self.observed_notes_version = NotesVersion {
1273                epoch,
1274                version: version.clone(),
1275            };
1276        }
1277    }
1278
1279    fn update_latest_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1280        if self.latest_notes_version.epoch == epoch {
1281            self.latest_notes_version.version.join(version);
1282        } else {
1283            self.latest_notes_version = NotesVersion {
1284                epoch,
1285                version: version.clone(),
1286            };
1287        }
1288    }
1289
1290    fn add_hosted_project(&mut self, project_id: ProjectId) {
1291        self.projects.insert(project_id);
1292    }
1293
1294    fn remove_hosted_project(&mut self, project_id: ProjectId) {
1295        self.projects.remove(&project_id);
1296    }
1297}