1mod channel_index;
2
3use crate::channel_buffer::ChannelBuffer;
4use anyhow::{Context as _, Result, anyhow};
5use channel_index::ChannelIndex;
6use client::{ChannelId, Client, ClientSettings, Subscription, User, UserId, UserStore};
7use collections::{HashMap, HashSet};
8use futures::{Future, FutureExt, StreamExt, channel::mpsc, future::Shared};
9use gpui::{
10 App, AppContext as _, AsyncApp, Context, Entity, EventEmitter, Global, SharedString, Task,
11 WeakEntity,
12};
13use language::Capability;
14use postage::{sink::Sink, watch};
15use rpc::{
16 TypedEnvelope,
17 proto::{self, ChannelRole, ChannelVisibility},
18};
19use settings::Settings;
20use std::{mem, sync::Arc, time::Duration};
21use util::{ResultExt, maybe};
22
/// How long `ChannelStore::handle_disconnect` waits for a reconnect before it
/// disconnects the open channel buffers.
pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
24
25pub fn init(client: &Arc<Client>, user_store: Entity<UserStore>, cx: &mut App) {
26 let channel_store = cx.new(|cx| ChannelStore::new(client.clone(), user_store.clone(), cx));
27 cx.set_global(GlobalChannelStore(channel_store));
28}
29
/// A version of a channel's notes, identified by an epoch plus a version
/// within that epoch.
#[derive(Debug, Clone, Default, PartialEq)]
struct NotesVersion {
    /// Epoch the version belongs to. `ChannelState` only joins versions whose
    /// epochs are equal; a different epoch replaces the stored value.
    epoch: u64,
    /// Version within `epoch`.
    version: clock::Global,
}
35
/// Client-side state for channels: the channel index, invitations,
/// participants, per-channel notes state, and open channel buffers.
pub struct ChannelStore {
    /// Index of all known channels, queryable by id and in display order.
    pub channel_index: ChannelIndex,
    /// Channels the server reported as invitations. `update_channels` keeps
    /// this sorted by channel id (it inserts via binary search).
    channel_invitations: Vec<Arc<Channel>>,
    /// Participants of each channel, as last reported by `UpdateChannels`.
    channel_participants: HashMap<ChannelId, Vec<Arc<User>>>,
    /// Notes versions and role, tracked per channel.
    channel_states: HashMap<ChannelId, ChannelState>,
    /// Channels marked as favorites, in the order they were added.
    favorite_channel_ids: Vec<ChannelId>,
    /// `(channel, user)` pairs with an invite / remove / set-role request
    /// currently in flight.
    outgoing_invites: HashSet<(ChannelId, UserId)>,
    /// Queue of incoming `UpdateChannels` messages, drained by `_update_channels`.
    update_channels_tx: mpsc::UnboundedSender<proto::UpdateChannels>,
    /// Channel buffers that are open or currently being opened.
    opened_buffers: HashMap<ChannelId, OpenEntityHandle<ChannelBuffer>>,
    /// RPC client used for all requests.
    client: Arc<Client>,
    /// Whether `SubscribeToChannels` was sent successfully; reset on disconnect.
    did_subscribe: bool,
    /// Watch channel set to `true` once a channel update has been applied.
    channels_loaded: (watch::Sender<bool>, watch::Receiver<bool>),
    /// Store used to resolve user ids into `User`s.
    user_store: Entity<UserStore>,
    /// Subscriptions returned when the message handlers were registered in `new`.
    _rpc_subscriptions: [Subscription; 2],
    /// Task that reacts to client connection status changes.
    _watch_connection_status: Task<Option<()>>,
    /// Pending task that disconnects open channel buffers after a disconnect;
    /// dropped again by `handle_connect`.
    disconnect_channel_buffers_task: Option<Task<()>>,
    /// Task that applies queued `UpdateChannels` messages one at a time.
    _update_channels: Task<()>,
}
54
/// A single channel as known to the client.
#[derive(Clone, Debug)]
pub struct Channel {
    /// Unique id of the channel.
    pub id: ChannelId,
    /// Display name of the channel.
    pub name: SharedString,
    /// Visibility of the channel as reported by the server.
    pub visibility: proto::ChannelVisibility,
    /// Ids of this channel's ancestors; empty for a root channel. The first
    /// entry is treated as the root (see `root_id`).
    pub parent_path: Vec<ChannelId>,
    /// Ordering value reported by the server.
    /// NOTE(review): presumably the position among sibling channels — confirm.
    pub channel_order: i32,
}
63
/// Per-channel state tracked by the store: notes versions and the user's role.
#[derive(Default, Debug)]
pub struct ChannelState {
    /// Most recent notes version reported for the channel.
    latest_notes_version: NotesVersion,
    /// Notes version that has been acknowledged as observed.
    observed_notes_version: NotesVersion,
    /// Role in this channel, if one has been received.
    role: Option<ChannelRole>,
}
70
71impl Channel {
72 pub fn link(&self, cx: &App) -> String {
73 format!(
74 "{}/channel/{}-{}",
75 ClientSettings::get_global(cx).server_url,
76 Self::slug(&self.name),
77 self.id
78 )
79 }
80
81 pub fn notes_link(&self, heading: Option<String>, cx: &App) -> String {
82 self.link(cx)
83 + "/notes"
84 + &heading
85 .map(|h| format!("#{}", Self::slug(&h)))
86 .unwrap_or_default()
87 }
88
89 pub fn is_root_channel(&self) -> bool {
90 self.parent_path.is_empty()
91 }
92
93 pub fn root_id(&self) -> ChannelId {
94 self.parent_path.first().copied().unwrap_or(self.id)
95 }
96
97 pub fn slug(str: &str) -> String {
98 let slug: String = str
99 .chars()
100 .map(|c| if c.is_alphanumeric() { c } else { '-' })
101 .collect();
102
103 slug.trim_matches(|c| c == '-').to_string()
104 }
105}
106
/// A user's membership in a channel, as returned by `fuzzy_search_members`.
#[derive(Debug)]
pub struct ChannelMembership {
    /// The user this membership belongs to.
    pub user: Arc<User>,
    /// Whether the user is a member or an invitee.
    pub kind: proto::channel_member::Kind,
    /// The user's role in the channel.
    pub role: proto::ChannelRole,
}
113impl ChannelMembership {
114 pub fn sort_key(&self) -> MembershipSortKey<'_> {
115 MembershipSortKey {
116 role_order: match self.role {
117 proto::ChannelRole::Admin => 0,
118 proto::ChannelRole::Member => 1,
119 proto::ChannelRole::Banned => 2,
120 proto::ChannelRole::Talker => 3,
121 proto::ChannelRole::Guest => 4,
122 },
123 kind_order: match self.kind {
124 proto::channel_member::Kind::Member => 0,
125 proto::channel_member::Kind::Invitee => 1,
126 },
127 username_order: &self.user.github_login,
128 }
129 }
130}
131
/// Sort key produced by `ChannelMembership::sort_key`.
///
/// The derived ordering compares fields in declaration order, so the field
/// order below is significant: role first, then kind, then username.
#[derive(PartialOrd, Ord, PartialEq, Eq)]
pub struct MembershipSortKey<'a> {
    /// Rank of the member's role; lower sorts first.
    role_order: u8,
    /// Rank of the membership kind; lower sorts first.
    kind_order: u8,
    /// GitHub login, used as the final tie-breaker.
    username_order: &'a str,
}
138
/// Events emitted by `ChannelStore`.
pub enum ChannelEvent {
    /// Emitted by `create_channel` once the new channel has been added to the store.
    ChannelCreated(ChannelId),
    /// Emitted by `rename` once the renamed channel has been applied to the store.
    ChannelRenamed(ChannelId),
}
143
// Allows `ChannelStore` to emit `ChannelEvent`s via `cx.emit`.
impl EventEmitter<ChannelEvent> for ChannelStore {}
145
/// Tracks a channel resource that is either already open or still loading.
enum OpenEntityHandle<E> {
    /// The resource has been opened. Only a weak handle is kept, so `upgrade`
    /// may fail once every strong handle has been dropped.
    Open(WeakEntity<E>),
    /// The resource is still loading. The task is shared so that several
    /// callers can await the same load.
    Loading(Shared<Task<Result<Entity<E>, Arc<anyhow::Error>>>>),
}
150
/// Wrapper that stores the `ChannelStore` entity as an app global
/// (set in `init`, read by `ChannelStore::global` / `try_global`).
struct GlobalChannelStore(Entity<ChannelStore>);

impl Global for GlobalChannelStore {}
154
155impl ChannelStore {
156 pub fn global(cx: &App) -> Entity<Self> {
157 cx.global::<GlobalChannelStore>().0.clone()
158 }
159
160 pub fn try_global(cx: &App) -> Option<Entity<Self>> {
161 cx.try_global::<GlobalChannelStore>().map(|g| g.0.clone())
162 }
163
164 pub fn favorite_channel_ids(&self) -> &[ChannelId] {
165 &self.favorite_channel_ids
166 }
167
168 pub fn is_channel_favorited(&self, channel_id: ChannelId) -> bool {
169 self.favorite_channel_ids.contains(&channel_id)
170 }
171
172 pub fn toggle_favorite_channel(&mut self, channel_id: ChannelId, cx: &mut Context<Self>) {
173 if let Some(ix) = self
174 .favorite_channel_ids
175 .iter()
176 .position(|id| *id == channel_id)
177 {
178 self.favorite_channel_ids.remove(ix);
179 } else {
180 self.favorite_channel_ids.push(channel_id);
181 }
182 cx.notify();
183 }
184
185 pub fn set_favorite_channel_ids(&mut self, ids: Vec<ChannelId>, cx: &mut Context<Self>) {
186 self.favorite_channel_ids = ids;
187 cx.notify();
188 }
189
190 pub fn new(client: Arc<Client>, user_store: Entity<UserStore>, cx: &mut Context<Self>) -> Self {
191 let rpc_subscriptions = [
192 client.add_message_handler(cx.weak_entity(), Self::handle_update_channels),
193 client.add_message_handler(cx.weak_entity(), Self::handle_update_user_channels),
194 ];
195
196 let mut connection_status = client.status();
197 let (update_channels_tx, mut update_channels_rx) = mpsc::unbounded();
198 let watch_connection_status = cx.spawn(async move |this, cx| {
199 while let Some(status) = connection_status.next().await {
200 let this = this.upgrade()?;
201 match status {
202 client::Status::Connected { .. } => {
203 this.update(cx, |this, cx| this.handle_connect(cx))
204 .await
205 .log_err()?;
206 }
207 client::Status::SignedOut | client::Status::UpgradeRequired => {
208 this.update(cx, |this, cx| this.handle_disconnect(false, cx));
209 }
210 _ => {
211 this.update(cx, |this, cx| this.handle_disconnect(true, cx));
212 }
213 }
214 }
215 Some(())
216 });
217
218 Self {
219 channel_invitations: Vec::default(),
220 channel_index: ChannelIndex::default(),
221 channel_participants: Default::default(),
222 outgoing_invites: Default::default(),
223 opened_buffers: Default::default(),
224 update_channels_tx,
225 client,
226 user_store,
227 _rpc_subscriptions: rpc_subscriptions,
228 _watch_connection_status: watch_connection_status,
229 disconnect_channel_buffers_task: None,
230 _update_channels: cx.spawn(async move |this, cx| {
231 maybe!(async move {
232 while let Some(update_channels) = update_channels_rx.next().await {
233 if let Some(this) = this.upgrade() {
234 let update_task = this
235 .update(cx, |this, cx| this.update_channels(update_channels, cx));
236 if let Some(update_task) = update_task {
237 update_task.await.log_err();
238 }
239 }
240 }
241 anyhow::Ok(())
242 })
243 .await
244 .log_err();
245 }),
246 channel_states: Default::default(),
247 favorite_channel_ids: Vec::default(),
248 did_subscribe: false,
249 channels_loaded: watch::channel_with(false),
250 }
251 }
252
253 pub fn initialize(&mut self) {
254 if !self.did_subscribe
255 && self
256 .client
257 .send(proto::SubscribeToChannels {})
258 .log_err()
259 .is_some()
260 {
261 self.did_subscribe = true;
262 }
263 }
264
265 pub fn wait_for_channels(
266 &mut self,
267 timeout: Duration,
268 cx: &mut Context<Self>,
269 ) -> Task<Result<()>> {
270 let mut channels_loaded_rx = self.channels_loaded.1.clone();
271 if *channels_loaded_rx.borrow() {
272 return Task::ready(Ok(()));
273 }
274
275 let mut status_receiver = self.client.status();
276 if status_receiver.borrow().is_connected() {
277 self.initialize();
278 }
279
280 let mut timer = cx.background_executor().timer(timeout).fuse();
281 cx.spawn(async move |this, cx| {
282 loop {
283 futures::select_biased! {
284 channels_loaded = channels_loaded_rx.next().fuse() => {
285 if let Some(true) = channels_loaded {
286 return Ok(());
287 }
288 }
289 status = status_receiver.next().fuse() => {
290 if let Some(status) = status
291 && status.is_connected() {
292 this.update(cx, |this, _cx| {
293 this.initialize();
294 }).ok();
295 }
296 continue;
297 }
298 _ = timer => {
299 return Err(anyhow!("{:?} elapsed without receiving channels", timeout));
300 }
301 }
302 }
303 })
304 }
305
306 pub fn client(&self) -> Arc<Client> {
307 self.client.clone()
308 }
309
310 /// Returns the number of unique channels in the store
311 pub fn channel_count(&self) -> usize {
312 self.channel_index.by_id().len()
313 }
314
315 /// Returns the index of a channel ID in the list of unique channels
316 pub fn index_of_channel(&self, channel_id: ChannelId) -> Option<usize> {
317 self.channel_index
318 .by_id()
319 .keys()
320 .position(|id| *id == channel_id)
321 }
322
323 /// Returns an iterator over all unique channels
324 pub fn channels(&self) -> impl '_ + Iterator<Item = &Arc<Channel>> {
325 self.channel_index.by_id().values()
326 }
327
328 /// Iterate over all entries in the channel DAG
329 pub fn ordered_channels(&self) -> impl '_ + Iterator<Item = (usize, &Arc<Channel>)> {
330 self.channel_index
331 .ordered_channels()
332 .iter()
333 .filter_map(move |id| {
334 let channel = self.channel_index.by_id().get(id)?;
335 Some((channel.parent_path.len(), channel))
336 })
337 }
338
339 pub fn channel_at_index(&self, ix: usize) -> Option<&Arc<Channel>> {
340 let channel_id = self.channel_index.ordered_channels().get(ix)?;
341 self.channel_index.by_id().get(channel_id)
342 }
343
344 pub fn channel_at(&self, ix: usize) -> Option<&Arc<Channel>> {
345 self.channel_index.by_id().values().nth(ix)
346 }
347
348 pub fn has_channel_invitation(&self, channel_id: ChannelId) -> bool {
349 self.channel_invitations
350 .iter()
351 .any(|channel| channel.id == channel_id)
352 }
353
354 pub fn channel_invitations(&self) -> &[Arc<Channel>] {
355 &self.channel_invitations
356 }
357
358 pub fn channel_for_id(&self, channel_id: ChannelId) -> Option<&Arc<Channel>> {
359 self.channel_index.by_id().get(&channel_id)
360 }
361
362 pub fn has_open_channel_buffer(&self, channel_id: ChannelId, _cx: &App) -> bool {
363 if let Some(buffer) = self.opened_buffers.get(&channel_id)
364 && let OpenEntityHandle::Open(buffer) = buffer
365 {
366 return buffer.upgrade().is_some();
367 }
368 false
369 }
370
371 pub fn open_channel_buffer(
372 &mut self,
373 channel_id: ChannelId,
374 cx: &mut Context<Self>,
375 ) -> Task<Result<Entity<ChannelBuffer>>> {
376 let client = self.client.clone();
377 let user_store = self.user_store.clone();
378 let channel_store = cx.entity();
379 self.open_channel_resource(
380 channel_id,
381 "notes",
382 |this| &mut this.opened_buffers,
383 async move |channel, cx| {
384 ChannelBuffer::new(channel, client, user_store, channel_store, cx).await
385 },
386 cx,
387 )
388 }
389
390 pub fn has_channel_buffer_changed(&self, channel_id: ChannelId) -> bool {
391 self.channel_states
392 .get(&channel_id)
393 .is_some_and(|state| state.has_channel_buffer_changed())
394 }
395
396 pub fn acknowledge_notes_version(
397 &mut self,
398 channel_id: ChannelId,
399 epoch: u64,
400 version: &clock::Global,
401 cx: &mut Context<Self>,
402 ) {
403 self.channel_states
404 .entry(channel_id)
405 .or_default()
406 .acknowledge_notes_version(epoch, version);
407 cx.notify()
408 }
409
410 pub fn update_latest_notes_version(
411 &mut self,
412 channel_id: ChannelId,
413 epoch: u64,
414 version: &clock::Global,
415 cx: &mut Context<Self>,
416 ) {
417 self.channel_states
418 .entry(channel_id)
419 .or_default()
420 .update_latest_notes_version(epoch, version);
421 cx.notify()
422 }
423
    /// Asynchronously open a given resource associated with a channel.
    ///
    /// Make sure that the resource is only opened once, even if this method
    /// is called multiple times with the same channel id while the first task
    /// is still running.
    ///
    /// * `resource_name` is only used in the error context
    ///   ("failed to open channel {resource_name}").
    /// * `get_map` selects the map of open handles for this resource type.
    /// * `load` performs the actual open once the channel is known.
    ///
    /// The returned task fails if channels do not load within 10 seconds, if
    /// the channel id is unknown, if the store is dropped, or if `load` fails.
    fn open_channel_resource<T, F>(
        &mut self,
        channel_id: ChannelId,
        resource_name: &'static str,
        get_map: fn(&mut Self) -> &mut HashMap<ChannelId, OpenEntityHandle<T>>,
        load: F,
        cx: &mut Context<Self>,
    ) -> Task<Result<Entity<T>>>
    where
        F: AsyncFnOnce(Arc<Channel>, &mut AsyncApp) -> Result<Entity<T>> + 'static,
        T: 'static,
    {
        // The loop runs at most twice: a stale `Open` entry is removed and the
        // lookup is retried, after which the entry is absent and a load starts.
        let task = loop {
            match get_map(self).get(&channel_id) {
                Some(OpenEntityHandle::Open(entity)) => {
                    if let Some(entity) = entity.upgrade() {
                        // Already open and alive: resolve immediately.
                        break Task::ready(Ok(entity)).shared();
                    } else {
                        // The entity was dropped; forget the stale handle.
                        get_map(self).remove(&channel_id);
                        continue;
                    }
                }
                // A load is already in flight: await the same shared task.
                Some(OpenEntityHandle::Loading(task)) => break task.clone(),
                None => {}
            }

            let channels_ready = self.wait_for_channels(Duration::from_secs(10), cx);
            let task = cx
                .spawn(async move |this, cx| {
                    channels_ready.await?;
                    // First `?`: the store was dropped. Second `?`: unknown channel id.
                    let channel = this.read_with(cx, |this, _| {
                        this.channel_for_id(channel_id)
                            .cloned()
                            .ok_or_else(|| Arc::new(anyhow!("no channel for id: {channel_id}")))
                    })??;

                    // Errors are wrapped in `Arc` so the result is cloneable, which
                    // a shared task requires.
                    load(channel, cx).await.map_err(Arc::new)
                })
                .shared();

            // Publish the in-flight load so concurrent callers can join it.
            get_map(self).insert(channel_id, OpenEntityHandle::Loading(task.clone()));
            let task = cx.spawn({
                async move |this, cx| {
                    let result = task.await;
                    // Replace the `Loading` entry with the outcome: a weak handle on
                    // success, or nothing on failure so a later call can retry.
                    this.update(cx, |this, _| match &result {
                        Ok(model) => {
                            get_map(this)
                                .insert(channel_id, OpenEntityHandle::Open(model.downgrade()));
                        }
                        Err(_) => {
                            get_map(this).remove(&channel_id);
                        }
                    })?;
                    result
                }
            });
            break task.shared();
        };
        // Convert the shared `Arc` error back into an owned `anyhow::Error`,
        // adding context that names the resource.
        cx.background_spawn(async move {
            task.await.map_err(|error| {
                anyhow!("{error}").context(format!("failed to open channel {resource_name}"))
            })
        })
    }
493
494 pub fn is_channel_admin(&self, channel_id: ChannelId) -> bool {
495 self.channel_role(channel_id) == proto::ChannelRole::Admin
496 }
497
498 pub fn is_root_channel(&self, channel_id: ChannelId) -> bool {
499 self.channel_index
500 .by_id()
501 .get(&channel_id)
502 .is_some_and(|channel| channel.is_root_channel())
503 }
504
505 pub fn is_public_channel(&self, channel_id: ChannelId) -> bool {
506 self.channel_index
507 .by_id()
508 .get(&channel_id)
509 .is_some_and(|channel| channel.visibility == ChannelVisibility::Public)
510 }
511
512 pub fn channel_capability(&self, channel_id: ChannelId) -> Capability {
513 match self.channel_role(channel_id) {
514 ChannelRole::Admin | ChannelRole::Member => Capability::ReadWrite,
515 _ => Capability::ReadOnly,
516 }
517 }
518
519 pub fn channel_role(&self, channel_id: ChannelId) -> proto::ChannelRole {
520 maybe!({
521 let mut channel = self.channel_for_id(channel_id)?;
522 if !channel.is_root_channel() {
523 channel = self.channel_for_id(channel.root_id())?;
524 }
525 let root_channel_state = self.channel_states.get(&channel.id);
526 root_channel_state?.role
527 })
528 .unwrap_or(proto::ChannelRole::Guest)
529 }
530
531 pub fn channel_participants(&self, channel_id: ChannelId) -> &[Arc<User>] {
532 self.channel_participants
533 .get(&channel_id)
534 .map_or(&[], |v| v.as_slice())
535 }
536
537 pub fn create_channel(
538 &self,
539 name: &str,
540 parent_id: Option<ChannelId>,
541 cx: &mut Context<Self>,
542 ) -> Task<Result<ChannelId>> {
543 let client = self.client.clone();
544 let name = name.trim_start_matches('#').to_owned();
545 cx.spawn(async move |this, cx| {
546 let response = client
547 .request(proto::CreateChannel {
548 name,
549 parent_id: parent_id.map(|cid| cid.0),
550 })
551 .await?;
552
553 let channel = response.channel.context("missing channel in response")?;
554 let channel_id = ChannelId(channel.id);
555
556 this.update(cx, |this, cx| {
557 let task = this.update_channels(
558 proto::UpdateChannels {
559 channels: vec![channel],
560 ..Default::default()
561 },
562 cx,
563 );
564 assert!(task.is_none());
565
566 // This event is emitted because the collab panel wants to clear the pending edit state
567 // before this frame is rendered. But we can't guarantee that the collab panel's future
568 // will resolve before this flush_effects finishes. Synchronously emitting this event
569 // ensures that the collab panel will observe this creation before the frame completes
570 cx.emit(ChannelEvent::ChannelCreated(channel_id));
571 })?;
572
573 Ok(channel_id)
574 })
575 }
576
577 pub fn move_channel(
578 &mut self,
579 channel_id: ChannelId,
580 to: ChannelId,
581 cx: &mut Context<Self>,
582 ) -> Task<Result<()>> {
583 let client = self.client.clone();
584 cx.spawn(async move |_, _| {
585 let _ = client
586 .request(proto::MoveChannel {
587 channel_id: channel_id.0,
588 to: to.0,
589 })
590 .await?;
591 Ok(())
592 })
593 }
594
595 pub fn reorder_channel(
596 &mut self,
597 channel_id: ChannelId,
598 direction: proto::reorder_channel::Direction,
599 cx: &mut Context<Self>,
600 ) -> Task<Result<()>> {
601 let client = self.client.clone();
602 cx.spawn(async move |_, _| {
603 client
604 .request(proto::ReorderChannel {
605 channel_id: channel_id.0,
606 direction: direction.into(),
607 })
608 .await?;
609 Ok(())
610 })
611 }
612
613 pub fn set_channel_visibility(
614 &mut self,
615 channel_id: ChannelId,
616 visibility: ChannelVisibility,
617 cx: &mut Context<Self>,
618 ) -> Task<Result<()>> {
619 let client = self.client.clone();
620 cx.spawn(async move |_, _| {
621 let _ = client
622 .request(proto::SetChannelVisibility {
623 channel_id: channel_id.0,
624 visibility: visibility.into(),
625 })
626 .await?;
627
628 Ok(())
629 })
630 }
631
632 pub fn invite_member(
633 &mut self,
634 channel_id: ChannelId,
635 user_id: UserId,
636 role: proto::ChannelRole,
637 cx: &mut Context<Self>,
638 ) -> Task<Result<()>> {
639 if !self.outgoing_invites.insert((channel_id, user_id)) {
640 return Task::ready(Err(anyhow!("invite request already in progress")));
641 }
642
643 cx.notify();
644 let client = self.client.clone();
645 cx.spawn(async move |this, cx| {
646 let result = client
647 .request(proto::InviteChannelMember {
648 channel_id: channel_id.0,
649 user_id,
650 role: role.into(),
651 })
652 .await;
653
654 this.update(cx, |this, cx| {
655 this.outgoing_invites.remove(&(channel_id, user_id));
656 cx.notify();
657 })?;
658
659 result?;
660
661 Ok(())
662 })
663 }
664
665 pub fn remove_member(
666 &mut self,
667 channel_id: ChannelId,
668 user_id: u64,
669 cx: &mut Context<Self>,
670 ) -> Task<Result<()>> {
671 if !self.outgoing_invites.insert((channel_id, user_id)) {
672 return Task::ready(Err(anyhow!("invite request already in progress")));
673 }
674
675 cx.notify();
676 let client = self.client.clone();
677 cx.spawn(async move |this, cx| {
678 let result = client
679 .request(proto::RemoveChannelMember {
680 channel_id: channel_id.0,
681 user_id,
682 })
683 .await;
684
685 this.update(cx, |this, cx| {
686 this.outgoing_invites.remove(&(channel_id, user_id));
687 cx.notify();
688 })?;
689 result?;
690 Ok(())
691 })
692 }
693
694 pub fn set_member_role(
695 &mut self,
696 channel_id: ChannelId,
697 user_id: UserId,
698 role: proto::ChannelRole,
699 cx: &mut Context<Self>,
700 ) -> Task<Result<()>> {
701 if !self.outgoing_invites.insert((channel_id, user_id)) {
702 return Task::ready(Err(anyhow!("member request already in progress")));
703 }
704
705 cx.notify();
706 let client = self.client.clone();
707 cx.spawn(async move |this, cx| {
708 let result = client
709 .request(proto::SetChannelMemberRole {
710 channel_id: channel_id.0,
711 user_id,
712 role: role.into(),
713 })
714 .await;
715
716 this.update(cx, |this, cx| {
717 this.outgoing_invites.remove(&(channel_id, user_id));
718 cx.notify();
719 })?;
720
721 result?;
722 Ok(())
723 })
724 }
725
726 pub fn rename(
727 &mut self,
728 channel_id: ChannelId,
729 new_name: &str,
730 cx: &mut Context<Self>,
731 ) -> Task<Result<()>> {
732 let client = self.client.clone();
733 let name = new_name.to_string();
734 cx.spawn(async move |this, cx| {
735 let channel = client
736 .request(proto::RenameChannel {
737 channel_id: channel_id.0,
738 name,
739 })
740 .await?
741 .channel
742 .context("missing channel in response")?;
743 this.update(cx, |this, cx| {
744 let task = this.update_channels(
745 proto::UpdateChannels {
746 channels: vec![channel],
747 ..Default::default()
748 },
749 cx,
750 );
751 assert!(task.is_none());
752
753 // This event is emitted because the collab panel wants to clear the pending edit state
754 // before this frame is rendered. But we can't guarantee that the collab panel's future
755 // will resolve before this flush_effects finishes. Synchronously emitting this event
756 // ensures that the collab panel will observe this creation before the frame complete
757 cx.emit(ChannelEvent::ChannelRenamed(channel_id))
758 })?;
759 Ok(())
760 })
761 }
762
763 pub fn respond_to_channel_invite(
764 &mut self,
765 channel_id: ChannelId,
766 accept: bool,
767 cx: &mut Context<Self>,
768 ) -> Task<Result<()>> {
769 let client = self.client.clone();
770 cx.background_spawn(async move {
771 client
772 .request(proto::RespondToChannelInvite {
773 channel_id: channel_id.0,
774 accept,
775 })
776 .await?;
777 Ok(())
778 })
779 }
780 pub fn fuzzy_search_members(
781 &self,
782 channel_id: ChannelId,
783 query: String,
784 limit: u16,
785 cx: &mut Context<Self>,
786 ) -> Task<Result<Vec<ChannelMembership>>> {
787 let client = self.client.clone();
788 let user_store = self.user_store.downgrade();
789 cx.spawn(async move |_, cx| {
790 let response = client
791 .request(proto::GetChannelMembers {
792 channel_id: channel_id.0,
793 query,
794 limit: limit as u64,
795 })
796 .await?;
797 user_store.update(cx, |user_store, _| {
798 user_store.insert(response.users);
799 response
800 .members
801 .into_iter()
802 .filter_map(|member| {
803 Some(ChannelMembership {
804 user: user_store.get_cached_user(member.user_id)?,
805 role: member.role(),
806 kind: member.kind(),
807 })
808 })
809 .collect()
810 })
811 })
812 }
813
814 pub fn remove_channel(
815 &self,
816 channel_id: ChannelId,
817 ) -> impl Future<Output = Result<()>> + use<> {
818 let client = self.client.clone();
819 async move {
820 client
821 .request(proto::DeleteChannel {
822 channel_id: channel_id.0,
823 })
824 .await?;
825 Ok(())
826 }
827 }
828
829 pub fn has_pending_channel_invite_response(&self, _: &Arc<Channel>) -> bool {
830 false
831 }
832
833 pub fn has_pending_channel_invite(&self, channel_id: ChannelId, user_id: UserId) -> bool {
834 self.outgoing_invites.contains(&(channel_id, user_id))
835 }
836
837 async fn handle_update_channels(
838 this: Entity<Self>,
839 message: TypedEnvelope<proto::UpdateChannels>,
840 cx: AsyncApp,
841 ) -> Result<()> {
842 this.read_with(&cx, |this, _| {
843 this.update_channels_tx
844 .unbounded_send(message.payload)
845 .unwrap();
846 });
847 Ok(())
848 }
849
850 async fn handle_update_user_channels(
851 this: Entity<Self>,
852 message: TypedEnvelope<proto::UpdateUserChannels>,
853 mut cx: AsyncApp,
854 ) -> Result<()> {
855 this.update(&mut cx, |this, cx| {
856 for buffer_version in message.payload.observed_channel_buffer_version {
857 let version = language::proto::deserialize_version(&buffer_version.version);
858 this.acknowledge_notes_version(
859 ChannelId(buffer_version.channel_id),
860 buffer_version.epoch,
861 &version,
862 cx,
863 );
864 }
865 for membership in message.payload.channel_memberships {
866 if let Some(role) = ChannelRole::from_i32(membership.role) {
867 this.channel_states
868 .entry(ChannelId(membership.channel_id))
869 .or_default()
870 .set_role(role)
871 }
872 }
873 });
874 Ok(())
875 }
876
877 fn handle_connect(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
878 self.channel_index.clear();
879 self.channel_invitations.clear();
880 self.channel_participants.clear();
881 self.channel_index.clear();
882 self.outgoing_invites.clear();
883 self.disconnect_channel_buffers_task.take();
884
885 let mut buffer_versions = Vec::new();
886 for buffer in self.opened_buffers.values() {
887 if let OpenEntityHandle::Open(buffer) = buffer
888 && let Some(buffer) = buffer.upgrade()
889 {
890 buffer.update(cx, |channel_buffer, cx| {
891 // Block on_buffer_update from sending UpdateChannelBuffer messages
892 // until the rejoin completes. This prevents a race condition where
893 // edits made during the rejoin async gap could inflate the server
894 // version, causing offline edits to be filtered out by serialize_ops.
895 channel_buffer.set_rejoining(true);
896 let inner_buffer = channel_buffer.buffer().read(cx);
897 buffer_versions.push(proto::ChannelBufferVersion {
898 channel_id: channel_buffer.channel_id.0,
899 epoch: channel_buffer.epoch(),
900 version: language::proto::serialize_version(&inner_buffer.version()),
901 });
902 });
903 }
904 }
905
906 if buffer_versions.is_empty() {
907 return Task::ready(Ok(()));
908 }
909
910 let response = self.client.request(proto::RejoinChannelBuffers {
911 buffers: buffer_versions,
912 });
913
914 cx.spawn(async move |this, cx| {
915 let response = match response.await {
916 Ok(response) => response,
917 Err(err) => {
918 // Clear rejoining flag on all buffers since the rejoin failed
919 this.update(cx, |this, cx| {
920 for buffer in this.opened_buffers.values() {
921 if let OpenEntityHandle::Open(buffer) = buffer {
922 if let Some(buffer) = buffer.upgrade() {
923 buffer.update(cx, |channel_buffer, _| {
924 channel_buffer.set_rejoining(false);
925 });
926 }
927 }
928 }
929 })
930 .ok();
931 return Err(err);
932 }
933 };
934 let mut response = response;
935
936 this.update(cx, |this, cx| {
937 this.opened_buffers.retain(|_, buffer| match buffer {
938 OpenEntityHandle::Open(channel_buffer) => {
939 let Some(channel_buffer) = channel_buffer.upgrade() else {
940 return false;
941 };
942
943 channel_buffer.update(cx, |channel_buffer, cx| {
944 let channel_id = channel_buffer.channel_id;
945 if let Some(remote_buffer) = response
946 .buffers
947 .iter_mut()
948 .find(|buffer| buffer.channel_id == channel_id.0)
949 {
950 let channel_id = channel_buffer.channel_id;
951 let remote_version =
952 language::proto::deserialize_version(&remote_buffer.version);
953
954 channel_buffer.replace_collaborators(
955 mem::take(&mut remote_buffer.collaborators),
956 cx,
957 );
958
959 let operations = channel_buffer
960 .buffer()
961 .update(cx, |buffer, cx| {
962 let outgoing_operations =
963 buffer.serialize_ops(Some(remote_version), cx);
964 let incoming_operations =
965 mem::take(&mut remote_buffer.operations)
966 .into_iter()
967 .map(language::proto::deserialize_operation)
968 .collect::<Result<Vec<_>>>()?;
969 buffer.apply_ops(incoming_operations, cx);
970 anyhow::Ok(outgoing_operations)
971 })
972 .log_err();
973
974 if let Some(operations) = operations {
975 channel_buffer.connected(cx);
976 let client = this.client.clone();
977 cx.background_spawn(async move {
978 let operations = operations.await;
979 for chunk in language::proto::split_operations(operations) {
980 client
981 .send(proto::UpdateChannelBuffer {
982 channel_id: channel_id.0,
983 operations: chunk,
984 })
985 .ok();
986 }
987 })
988 .detach();
989 return true;
990 }
991 }
992
993 channel_buffer.disconnect(cx);
994 false
995 })
996 }
997 OpenEntityHandle::Loading(_) => true,
998 });
999 })
1000 .ok();
1001 anyhow::Ok(())
1002 })
1003 }
1004
1005 fn handle_disconnect(&mut self, wait_for_reconnect: bool, cx: &mut Context<Self>) {
1006 cx.notify();
1007 self.did_subscribe = false;
1008
1009 // If we're waiting for reconnect, set rejoining=true on all buffers immediately.
1010 // This prevents operations from being sent during the reconnection window,
1011 // before handle_connect has a chance to run and capture the version.
1012 if wait_for_reconnect {
1013 for buffer in self.opened_buffers.values() {
1014 if let OpenEntityHandle::Open(buffer) = buffer {
1015 if let Some(buffer) = buffer.upgrade() {
1016 buffer.update(cx, |channel_buffer, _| {
1017 channel_buffer.set_rejoining(true);
1018 });
1019 }
1020 }
1021 }
1022 }
1023
1024 self.disconnect_channel_buffers_task.get_or_insert_with(|| {
1025 cx.spawn(async move |this, cx| {
1026 if wait_for_reconnect {
1027 cx.background_executor().timer(RECONNECT_TIMEOUT).await;
1028 }
1029
1030 if let Some(this) = this.upgrade() {
1031 this.update(cx, |this, cx| {
1032 for buffer in this.opened_buffers.values() {
1033 if let OpenEntityHandle::Open(buffer) = &buffer
1034 && let Some(buffer) = buffer.upgrade()
1035 {
1036 buffer.update(cx, |buffer, cx| buffer.disconnect(cx));
1037 }
1038 }
1039 });
1040 }
1041 })
1042 });
1043 }
1044
1045 #[cfg(any(test, feature = "test-support"))]
1046 pub fn reset(&mut self) {
1047 self.channel_invitations.clear();
1048 self.channel_index.clear();
1049 self.channel_participants.clear();
1050 self.outgoing_invites.clear();
1051 self.opened_buffers.clear();
1052 self.disconnect_channel_buffers_task = None;
1053 self.channel_states.clear();
1054 }
1055
    /// Applies an `UpdateChannels` payload to the store.
    ///
    /// Invitations, channel deletions/insertions and latest notes versions are
    /// applied synchronously. Returns `None` when the payload carries no
    /// participants; otherwise returns a task that fetches the participating
    /// users and then stores them per channel.
    pub(crate) fn update_channels(
        &mut self,
        payload: proto::UpdateChannels,
        cx: &mut Context<ChannelStore>,
    ) -> Option<Task<Result<()>>> {
        if !payload.remove_channel_invitations.is_empty() {
            self.channel_invitations
                .retain(|channel| !payload.remove_channel_invitations.contains(&channel.id.0));
        }
        // Invitations are kept sorted by id: existing ones only get their name
        // updated, new ones are inserted at their sorted position.
        for channel in payload.channel_invitations {
            match self
                .channel_invitations
                .binary_search_by_key(&channel.id, |c| c.id.0)
            {
                Ok(ix) => {
                    Arc::make_mut(&mut self.channel_invitations[ix]).name = channel.name.into()
                }
                Err(ix) => self.channel_invitations.insert(
                    ix,
                    Arc::new(Channel {
                        id: ChannelId(channel.id),
                        visibility: channel.visibility(),
                        name: channel.name.into(),
                        parent_path: channel.parent_path.into_iter().map(ChannelId).collect(),
                        channel_order: channel.channel_order,
                    }),
                ),
            }
        }

        let channels_changed = !payload.channels.is_empty()
            || !payload.delete_channels.is_empty()
            || !payload.latest_channel_buffer_versions.is_empty();

        if channels_changed {
            if !payload.delete_channels.is_empty() {
                let delete_channels: Vec<ChannelId> =
                    payload.delete_channels.into_iter().map(ChannelId).collect();
                self.channel_index.delete_channels(&delete_channels);
                self.channel_participants
                    .retain(|channel_id, _| !delete_channels.contains(channel_id));
                self.favorite_channel_ids
                    .retain(|channel_id| !delete_channels.contains(channel_id));

                for channel_id in &delete_channels {
                    let channel_id = *channel_id;
                    // A channel that is deleted and re-added by the same payload
                    // keeps its open buffer.
                    if payload
                        .channels
                        .iter()
                        .any(|channel| channel.id == channel_id.0)
                    {
                        continue;
                    }
                    if let Some(OpenEntityHandle::Open(buffer)) =
                        self.opened_buffers.remove(&channel_id)
                        && let Some(buffer) = buffer.upgrade()
                    {
                        buffer.update(cx, ChannelBuffer::disconnect);
                    }
                }
            }

            // NOTE(review): `bulk_insert` presumably defers re-sorting until the
            // returned guard is dropped — confirm in `channel_index`.
            let mut index = self.channel_index.bulk_insert();
            for channel in payload.channels {
                let id = ChannelId(channel.id);
                let channel_changed = index.insert(channel);

                // Let an open buffer for this channel know the channel changed.
                if channel_changed
                    && let Some(OpenEntityHandle::Open(buffer)) = self.opened_buffers.get(&id)
                    && let Some(buffer) = buffer.upgrade()
                {
                    buffer.update(cx, ChannelBuffer::channel_changed);
                }
            }

            for latest_buffer_version in payload.latest_channel_buffer_versions {
                let version = language::proto::deserialize_version(&latest_buffer_version.version);
                self.channel_states
                    .entry(ChannelId(latest_buffer_version.channel_id))
                    .or_default()
                    .update_latest_notes_version(latest_buffer_version.epoch, &version)
            }

            // Unblocks `wait_for_channels`.
            self.channels_loaded.0.try_send(true).log_err();
        }

        cx.notify();
        if payload.channel_participants.is_empty() {
            return None;
        }

        // Collect the distinct participant ids in sorted order.
        let mut all_user_ids = Vec::new();
        let channel_participants = payload.channel_participants;
        for entry in &channel_participants {
            for user_id in entry.participant_user_ids.iter() {
                if let Err(ix) = all_user_ids.binary_search(user_id) {
                    all_user_ids.insert(ix, *user_id);
                }
            }
        }

        let users = self
            .user_store
            .update(cx, |user_store, cx| user_store.get_users(all_user_ids, cx));
        Some(cx.spawn(async move |this, cx| {
            let users = users.await?;

            this.update(cx, |this, cx| {
                for entry in &channel_participants {
                    // NOTE(review): the binary search assumes `users` is sorted by
                    // id — confirm against `UserStore::get_users`. Ids that are
                    // not found are skipped.
                    let mut participants: Vec<_> = entry
                        .participant_user_ids
                        .iter()
                        .filter_map(|user_id| {
                            users
                                .binary_search_by_key(&user_id, |user| &user.id)
                                .ok()
                                .map(|ix| users[ix].clone())
                        })
                        .collect();

                    participants.sort_by_key(|u| u.id);

                    this.channel_participants
                        .insert(ChannelId(entry.channel_id), participants);
                }

                cx.notify();
            })
        }))
    }
1186}
1187
1188impl ChannelState {
1189 fn set_role(&mut self, role: ChannelRole) {
1190 self.role = Some(role);
1191 }
1192
1193 fn has_channel_buffer_changed(&self) -> bool {
1194 self.latest_notes_version.epoch > self.observed_notes_version.epoch
1195 || (self.latest_notes_version.epoch == self.observed_notes_version.epoch
1196 && self
1197 .latest_notes_version
1198 .version
1199 .changed_since(&self.observed_notes_version.version))
1200 }
1201
1202 fn acknowledge_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1203 if self.observed_notes_version.epoch == epoch {
1204 self.observed_notes_version.version.join(version);
1205 } else {
1206 self.observed_notes_version = NotesVersion {
1207 epoch,
1208 version: version.clone(),
1209 };
1210 }
1211 }
1212
1213 fn update_latest_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1214 if self.latest_notes_version.epoch == epoch {
1215 self.latest_notes_version.version.join(version);
1216 } else {
1217 self.latest_notes_version = NotesVersion {
1218 epoch,
1219 version: version.clone(),
1220 };
1221 }
1222 }
1223}