1mod channel_index;
2
3use crate::channel_buffer::ChannelBuffer;
4use anyhow::{Context as _, Result, anyhow};
5use channel_index::ChannelIndex;
6use client::{ChannelId, Client, ClientSettings, Subscription, User, UserId, UserStore};
7use collections::{HashMap, HashSet};
8use futures::{Future, FutureExt, StreamExt, channel::mpsc, future::Shared};
9use gpui::{
10 App, AppContext as _, AsyncApp, Context, Entity, EventEmitter, Global, SharedString, Task,
11 WeakEntity,
12};
13use language::Capability;
14use postage::{sink::Sink, watch};
15use rpc::{
16 TypedEnvelope,
17 proto::{self, ChannelRole, ChannelVisibility},
18};
19use settings::Settings;
20use std::{mem, sync::Arc, time::Duration};
21use util::{ResultExt, maybe};
22
/// How long `ChannelStore::handle_disconnect` waits for a reconnect before it
/// disconnects every open channel buffer.
pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
24
25pub fn init(client: &Arc<Client>, user_store: Entity<UserStore>, cx: &mut App) {
26 let channel_store = cx.new(|cx| ChannelStore::new(client.clone(), user_store.clone(), cx));
27 cx.set_global(GlobalChannelStore(channel_store));
28}
29
/// A version of a channel's notes buffer: an epoch plus a version vector.
///
/// Two versions are only merged (`join`ed) when their epochs match; a
/// different epoch replaces the stored version outright (see `ChannelState`).
#[derive(Debug, Clone, Default, PartialEq)]
struct NotesVersion {
    epoch: u64,
    version: clock::Global,
}
35
/// Client-side state for channels: the channel index, pending invitations,
/// participants, per-channel notes state and the open notes buffers.
pub struct ChannelStore {
    /// Index of all known channels (by id and in display order).
    pub channel_index: ChannelIndex,
    /// Channels the user has been invited to; kept sorted by channel id
    /// (`update_channels` inserts through a binary search).
    channel_invitations: Vec<Arc<Channel>>,
    /// Users currently present in each channel, as reported by the server.
    channel_participants: HashMap<ChannelId, Vec<Arc<User>>>,
    /// Per-channel notes versions and the user's role.
    channel_states: HashMap<ChannelId, ChannelState>,
    /// `(channel, user)` pairs with a member request (invite, removal or role
    /// change) currently in flight.
    outgoing_invites: HashSet<(ChannelId, UserId)>,
    /// Queue of incoming `UpdateChannels` messages, drained by `_update_channels`.
    update_channels_tx: mpsc::UnboundedSender<proto::UpdateChannels>,
    /// Notes buffers that are open or currently being opened, keyed by channel.
    opened_buffers: HashMap<ChannelId, OpenEntityHandle<ChannelBuffer>>,
    client: Arc<Client>,
    /// Whether `SubscribeToChannels` has been sent successfully; reset on disconnect.
    did_subscribe: bool,
    /// Watch channel that is set to `true` once a channel update has been applied.
    channels_loaded: (watch::Sender<bool>, watch::Receiver<bool>),
    user_store: Entity<UserStore>,
    /// Handles for the RPC message handlers registered in `new`.
    _rpc_subscriptions: [Subscription; 2],
    /// Task that reacts to client connection status changes.
    _watch_connection_status: Task<Option<()>>,
    /// Pending task that disconnects open channel buffers after a disconnect.
    disconnect_channel_buffers_task: Option<Task<()>>,
    /// Task that applies queued `UpdateChannels` messages one at a time.
    _update_channels: Task<()>,
}
53
/// A single channel as known to the client.
#[derive(Clone, Debug)]
pub struct Channel {
    pub id: ChannelId,
    pub name: SharedString,
    pub visibility: proto::ChannelVisibility,
    /// Ids of this channel's ancestors. Empty for a root channel; otherwise the
    /// first entry is the root (see `is_root_channel` and `root_id`).
    pub parent_path: Vec<ChannelId>,
    /// Ordering value copied from the server's channel message.
    /// NOTE(review): presumably the position among siblings — confirm against `ChannelIndex`.
    pub channel_order: i32,
}
62
/// Per-channel state tracked by the `ChannelStore`.
#[derive(Default, Debug)]
pub struct ChannelState {
    /// Most recent notes version reported for the channel.
    latest_notes_version: NotesVersion,
    /// Most recent notes version the user has acknowledged.
    observed_notes_version: NotesVersion,
    /// The user's role in the channel, if one has been received.
    role: Option<ChannelRole>,
}
69
70impl Channel {
71 pub fn link(&self, cx: &App) -> String {
72 format!(
73 "{}/channel/{}-{}",
74 ClientSettings::get_global(cx).server_url,
75 Self::slug(&self.name),
76 self.id
77 )
78 }
79
80 pub fn notes_link(&self, heading: Option<String>, cx: &App) -> String {
81 self.link(cx)
82 + "/notes"
83 + &heading
84 .map(|h| format!("#{}", Self::slug(&h)))
85 .unwrap_or_default()
86 }
87
88 pub fn is_root_channel(&self) -> bool {
89 self.parent_path.is_empty()
90 }
91
92 pub fn root_id(&self) -> ChannelId {
93 self.parent_path.first().copied().unwrap_or(self.id)
94 }
95
96 pub fn slug(str: &str) -> String {
97 let slug: String = str
98 .chars()
99 .map(|c| if c.is_alphanumeric() { c } else { '-' })
100 .collect();
101
102 slug.trim_matches(|c| c == '-').to_string()
103 }
104}
105
/// A user's relationship to a channel, as built by
/// `ChannelStore::fuzzy_search_members` from the server's member list.
#[derive(Debug)]
pub struct ChannelMembership {
    pub user: Arc<User>,
    /// Whether the user is a member or an invitee.
    pub kind: proto::channel_member::Kind,
    pub role: proto::ChannelRole,
}
112impl ChannelMembership {
113 pub fn sort_key(&self) -> MembershipSortKey<'_> {
114 MembershipSortKey {
115 role_order: match self.role {
116 proto::ChannelRole::Admin => 0,
117 proto::ChannelRole::Member => 1,
118 proto::ChannelRole::Banned => 2,
119 proto::ChannelRole::Talker => 3,
120 proto::ChannelRole::Guest => 4,
121 },
122 kind_order: match self.kind {
123 proto::channel_member::Kind::Member => 0,
124 proto::channel_member::Kind::Invitee => 1,
125 },
126 username_order: &self.user.github_login,
127 }
128 }
129}
130
/// Sort key produced by `ChannelMembership::sort_key`.
///
/// The derived ordering compares the fields in declaration order: role first,
/// then kind, then username.
#[derive(PartialOrd, Ord, PartialEq, Eq)]
pub struct MembershipSortKey<'a> {
    role_order: u8,
    kind_order: u8,
    username_order: &'a str,
}
137
/// Events emitted by the `ChannelStore`.
pub enum ChannelEvent {
    /// Emitted by `create_channel` once the new channel has been added to the store.
    ChannelCreated(ChannelId),
    /// Emitted by `rename` once the renamed channel has been applied to the store.
    ChannelRenamed(ChannelId),
}

// Lets `ChannelStore` emit `ChannelEvent`s through `cx.emit`.
impl EventEmitter<ChannelEvent> for ChannelStore {}
144
/// State of a per-channel resource tracked by `open_channel_resource`.
enum OpenEntityHandle<E> {
    /// The resource was opened; it is held weakly, so it may already have been dropped.
    Open(WeakEntity<E>),
    /// The resource is still loading; the shared task lets concurrent callers
    /// await the same load.
    Loading(Shared<Task<Result<Entity<E>, Arc<anyhow::Error>>>>),
}
149
/// Wrapper that stores the `ChannelStore` entity as an application global.
struct GlobalChannelStore(Entity<ChannelStore>);

// Marks the wrapper as usable with `cx.set_global` / `cx.global`.
impl Global for GlobalChannelStore {}
153
154impl ChannelStore {
155 pub fn global(cx: &App) -> Entity<Self> {
156 cx.global::<GlobalChannelStore>().0.clone()
157 }
158
159 pub fn try_global(cx: &App) -> Option<Entity<Self>> {
160 cx.try_global::<GlobalChannelStore>().map(|g| g.0.clone())
161 }
162
    /// Builds a `ChannelStore`.
    ///
    /// Registers handlers for `UpdateChannels` and `UpdateUserChannels`
    /// messages and spawns two long-lived tasks:
    /// - one that follows the client's connection status and calls
    ///   `handle_connect` / `handle_disconnect`, and
    /// - one that drains the `update_channels_tx` queue and applies each
    ///   `UpdateChannels` payload in order.
    pub fn new(client: Arc<Client>, user_store: Entity<UserStore>, cx: &mut Context<Self>) -> Self {
        let rpc_subscriptions = [
            client.add_message_handler(cx.weak_entity(), Self::handle_update_channels),
            client.add_message_handler(cx.weak_entity(), Self::handle_update_user_channels),
        ];

        let mut connection_status = client.status();
        let (update_channels_tx, mut update_channels_rx) = mpsc::unbounded();
        let watch_connection_status = cx.spawn(async move |this, cx| {
            while let Some(status) = connection_status.next().await {
                // Stop watching once the store itself has been dropped.
                let this = this.upgrade()?;
                match status {
                    client::Status::Connected { .. } => {
                        // NOTE(review): `.log_err()?` ends this task when the rejoin
                        // fails, so later status changes are no longer observed —
                        // confirm this is intended.
                        this.update(cx, |this, cx| this.handle_connect(cx))
                            .await
                            .log_err()?;
                    }
                    // Signed out or upgrade required: disconnect buffers without
                    // waiting for a reconnect.
                    client::Status::SignedOut | client::Status::UpgradeRequired => {
                        this.update(cx, |this, cx| this.handle_disconnect(false, cx));
                    }
                    // Any other status: give the client `RECONNECT_TIMEOUT` to come back.
                    _ => {
                        this.update(cx, |this, cx| this.handle_disconnect(true, cx));
                    }
                }
            }
            Some(())
        });

        Self {
            channel_invitations: Vec::default(),
            channel_index: ChannelIndex::default(),
            channel_participants: Default::default(),
            outgoing_invites: Default::default(),
            opened_buffers: Default::default(),
            update_channels_tx,
            client,
            user_store,
            _rpc_subscriptions: rpc_subscriptions,
            _watch_connection_status: watch_connection_status,
            disconnect_channel_buffers_task: None,
            _update_channels: cx.spawn(async move |this, cx| {
                maybe!(async move {
                    // Apply updates one at a time, awaiting the participant
                    // resolution task (if any) before taking the next update.
                    while let Some(update_channels) = update_channels_rx.next().await {
                        if let Some(this) = this.upgrade() {
                            let update_task = this
                                .update(cx, |this, cx| this.update_channels(update_channels, cx));
                            if let Some(update_task) = update_task {
                                update_task.await.log_err();
                            }
                        }
                    }
                    anyhow::Ok(())
                })
                .await
                .log_err();
            }),
            channel_states: Default::default(),
            did_subscribe: false,
            channels_loaded: watch::channel_with(false),
        }
    }
224
225 pub fn initialize(&mut self) {
226 if !self.did_subscribe
227 && self
228 .client
229 .send(proto::SubscribeToChannels {})
230 .log_err()
231 .is_some()
232 {
233 self.did_subscribe = true;
234 }
235 }
236
237 pub fn wait_for_channels(
238 &mut self,
239 timeout: Duration,
240 cx: &mut Context<Self>,
241 ) -> Task<Result<()>> {
242 let mut channels_loaded_rx = self.channels_loaded.1.clone();
243 if *channels_loaded_rx.borrow() {
244 return Task::ready(Ok(()));
245 }
246
247 let mut status_receiver = self.client.status();
248 if status_receiver.borrow().is_connected() {
249 self.initialize();
250 }
251
252 let mut timer = cx.background_executor().timer(timeout).fuse();
253 cx.spawn(async move |this, cx| {
254 loop {
255 futures::select_biased! {
256 channels_loaded = channels_loaded_rx.next().fuse() => {
257 if let Some(true) = channels_loaded {
258 return Ok(());
259 }
260 }
261 status = status_receiver.next().fuse() => {
262 if let Some(status) = status
263 && status.is_connected() {
264 this.update(cx, |this, _cx| {
265 this.initialize();
266 }).ok();
267 }
268 continue;
269 }
270 _ = timer => {
271 return Err(anyhow!("{:?} elapsed without receiving channels", timeout));
272 }
273 }
274 }
275 })
276 }
277
278 pub fn client(&self) -> Arc<Client> {
279 self.client.clone()
280 }
281
282 /// Returns the number of unique channels in the store
283 pub fn channel_count(&self) -> usize {
284 self.channel_index.by_id().len()
285 }
286
287 /// Returns the index of a channel ID in the list of unique channels
288 pub fn index_of_channel(&self, channel_id: ChannelId) -> Option<usize> {
289 self.channel_index
290 .by_id()
291 .keys()
292 .position(|id| *id == channel_id)
293 }
294
295 /// Returns an iterator over all unique channels
296 pub fn channels(&self) -> impl '_ + Iterator<Item = &Arc<Channel>> {
297 self.channel_index.by_id().values()
298 }
299
300 /// Iterate over all entries in the channel DAG
301 pub fn ordered_channels(&self) -> impl '_ + Iterator<Item = (usize, &Arc<Channel>)> {
302 self.channel_index
303 .ordered_channels()
304 .iter()
305 .filter_map(move |id| {
306 let channel = self.channel_index.by_id().get(id)?;
307 Some((channel.parent_path.len(), channel))
308 })
309 }
310
311 pub fn channel_at_index(&self, ix: usize) -> Option<&Arc<Channel>> {
312 let channel_id = self.channel_index.ordered_channels().get(ix)?;
313 self.channel_index.by_id().get(channel_id)
314 }
315
316 pub fn channel_at(&self, ix: usize) -> Option<&Arc<Channel>> {
317 self.channel_index.by_id().values().nth(ix)
318 }
319
320 pub fn has_channel_invitation(&self, channel_id: ChannelId) -> bool {
321 self.channel_invitations
322 .iter()
323 .any(|channel| channel.id == channel_id)
324 }
325
326 pub fn channel_invitations(&self) -> &[Arc<Channel>] {
327 &self.channel_invitations
328 }
329
330 pub fn channel_for_id(&self, channel_id: ChannelId) -> Option<&Arc<Channel>> {
331 self.channel_index.by_id().get(&channel_id)
332 }
333
334 pub fn has_open_channel_buffer(&self, channel_id: ChannelId, _cx: &App) -> bool {
335 if let Some(buffer) = self.opened_buffers.get(&channel_id)
336 && let OpenEntityHandle::Open(buffer) = buffer
337 {
338 return buffer.upgrade().is_some();
339 }
340 false
341 }
342
343 pub fn open_channel_buffer(
344 &mut self,
345 channel_id: ChannelId,
346 cx: &mut Context<Self>,
347 ) -> Task<Result<Entity<ChannelBuffer>>> {
348 let client = self.client.clone();
349 let user_store = self.user_store.clone();
350 let channel_store = cx.entity();
351 self.open_channel_resource(
352 channel_id,
353 "notes",
354 |this| &mut this.opened_buffers,
355 async move |channel, cx| {
356 ChannelBuffer::new(channel, client, user_store, channel_store, cx).await
357 },
358 cx,
359 )
360 }
361
362 pub fn has_channel_buffer_changed(&self, channel_id: ChannelId) -> bool {
363 self.channel_states
364 .get(&channel_id)
365 .is_some_and(|state| state.has_channel_buffer_changed())
366 }
367
368 pub fn acknowledge_notes_version(
369 &mut self,
370 channel_id: ChannelId,
371 epoch: u64,
372 version: &clock::Global,
373 cx: &mut Context<Self>,
374 ) {
375 self.channel_states
376 .entry(channel_id)
377 .or_default()
378 .acknowledge_notes_version(epoch, version);
379 cx.notify()
380 }
381
382 pub fn update_latest_notes_version(
383 &mut self,
384 channel_id: ChannelId,
385 epoch: u64,
386 version: &clock::Global,
387 cx: &mut Context<Self>,
388 ) {
389 self.channel_states
390 .entry(channel_id)
391 .or_default()
392 .update_latest_notes_version(epoch, version);
393 cx.notify()
394 }
395
396 /// Asynchronously open a given resource associated with a channel.
397 ///
398 /// Make sure that the resource is only opened once, even if this method
399 /// is called multiple times with the same channel id while the first task
400 /// is still running.
    // Parameters:
    // - `resource_name`: only used in the error context ("failed to open channel <name>").
    // - `get_map`: selects the map on `Self` that tracks handles for this resource type.
    // - `load`: performs the actual load once the channel is known.
    fn open_channel_resource<T, F>(
        &mut self,
        channel_id: ChannelId,
        resource_name: &'static str,
        get_map: fn(&mut Self) -> &mut HashMap<ChannelId, OpenEntityHandle<T>>,
        load: F,
        cx: &mut Context<Self>,
    ) -> Task<Result<Entity<T>>>
    where
        F: AsyncFnOnce(Arc<Channel>, &mut AsyncApp) -> Result<Entity<T>> + 'static,
        T: 'static,
    {
        let task = loop {
            match get_map(self).get(&channel_id) {
                Some(OpenEntityHandle::Open(entity)) => {
                    if let Some(entity) = entity.upgrade() {
                        break Task::ready(Ok(entity)).shared();
                    } else {
                        // The entity was dropped: forget the stale handle and
                        // go around again, which reaches the `None` arm.
                        get_map(self).remove(&channel_id);
                        continue;
                    }
                }
                // A load is already in flight: share it.
                Some(OpenEntityHandle::Loading(task)) => break task.clone(),
                None => {}
            }

            // Nothing open or loading: wait for the channel list, then load.
            let channels_ready = self.wait_for_channels(Duration::from_secs(10), cx);
            let task = cx
                .spawn(async move |this, cx| {
                    channels_ready.await?;
                    let channel = this.read_with(cx, |this, _| {
                        this.channel_for_id(channel_id)
                            .cloned()
                            .ok_or_else(|| Arc::new(anyhow!("no channel for id: {channel_id}")))
                    })??;

                    load(channel, cx).await.map_err(Arc::new)
                })
                .shared();

            get_map(self).insert(channel_id, OpenEntityHandle::Loading(task.clone()));
            // Once loading finishes, replace the `Loading` entry with a weak
            // `Open` handle, or drop the entry on failure so a later call retries.
            let task = cx.spawn({
                async move |this, cx| {
                    let result = task.await;
                    this.update(cx, |this, _| match &result {
                        Ok(model) => {
                            get_map(this)
                                .insert(channel_id, OpenEntityHandle::Open(model.downgrade()));
                        }
                        Err(_) => {
                            get_map(this).remove(&channel_id);
                        }
                    })?;
                    result
                }
            });
            break task.shared();
        };
        // The shared error is an `Arc`, so it is re-rendered into a fresh
        // `anyhow::Error` and given context naming the resource.
        cx.background_spawn(async move {
            task.await.map_err(|error| {
                anyhow!("{error}").context(format!("failed to open channel {resource_name}"))
            })
        })
    }
465
466 pub fn is_channel_admin(&self, channel_id: ChannelId) -> bool {
467 self.channel_role(channel_id) == proto::ChannelRole::Admin
468 }
469
470 pub fn is_root_channel(&self, channel_id: ChannelId) -> bool {
471 self.channel_index
472 .by_id()
473 .get(&channel_id)
474 .is_some_and(|channel| channel.is_root_channel())
475 }
476
477 pub fn is_public_channel(&self, channel_id: ChannelId) -> bool {
478 self.channel_index
479 .by_id()
480 .get(&channel_id)
481 .is_some_and(|channel| channel.visibility == ChannelVisibility::Public)
482 }
483
484 pub fn channel_capability(&self, channel_id: ChannelId) -> Capability {
485 match self.channel_role(channel_id) {
486 ChannelRole::Admin | ChannelRole::Member => Capability::ReadWrite,
487 _ => Capability::ReadOnly,
488 }
489 }
490
491 pub fn channel_role(&self, channel_id: ChannelId) -> proto::ChannelRole {
492 maybe!({
493 let mut channel = self.channel_for_id(channel_id)?;
494 if !channel.is_root_channel() {
495 channel = self.channel_for_id(channel.root_id())?;
496 }
497 let root_channel_state = self.channel_states.get(&channel.id);
498 root_channel_state?.role
499 })
500 .unwrap_or(proto::ChannelRole::Guest)
501 }
502
503 pub fn channel_participants(&self, channel_id: ChannelId) -> &[Arc<User>] {
504 self.channel_participants
505 .get(&channel_id)
506 .map_or(&[], |v| v.as_slice())
507 }
508
509 pub fn create_channel(
510 &self,
511 name: &str,
512 parent_id: Option<ChannelId>,
513 cx: &mut Context<Self>,
514 ) -> Task<Result<ChannelId>> {
515 let client = self.client.clone();
516 let name = name.trim_start_matches('#').to_owned();
517 cx.spawn(async move |this, cx| {
518 let response = client
519 .request(proto::CreateChannel {
520 name,
521 parent_id: parent_id.map(|cid| cid.0),
522 })
523 .await?;
524
525 let channel = response.channel.context("missing channel in response")?;
526 let channel_id = ChannelId(channel.id);
527
528 this.update(cx, |this, cx| {
529 let task = this.update_channels(
530 proto::UpdateChannels {
531 channels: vec![channel],
532 ..Default::default()
533 },
534 cx,
535 );
536 assert!(task.is_none());
537
538 // This event is emitted because the collab panel wants to clear the pending edit state
539 // before this frame is rendered. But we can't guarantee that the collab panel's future
540 // will resolve before this flush_effects finishes. Synchronously emitting this event
541 // ensures that the collab panel will observe this creation before the frame completes
542 cx.emit(ChannelEvent::ChannelCreated(channel_id));
543 })?;
544
545 Ok(channel_id)
546 })
547 }
548
549 pub fn move_channel(
550 &mut self,
551 channel_id: ChannelId,
552 to: ChannelId,
553 cx: &mut Context<Self>,
554 ) -> Task<Result<()>> {
555 let client = self.client.clone();
556 cx.spawn(async move |_, _| {
557 let _ = client
558 .request(proto::MoveChannel {
559 channel_id: channel_id.0,
560 to: to.0,
561 })
562 .await?;
563 Ok(())
564 })
565 }
566
567 pub fn reorder_channel(
568 &mut self,
569 channel_id: ChannelId,
570 direction: proto::reorder_channel::Direction,
571 cx: &mut Context<Self>,
572 ) -> Task<Result<()>> {
573 let client = self.client.clone();
574 cx.spawn(async move |_, _| {
575 client
576 .request(proto::ReorderChannel {
577 channel_id: channel_id.0,
578 direction: direction.into(),
579 })
580 .await?;
581 Ok(())
582 })
583 }
584
585 pub fn set_channel_visibility(
586 &mut self,
587 channel_id: ChannelId,
588 visibility: ChannelVisibility,
589 cx: &mut Context<Self>,
590 ) -> Task<Result<()>> {
591 let client = self.client.clone();
592 cx.spawn(async move |_, _| {
593 let _ = client
594 .request(proto::SetChannelVisibility {
595 channel_id: channel_id.0,
596 visibility: visibility.into(),
597 })
598 .await?;
599
600 Ok(())
601 })
602 }
603
604 pub fn invite_member(
605 &mut self,
606 channel_id: ChannelId,
607 user_id: UserId,
608 role: proto::ChannelRole,
609 cx: &mut Context<Self>,
610 ) -> Task<Result<()>> {
611 if !self.outgoing_invites.insert((channel_id, user_id)) {
612 return Task::ready(Err(anyhow!("invite request already in progress")));
613 }
614
615 cx.notify();
616 let client = self.client.clone();
617 cx.spawn(async move |this, cx| {
618 let result = client
619 .request(proto::InviteChannelMember {
620 channel_id: channel_id.0,
621 user_id,
622 role: role.into(),
623 })
624 .await;
625
626 this.update(cx, |this, cx| {
627 this.outgoing_invites.remove(&(channel_id, user_id));
628 cx.notify();
629 })?;
630
631 result?;
632
633 Ok(())
634 })
635 }
636
637 pub fn remove_member(
638 &mut self,
639 channel_id: ChannelId,
640 user_id: u64,
641 cx: &mut Context<Self>,
642 ) -> Task<Result<()>> {
643 if !self.outgoing_invites.insert((channel_id, user_id)) {
644 return Task::ready(Err(anyhow!("invite request already in progress")));
645 }
646
647 cx.notify();
648 let client = self.client.clone();
649 cx.spawn(async move |this, cx| {
650 let result = client
651 .request(proto::RemoveChannelMember {
652 channel_id: channel_id.0,
653 user_id,
654 })
655 .await;
656
657 this.update(cx, |this, cx| {
658 this.outgoing_invites.remove(&(channel_id, user_id));
659 cx.notify();
660 })?;
661 result?;
662 Ok(())
663 })
664 }
665
666 pub fn set_member_role(
667 &mut self,
668 channel_id: ChannelId,
669 user_id: UserId,
670 role: proto::ChannelRole,
671 cx: &mut Context<Self>,
672 ) -> Task<Result<()>> {
673 if !self.outgoing_invites.insert((channel_id, user_id)) {
674 return Task::ready(Err(anyhow!("member request already in progress")));
675 }
676
677 cx.notify();
678 let client = self.client.clone();
679 cx.spawn(async move |this, cx| {
680 let result = client
681 .request(proto::SetChannelMemberRole {
682 channel_id: channel_id.0,
683 user_id,
684 role: role.into(),
685 })
686 .await;
687
688 this.update(cx, |this, cx| {
689 this.outgoing_invites.remove(&(channel_id, user_id));
690 cx.notify();
691 })?;
692
693 result?;
694 Ok(())
695 })
696 }
697
698 pub fn rename(
699 &mut self,
700 channel_id: ChannelId,
701 new_name: &str,
702 cx: &mut Context<Self>,
703 ) -> Task<Result<()>> {
704 let client = self.client.clone();
705 let name = new_name.to_string();
706 cx.spawn(async move |this, cx| {
707 let channel = client
708 .request(proto::RenameChannel {
709 channel_id: channel_id.0,
710 name,
711 })
712 .await?
713 .channel
714 .context("missing channel in response")?;
715 this.update(cx, |this, cx| {
716 let task = this.update_channels(
717 proto::UpdateChannels {
718 channels: vec![channel],
719 ..Default::default()
720 },
721 cx,
722 );
723 assert!(task.is_none());
724
725 // This event is emitted because the collab panel wants to clear the pending edit state
726 // before this frame is rendered. But we can't guarantee that the collab panel's future
727 // will resolve before this flush_effects finishes. Synchronously emitting this event
728 // ensures that the collab panel will observe this creation before the frame complete
729 cx.emit(ChannelEvent::ChannelRenamed(channel_id))
730 })?;
731 Ok(())
732 })
733 }
734
735 pub fn respond_to_channel_invite(
736 &mut self,
737 channel_id: ChannelId,
738 accept: bool,
739 cx: &mut Context<Self>,
740 ) -> Task<Result<()>> {
741 let client = self.client.clone();
742 cx.background_spawn(async move {
743 client
744 .request(proto::RespondToChannelInvite {
745 channel_id: channel_id.0,
746 accept,
747 })
748 .await?;
749 Ok(())
750 })
751 }
752 pub fn fuzzy_search_members(
753 &self,
754 channel_id: ChannelId,
755 query: String,
756 limit: u16,
757 cx: &mut Context<Self>,
758 ) -> Task<Result<Vec<ChannelMembership>>> {
759 let client = self.client.clone();
760 let user_store = self.user_store.downgrade();
761 cx.spawn(async move |_, cx| {
762 let response = client
763 .request(proto::GetChannelMembers {
764 channel_id: channel_id.0,
765 query,
766 limit: limit as u64,
767 })
768 .await?;
769 user_store.update(cx, |user_store, _| {
770 user_store.insert(response.users);
771 response
772 .members
773 .into_iter()
774 .filter_map(|member| {
775 Some(ChannelMembership {
776 user: user_store.get_cached_user(member.user_id)?,
777 role: member.role(),
778 kind: member.kind(),
779 })
780 })
781 .collect()
782 })
783 })
784 }
785
786 pub fn remove_channel(
787 &self,
788 channel_id: ChannelId,
789 ) -> impl Future<Output = Result<()>> + use<> {
790 let client = self.client.clone();
791 async move {
792 client
793 .request(proto::DeleteChannel {
794 channel_id: channel_id.0,
795 })
796 .await?;
797 Ok(())
798 }
799 }
800
801 pub fn has_pending_channel_invite_response(&self, _: &Arc<Channel>) -> bool {
802 false
803 }
804
805 pub fn has_pending_channel_invite(&self, channel_id: ChannelId, user_id: UserId) -> bool {
806 self.outgoing_invites.contains(&(channel_id, user_id))
807 }
808
809 async fn handle_update_channels(
810 this: Entity<Self>,
811 message: TypedEnvelope<proto::UpdateChannels>,
812 cx: AsyncApp,
813 ) -> Result<()> {
814 this.read_with(&cx, |this, _| {
815 this.update_channels_tx
816 .unbounded_send(message.payload)
817 .unwrap();
818 });
819 Ok(())
820 }
821
822 async fn handle_update_user_channels(
823 this: Entity<Self>,
824 message: TypedEnvelope<proto::UpdateUserChannels>,
825 mut cx: AsyncApp,
826 ) -> Result<()> {
827 this.update(&mut cx, |this, cx| {
828 for buffer_version in message.payload.observed_channel_buffer_version {
829 let version = language::proto::deserialize_version(&buffer_version.version);
830 this.acknowledge_notes_version(
831 ChannelId(buffer_version.channel_id),
832 buffer_version.epoch,
833 &version,
834 cx,
835 );
836 }
837 for membership in message.payload.channel_memberships {
838 if let Some(role) = ChannelRole::from_i32(membership.role) {
839 this.channel_states
840 .entry(ChannelId(membership.channel_id))
841 .or_default()
842 .set_role(role)
843 }
844 }
845 });
846 Ok(())
847 }
848
849 fn handle_connect(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
850 self.channel_index.clear();
851 self.channel_invitations.clear();
852 self.channel_participants.clear();
853 self.channel_index.clear();
854 self.outgoing_invites.clear();
855 self.disconnect_channel_buffers_task.take();
856
857 let mut buffer_versions = Vec::new();
858 for buffer in self.opened_buffers.values() {
859 if let OpenEntityHandle::Open(buffer) = buffer
860 && let Some(buffer) = buffer.upgrade()
861 {
862 buffer.update(cx, |channel_buffer, cx| {
863 // Block on_buffer_update from sending UpdateChannelBuffer messages
864 // until the rejoin completes. This prevents a race condition where
865 // edits made during the rejoin async gap could inflate the server
866 // version, causing offline edits to be filtered out by serialize_ops.
867 channel_buffer.set_rejoining(true);
868 let inner_buffer = channel_buffer.buffer().read(cx);
869 buffer_versions.push(proto::ChannelBufferVersion {
870 channel_id: channel_buffer.channel_id.0,
871 epoch: channel_buffer.epoch(),
872 version: language::proto::serialize_version(&inner_buffer.version()),
873 });
874 });
875 }
876 }
877
878 if buffer_versions.is_empty() {
879 return Task::ready(Ok(()));
880 }
881
882 let response = self.client.request(proto::RejoinChannelBuffers {
883 buffers: buffer_versions,
884 });
885
886 cx.spawn(async move |this, cx| {
887 let response = match response.await {
888 Ok(response) => response,
889 Err(err) => {
890 // Clear rejoining flag on all buffers since the rejoin failed
891 this.update(cx, |this, cx| {
892 for buffer in this.opened_buffers.values() {
893 if let OpenEntityHandle::Open(buffer) = buffer {
894 if let Some(buffer) = buffer.upgrade() {
895 buffer.update(cx, |channel_buffer, _| {
896 channel_buffer.set_rejoining(false);
897 });
898 }
899 }
900 }
901 })
902 .ok();
903 return Err(err);
904 }
905 };
906 let mut response = response;
907
908 this.update(cx, |this, cx| {
909 this.opened_buffers.retain(|_, buffer| match buffer {
910 OpenEntityHandle::Open(channel_buffer) => {
911 let Some(channel_buffer) = channel_buffer.upgrade() else {
912 return false;
913 };
914
915 channel_buffer.update(cx, |channel_buffer, cx| {
916 let channel_id = channel_buffer.channel_id;
917 if let Some(remote_buffer) = response
918 .buffers
919 .iter_mut()
920 .find(|buffer| buffer.channel_id == channel_id.0)
921 {
922 let channel_id = channel_buffer.channel_id;
923 let remote_version =
924 language::proto::deserialize_version(&remote_buffer.version);
925
926 channel_buffer.replace_collaborators(
927 mem::take(&mut remote_buffer.collaborators),
928 cx,
929 );
930
931 let operations = channel_buffer
932 .buffer()
933 .update(cx, |buffer, cx| {
934 let outgoing_operations =
935 buffer.serialize_ops(Some(remote_version), cx);
936 let incoming_operations =
937 mem::take(&mut remote_buffer.operations)
938 .into_iter()
939 .map(language::proto::deserialize_operation)
940 .collect::<Result<Vec<_>>>()?;
941 buffer.apply_ops(incoming_operations, cx);
942 anyhow::Ok(outgoing_operations)
943 })
944 .log_err();
945
946 if let Some(operations) = operations {
947 channel_buffer.connected(cx);
948 let client = this.client.clone();
949 cx.background_spawn(async move {
950 let operations = operations.await;
951 for chunk in language::proto::split_operations(operations) {
952 client
953 .send(proto::UpdateChannelBuffer {
954 channel_id: channel_id.0,
955 operations: chunk,
956 })
957 .ok();
958 }
959 })
960 .detach();
961 return true;
962 }
963 }
964
965 channel_buffer.disconnect(cx);
966 false
967 })
968 }
969 OpenEntityHandle::Loading(_) => true,
970 });
971 })
972 .ok();
973 anyhow::Ok(())
974 })
975 }
976
977 fn handle_disconnect(&mut self, wait_for_reconnect: bool, cx: &mut Context<Self>) {
978 cx.notify();
979 self.did_subscribe = false;
980
981 // If we're waiting for reconnect, set rejoining=true on all buffers immediately.
982 // This prevents operations from being sent during the reconnection window,
983 // before handle_connect has a chance to run and capture the version.
984 if wait_for_reconnect {
985 for buffer in self.opened_buffers.values() {
986 if let OpenEntityHandle::Open(buffer) = buffer {
987 if let Some(buffer) = buffer.upgrade() {
988 buffer.update(cx, |channel_buffer, _| {
989 channel_buffer.set_rejoining(true);
990 });
991 }
992 }
993 }
994 }
995
996 self.disconnect_channel_buffers_task.get_or_insert_with(|| {
997 cx.spawn(async move |this, cx| {
998 if wait_for_reconnect {
999 cx.background_executor().timer(RECONNECT_TIMEOUT).await;
1000 }
1001
1002 if let Some(this) = this.upgrade() {
1003 this.update(cx, |this, cx| {
1004 for buffer in this.opened_buffers.values() {
1005 if let OpenEntityHandle::Open(buffer) = &buffer
1006 && let Some(buffer) = buffer.upgrade()
1007 {
1008 buffer.update(cx, |buffer, cx| buffer.disconnect(cx));
1009 }
1010 }
1011 });
1012 }
1013 })
1014 });
1015 }
1016
1017 #[cfg(any(test, feature = "test-support"))]
1018 pub fn reset(&mut self) {
1019 self.channel_invitations.clear();
1020 self.channel_index.clear();
1021 self.channel_participants.clear();
1022 self.outgoing_invites.clear();
1023 self.opened_buffers.clear();
1024 self.disconnect_channel_buffers_task = None;
1025 self.channel_states.clear();
1026 }
1027
    /// Applies an `UpdateChannels` payload to the store.
    ///
    /// Invitations, channel deletions/insertions and latest notes versions are
    /// applied synchronously. When the payload lists channel participants, the
    /// participating users are fetched through the user store and the returned
    /// task fills in `channel_participants` once they are available. When the
    /// payload has no participants, `None` is returned.
    pub(crate) fn update_channels(
        &mut self,
        payload: proto::UpdateChannels,
        cx: &mut Context<ChannelStore>,
    ) -> Option<Task<Result<()>>> {
        if !payload.remove_channel_invitations.is_empty() {
            self.channel_invitations
                .retain(|channel| !payload.remove_channel_invitations.contains(&channel.id.0));
        }
        // Invitations are kept sorted by id: an existing entry only gets its
        // name refreshed, a new one is inserted at its sorted position.
        for channel in payload.channel_invitations {
            match self
                .channel_invitations
                .binary_search_by_key(&channel.id, |c| c.id.0)
            {
                Ok(ix) => {
                    Arc::make_mut(&mut self.channel_invitations[ix]).name = channel.name.into()
                }
                Err(ix) => self.channel_invitations.insert(
                    ix,
                    Arc::new(Channel {
                        id: ChannelId(channel.id),
                        visibility: channel.visibility(),
                        name: channel.name.into(),
                        parent_path: channel.parent_path.into_iter().map(ChannelId).collect(),
                        channel_order: channel.channel_order,
                    }),
                ),
            }
        }

        let channels_changed = !payload.channels.is_empty()
            || !payload.delete_channels.is_empty()
            || !payload.latest_channel_buffer_versions.is_empty();

        if channels_changed {
            if !payload.delete_channels.is_empty() {
                let delete_channels: Vec<ChannelId> =
                    payload.delete_channels.into_iter().map(ChannelId).collect();
                self.channel_index.delete_channels(&delete_channels);
                self.channel_participants
                    .retain(|channel_id, _| !delete_channels.contains(channel_id));

                for channel_id in &delete_channels {
                    let channel_id = *channel_id;
                    // A channel that is deleted and re-sent in the same payload
                    // keeps its open buffer.
                    if payload
                        .channels
                        .iter()
                        .any(|channel| channel.id == channel_id.0)
                    {
                        continue;
                    }
                    if let Some(OpenEntityHandle::Open(buffer)) =
                        self.opened_buffers.remove(&channel_id)
                        && let Some(buffer) = buffer.upgrade()
                    {
                        buffer.update(cx, ChannelBuffer::disconnect);
                    }
                }
            }

            let mut index = self.channel_index.bulk_insert();
            for channel in payload.channels {
                let id = ChannelId(channel.id);
                let channel_changed = index.insert(channel);

                // Let an open notes buffer know that its channel changed.
                if channel_changed
                    && let Some(OpenEntityHandle::Open(buffer)) = self.opened_buffers.get(&id)
                    && let Some(buffer) = buffer.upgrade()
                {
                    buffer.update(cx, ChannelBuffer::channel_changed);
                }
            }

            for latest_buffer_version in payload.latest_channel_buffer_versions {
                let version = language::proto::deserialize_version(&latest_buffer_version.version);
                self.channel_states
                    .entry(ChannelId(latest_buffer_version.channel_id))
                    .or_default()
                    .update_latest_notes_version(latest_buffer_version.epoch, &version)
            }

            // Wake up anything blocked in `wait_for_channels`.
            self.channels_loaded.0.try_send(true).log_err();
        }

        cx.notify();
        if payload.channel_participants.is_empty() {
            return None;
        }

        // Collect the distinct participant ids, kept sorted.
        let mut all_user_ids = Vec::new();
        let channel_participants = payload.channel_participants;
        for entry in &channel_participants {
            for user_id in entry.participant_user_ids.iter() {
                if let Err(ix) = all_user_ids.binary_search(user_id) {
                    all_user_ids.insert(ix, *user_id);
                }
            }
        }

        let users = self
            .user_store
            .update(cx, |user_store, cx| user_store.get_users(all_user_ids, cx));
        Some(cx.spawn(async move |this, cx| {
            let users = users.await?;

            this.update(cx, |this, cx| {
                for entry in &channel_participants {
                    // NOTE(review): the binary search assumes `get_users` returns
                    // users sorted by id — confirm against `UserStore::get_users`.
                    let mut participants: Vec<_> = entry
                        .participant_user_ids
                        .iter()
                        .filter_map(|user_id| {
                            users
                                .binary_search_by_key(&user_id, |user| &user.id)
                                .ok()
                                .map(|ix| users[ix].clone())
                        })
                        .collect();

                    participants.sort_by_key(|u| u.id);

                    this.channel_participants
                        .insert(ChannelId(entry.channel_id), participants);
                }

                cx.notify();
            })
        }))
    }
1156}
1157
1158impl ChannelState {
1159 fn set_role(&mut self, role: ChannelRole) {
1160 self.role = Some(role);
1161 }
1162
1163 fn has_channel_buffer_changed(&self) -> bool {
1164 self.latest_notes_version.epoch > self.observed_notes_version.epoch
1165 || (self.latest_notes_version.epoch == self.observed_notes_version.epoch
1166 && self
1167 .latest_notes_version
1168 .version
1169 .changed_since(&self.observed_notes_version.version))
1170 }
1171
1172 fn acknowledge_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1173 if self.observed_notes_version.epoch == epoch {
1174 self.observed_notes_version.version.join(version);
1175 } else {
1176 self.observed_notes_version = NotesVersion {
1177 epoch,
1178 version: version.clone(),
1179 };
1180 }
1181 }
1182
1183 fn update_latest_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1184 if self.latest_notes_version.epoch == epoch {
1185 self.latest_notes_version.version.join(version);
1186 } else {
1187 self.latest_notes_version = NotesVersion {
1188 epoch,
1189 version: version.clone(),
1190 };
1191 }
1192 }
1193}