1mod channel_index;
2
3use crate::channel_buffer::ChannelBuffer;
4use anyhow::{Context as _, Result, anyhow};
5use channel_index::ChannelIndex;
6use client::{ChannelId, Client, ClientSettings, Subscription, User, UserId, UserStore};
7use collections::{HashMap, HashSet};
8use futures::{Future, FutureExt, StreamExt, channel::mpsc, future::Shared};
9use gpui::{
10 App, AppContext as _, AsyncApp, Context, Entity, EventEmitter, Global, SharedString, Task,
11 WeakEntity,
12};
13use language::Capability;
14use postage::{sink::Sink, watch};
15use rpc::{
16 TypedEnvelope,
17 proto::{self, ChannelRole, ChannelVisibility},
18};
19use settings::Settings;
20use std::{mem, sync::Arc, time::Duration};
21use util::{ResultExt, maybe};
22
23pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
24
25pub fn init(client: &Arc<Client>, user_store: Entity<UserStore>, cx: &mut App) {
26 let channel_store = cx.new(|cx| ChannelStore::new(client.clone(), user_store.clone(), cx));
27 cx.set_global(GlobalChannelStore(channel_store));
28}
29
30#[derive(Debug, Clone, Default, PartialEq)]
31struct NotesVersion {
32 epoch: u64,
33 version: clock::Global,
34}
35
36pub struct ChannelStore {
37 pub channel_index: ChannelIndex,
38 channel_invitations: Vec<Arc<Channel>>,
39 channel_participants: HashMap<ChannelId, Vec<Arc<User>>>,
40 channel_states: HashMap<ChannelId, ChannelState>,
41 outgoing_invites: HashSet<(ChannelId, UserId)>,
42 update_channels_tx: mpsc::UnboundedSender<proto::UpdateChannels>,
43 opened_buffers: HashMap<ChannelId, OpenEntityHandle<ChannelBuffer>>,
44 client: Arc<Client>,
45 did_subscribe: bool,
46 channels_loaded: (watch::Sender<bool>, watch::Receiver<bool>),
47 user_store: Entity<UserStore>,
48 _rpc_subscriptions: [Subscription; 2],
49 _watch_connection_status: Task<Option<()>>,
50 disconnect_channel_buffers_task: Option<Task<()>>,
51 _update_channels: Task<()>,
52}
53
54#[derive(Clone, Debug)]
55pub struct Channel {
56 pub id: ChannelId,
57 pub name: SharedString,
58 pub visibility: proto::ChannelVisibility,
59 pub parent_path: Vec<ChannelId>,
60 pub channel_order: i32,
61}
62
63#[derive(Default, Debug)]
64pub struct ChannelState {
65 latest_notes_version: NotesVersion,
66 observed_notes_version: NotesVersion,
67 role: Option<ChannelRole>,
68}
69
70impl Channel {
71 pub fn link(&self, cx: &App) -> String {
72 format!(
73 "{}/channel/{}-{}",
74 ClientSettings::get_global(cx).server_url,
75 Self::slug(&self.name),
76 self.id
77 )
78 }
79
80 pub fn notes_link(&self, heading: Option<String>, cx: &App) -> String {
81 self.link(cx)
82 + "/notes"
83 + &heading
84 .map(|h| format!("#{}", Self::slug(&h)))
85 .unwrap_or_default()
86 }
87
88 pub fn is_root_channel(&self) -> bool {
89 self.parent_path.is_empty()
90 }
91
92 pub fn root_id(&self) -> ChannelId {
93 self.parent_path.first().copied().unwrap_or(self.id)
94 }
95
96 pub fn slug(str: &str) -> String {
97 let slug: String = str
98 .chars()
99 .map(|c| if c.is_alphanumeric() { c } else { '-' })
100 .collect();
101
102 slug.trim_matches(|c| c == '-').to_string()
103 }
104}
105
106#[derive(Debug)]
107pub struct ChannelMembership {
108 pub user: Arc<User>,
109 pub kind: proto::channel_member::Kind,
110 pub role: proto::ChannelRole,
111}
112impl ChannelMembership {
113 pub fn sort_key(&self) -> MembershipSortKey<'_> {
114 MembershipSortKey {
115 role_order: match self.role {
116 proto::ChannelRole::Admin => 0,
117 proto::ChannelRole::Member => 1,
118 proto::ChannelRole::Banned => 2,
119 proto::ChannelRole::Talker => 3,
120 proto::ChannelRole::Guest => 4,
121 },
122 kind_order: match self.kind {
123 proto::channel_member::Kind::Member => 0,
124 proto::channel_member::Kind::Invitee => 1,
125 },
126 username_order: &self.user.github_login,
127 }
128 }
129}
130
131#[derive(PartialOrd, Ord, PartialEq, Eq)]
132pub struct MembershipSortKey<'a> {
133 role_order: u8,
134 kind_order: u8,
135 username_order: &'a str,
136}
137
138pub enum ChannelEvent {
139 ChannelCreated(ChannelId),
140 ChannelRenamed(ChannelId),
141}
142
143impl EventEmitter<ChannelEvent> for ChannelStore {}
144
145enum OpenEntityHandle<E> {
146 Open(WeakEntity<E>),
147 Loading(Shared<Task<Result<Entity<E>, Arc<anyhow::Error>>>>),
148}
149
150struct GlobalChannelStore(Entity<ChannelStore>);
151
152impl Global for GlobalChannelStore {}
153
154impl ChannelStore {
155 pub fn global(cx: &App) -> Entity<Self> {
156 cx.global::<GlobalChannelStore>().0.clone()
157 }
158
159 pub fn new(client: Arc<Client>, user_store: Entity<UserStore>, cx: &mut Context<Self>) -> Self {
160 let rpc_subscriptions = [
161 client.add_message_handler(cx.weak_entity(), Self::handle_update_channels),
162 client.add_message_handler(cx.weak_entity(), Self::handle_update_user_channels),
163 ];
164
165 let mut connection_status = client.status();
166 let (update_channels_tx, mut update_channels_rx) = mpsc::unbounded();
167 let watch_connection_status = cx.spawn(async move |this, cx| {
168 while let Some(status) = connection_status.next().await {
169 let this = this.upgrade()?;
170 match status {
171 client::Status::Connected { .. } => {
172 this.update(cx, |this, cx| this.handle_connect(cx))
173 .ok()?
174 .await
175 .log_err()?;
176 }
177 client::Status::SignedOut | client::Status::UpgradeRequired => {
178 this.update(cx, |this, cx| this.handle_disconnect(false, cx))
179 .ok();
180 }
181 _ => {
182 this.update(cx, |this, cx| this.handle_disconnect(true, cx))
183 .ok();
184 }
185 }
186 }
187 Some(())
188 });
189
190 Self {
191 channel_invitations: Vec::default(),
192 channel_index: ChannelIndex::default(),
193 channel_participants: Default::default(),
194 outgoing_invites: Default::default(),
195 opened_buffers: Default::default(),
196 update_channels_tx,
197 client,
198 user_store,
199 _rpc_subscriptions: rpc_subscriptions,
200 _watch_connection_status: watch_connection_status,
201 disconnect_channel_buffers_task: None,
202 _update_channels: cx.spawn(async move |this, cx| {
203 maybe!(async move {
204 while let Some(update_channels) = update_channels_rx.next().await {
205 if let Some(this) = this.upgrade() {
206 let update_task = this
207 .update(cx, |this, cx| this.update_channels(update_channels, cx))?;
208 if let Some(update_task) = update_task {
209 update_task.await.log_err();
210 }
211 }
212 }
213 anyhow::Ok(())
214 })
215 .await
216 .log_err();
217 }),
218 channel_states: Default::default(),
219 did_subscribe: false,
220 channels_loaded: watch::channel_with(false),
221 }
222 }
223
224 pub fn initialize(&mut self) {
225 if !self.did_subscribe
226 && self
227 .client
228 .send(proto::SubscribeToChannels {})
229 .log_err()
230 .is_some()
231 {
232 self.did_subscribe = true;
233 }
234 }
235
236 pub fn wait_for_channels(
237 &mut self,
238 timeout: Duration,
239 cx: &mut Context<Self>,
240 ) -> Task<Result<()>> {
241 let mut channels_loaded_rx = self.channels_loaded.1.clone();
242 if *channels_loaded_rx.borrow() {
243 return Task::ready(Ok(()));
244 }
245
246 let mut status_receiver = self.client.status();
247 if status_receiver.borrow().is_connected() {
248 self.initialize();
249 }
250
251 let mut timer = cx.background_executor().timer(timeout).fuse();
252 cx.spawn(async move |this, cx| {
253 loop {
254 futures::select_biased! {
255 channels_loaded = channels_loaded_rx.next().fuse() => {
256 if let Some(true) = channels_loaded {
257 return Ok(());
258 }
259 }
260 status = status_receiver.next().fuse() => {
261 if let Some(status) = status
262 && status.is_connected() {
263 this.update(cx, |this, _cx| {
264 this.initialize();
265 }).ok();
266 }
267 continue;
268 }
269 _ = timer => {
270 return Err(anyhow!("{:?} elapsed without receiving channels", timeout));
271 }
272 }
273 }
274 })
275 }
276
277 pub fn client(&self) -> Arc<Client> {
278 self.client.clone()
279 }
280
281 /// Returns the number of unique channels in the store
282 pub fn channel_count(&self) -> usize {
283 self.channel_index.by_id().len()
284 }
285
286 /// Returns the index of a channel ID in the list of unique channels
287 pub fn index_of_channel(&self, channel_id: ChannelId) -> Option<usize> {
288 self.channel_index
289 .by_id()
290 .keys()
291 .position(|id| *id == channel_id)
292 }
293
294 /// Returns an iterator over all unique channels
295 pub fn channels(&self) -> impl '_ + Iterator<Item = &Arc<Channel>> {
296 self.channel_index.by_id().values()
297 }
298
299 /// Iterate over all entries in the channel DAG
300 pub fn ordered_channels(&self) -> impl '_ + Iterator<Item = (usize, &Arc<Channel>)> {
301 self.channel_index
302 .ordered_channels()
303 .iter()
304 .filter_map(move |id| {
305 let channel = self.channel_index.by_id().get(id)?;
306 Some((channel.parent_path.len(), channel))
307 })
308 }
309
310 pub fn channel_at_index(&self, ix: usize) -> Option<&Arc<Channel>> {
311 let channel_id = self.channel_index.ordered_channels().get(ix)?;
312 self.channel_index.by_id().get(channel_id)
313 }
314
315 pub fn channel_at(&self, ix: usize) -> Option<&Arc<Channel>> {
316 self.channel_index.by_id().values().nth(ix)
317 }
318
319 pub fn has_channel_invitation(&self, channel_id: ChannelId) -> bool {
320 self.channel_invitations
321 .iter()
322 .any(|channel| channel.id == channel_id)
323 }
324
325 pub fn channel_invitations(&self) -> &[Arc<Channel>] {
326 &self.channel_invitations
327 }
328
329 pub fn channel_for_id(&self, channel_id: ChannelId) -> Option<&Arc<Channel>> {
330 self.channel_index.by_id().get(&channel_id)
331 }
332
333 pub fn has_open_channel_buffer(&self, channel_id: ChannelId, _cx: &App) -> bool {
334 if let Some(buffer) = self.opened_buffers.get(&channel_id)
335 && let OpenEntityHandle::Open(buffer) = buffer
336 {
337 return buffer.upgrade().is_some();
338 }
339 false
340 }
341
342 pub fn open_channel_buffer(
343 &mut self,
344 channel_id: ChannelId,
345 cx: &mut Context<Self>,
346 ) -> Task<Result<Entity<ChannelBuffer>>> {
347 let client = self.client.clone();
348 let user_store = self.user_store.clone();
349 let channel_store = cx.entity();
350 self.open_channel_resource(
351 channel_id,
352 "notes",
353 |this| &mut this.opened_buffers,
354 async move |channel, cx| {
355 ChannelBuffer::new(channel, client, user_store, channel_store, cx).await
356 },
357 cx,
358 )
359 }
360
361 pub fn has_channel_buffer_changed(&self, channel_id: ChannelId) -> bool {
362 self.channel_states
363 .get(&channel_id)
364 .is_some_and(|state| state.has_channel_buffer_changed())
365 }
366
367 pub fn acknowledge_notes_version(
368 &mut self,
369 channel_id: ChannelId,
370 epoch: u64,
371 version: &clock::Global,
372 cx: &mut Context<Self>,
373 ) {
374 self.channel_states
375 .entry(channel_id)
376 .or_default()
377 .acknowledge_notes_version(epoch, version);
378 cx.notify()
379 }
380
381 pub fn update_latest_notes_version(
382 &mut self,
383 channel_id: ChannelId,
384 epoch: u64,
385 version: &clock::Global,
386 cx: &mut Context<Self>,
387 ) {
388 self.channel_states
389 .entry(channel_id)
390 .or_default()
391 .update_latest_notes_version(epoch, version);
392 cx.notify()
393 }
394
395 /// Asynchronously open a given resource associated with a channel.
396 ///
397 /// Make sure that the resource is only opened once, even if this method
398 /// is called multiple times with the same channel id while the first task
399 /// is still running.
400 fn open_channel_resource<T, F>(
401 &mut self,
402 channel_id: ChannelId,
403 resource_name: &'static str,
404 get_map: fn(&mut Self) -> &mut HashMap<ChannelId, OpenEntityHandle<T>>,
405 load: F,
406 cx: &mut Context<Self>,
407 ) -> Task<Result<Entity<T>>>
408 where
409 F: AsyncFnOnce(Arc<Channel>, &mut AsyncApp) -> Result<Entity<T>> + 'static,
410 T: 'static,
411 {
412 let task = loop {
413 match get_map(self).get(&channel_id) {
414 Some(OpenEntityHandle::Open(entity)) => {
415 if let Some(entity) = entity.upgrade() {
416 break Task::ready(Ok(entity)).shared();
417 } else {
418 get_map(self).remove(&channel_id);
419 continue;
420 }
421 }
422 Some(OpenEntityHandle::Loading(task)) => break task.clone(),
423 None => {}
424 }
425
426 let channels_ready = self.wait_for_channels(Duration::from_secs(10), cx);
427 let task = cx
428 .spawn(async move |this, cx| {
429 channels_ready.await?;
430 let channel = this.read_with(cx, |this, _| {
431 this.channel_for_id(channel_id)
432 .cloned()
433 .ok_or_else(|| Arc::new(anyhow!("no channel for id: {channel_id}")))
434 })??;
435
436 load(channel, cx).await.map_err(Arc::new)
437 })
438 .shared();
439
440 get_map(self).insert(channel_id, OpenEntityHandle::Loading(task.clone()));
441 let task = cx.spawn({
442 async move |this, cx| {
443 let result = task.await;
444 this.update(cx, |this, _| match &result {
445 Ok(model) => {
446 get_map(this)
447 .insert(channel_id, OpenEntityHandle::Open(model.downgrade()));
448 }
449 Err(_) => {
450 get_map(this).remove(&channel_id);
451 }
452 })?;
453 result
454 }
455 });
456 break task.shared();
457 };
458 cx.background_spawn(async move {
459 task.await.map_err(|error| {
460 anyhow!("{error}").context(format!("failed to open channel {resource_name}"))
461 })
462 })
463 }
464
465 pub fn is_channel_admin(&self, channel_id: ChannelId) -> bool {
466 self.channel_role(channel_id) == proto::ChannelRole::Admin
467 }
468
469 pub fn is_root_channel(&self, channel_id: ChannelId) -> bool {
470 self.channel_index
471 .by_id()
472 .get(&channel_id)
473 .is_some_and(|channel| channel.is_root_channel())
474 }
475
476 pub fn is_public_channel(&self, channel_id: ChannelId) -> bool {
477 self.channel_index
478 .by_id()
479 .get(&channel_id)
480 .is_some_and(|channel| channel.visibility == ChannelVisibility::Public)
481 }
482
483 pub fn channel_capability(&self, channel_id: ChannelId) -> Capability {
484 match self.channel_role(channel_id) {
485 ChannelRole::Admin | ChannelRole::Member => Capability::ReadWrite,
486 _ => Capability::ReadOnly,
487 }
488 }
489
490 pub fn channel_role(&self, channel_id: ChannelId) -> proto::ChannelRole {
491 maybe!({
492 let mut channel = self.channel_for_id(channel_id)?;
493 if !channel.is_root_channel() {
494 channel = self.channel_for_id(channel.root_id())?;
495 }
496 let root_channel_state = self.channel_states.get(&channel.id);
497 root_channel_state?.role
498 })
499 .unwrap_or(proto::ChannelRole::Guest)
500 }
501
502 pub fn channel_participants(&self, channel_id: ChannelId) -> &[Arc<User>] {
503 self.channel_participants
504 .get(&channel_id)
505 .map_or(&[], |v| v.as_slice())
506 }
507
508 pub fn create_channel(
509 &self,
510 name: &str,
511 parent_id: Option<ChannelId>,
512 cx: &mut Context<Self>,
513 ) -> Task<Result<ChannelId>> {
514 let client = self.client.clone();
515 let name = name.trim_start_matches('#').to_owned();
516 cx.spawn(async move |this, cx| {
517 let response = client
518 .request(proto::CreateChannel {
519 name,
520 parent_id: parent_id.map(|cid| cid.0),
521 })
522 .await?;
523
524 let channel = response.channel.context("missing channel in response")?;
525 let channel_id = ChannelId(channel.id);
526
527 this.update(cx, |this, cx| {
528 let task = this.update_channels(
529 proto::UpdateChannels {
530 channels: vec![channel],
531 ..Default::default()
532 },
533 cx,
534 );
535 assert!(task.is_none());
536
537 // This event is emitted because the collab panel wants to clear the pending edit state
538 // before this frame is rendered. But we can't guarantee that the collab panel's future
539 // will resolve before this flush_effects finishes. Synchronously emitting this event
540 // ensures that the collab panel will observe this creation before the frame completes
541 cx.emit(ChannelEvent::ChannelCreated(channel_id));
542 })?;
543
544 Ok(channel_id)
545 })
546 }
547
548 pub fn move_channel(
549 &mut self,
550 channel_id: ChannelId,
551 to: ChannelId,
552 cx: &mut Context<Self>,
553 ) -> Task<Result<()>> {
554 let client = self.client.clone();
555 cx.spawn(async move |_, _| {
556 let _ = client
557 .request(proto::MoveChannel {
558 channel_id: channel_id.0,
559 to: to.0,
560 })
561 .await?;
562 Ok(())
563 })
564 }
565
566 pub fn reorder_channel(
567 &mut self,
568 channel_id: ChannelId,
569 direction: proto::reorder_channel::Direction,
570 cx: &mut Context<Self>,
571 ) -> Task<Result<()>> {
572 let client = self.client.clone();
573 cx.spawn(async move |_, _| {
574 client
575 .request(proto::ReorderChannel {
576 channel_id: channel_id.0,
577 direction: direction.into(),
578 })
579 .await?;
580 Ok(())
581 })
582 }
583
584 pub fn set_channel_visibility(
585 &mut self,
586 channel_id: ChannelId,
587 visibility: ChannelVisibility,
588 cx: &mut Context<Self>,
589 ) -> Task<Result<()>> {
590 let client = self.client.clone();
591 cx.spawn(async move |_, _| {
592 let _ = client
593 .request(proto::SetChannelVisibility {
594 channel_id: channel_id.0,
595 visibility: visibility.into(),
596 })
597 .await?;
598
599 Ok(())
600 })
601 }
602
603 pub fn invite_member(
604 &mut self,
605 channel_id: ChannelId,
606 user_id: UserId,
607 role: proto::ChannelRole,
608 cx: &mut Context<Self>,
609 ) -> Task<Result<()>> {
610 if !self.outgoing_invites.insert((channel_id, user_id)) {
611 return Task::ready(Err(anyhow!("invite request already in progress")));
612 }
613
614 cx.notify();
615 let client = self.client.clone();
616 cx.spawn(async move |this, cx| {
617 let result = client
618 .request(proto::InviteChannelMember {
619 channel_id: channel_id.0,
620 user_id,
621 role: role.into(),
622 })
623 .await;
624
625 this.update(cx, |this, cx| {
626 this.outgoing_invites.remove(&(channel_id, user_id));
627 cx.notify();
628 })?;
629
630 result?;
631
632 Ok(())
633 })
634 }
635
636 pub fn remove_member(
637 &mut self,
638 channel_id: ChannelId,
639 user_id: u64,
640 cx: &mut Context<Self>,
641 ) -> Task<Result<()>> {
642 if !self.outgoing_invites.insert((channel_id, user_id)) {
643 return Task::ready(Err(anyhow!("invite request already in progress")));
644 }
645
646 cx.notify();
647 let client = self.client.clone();
648 cx.spawn(async move |this, cx| {
649 let result = client
650 .request(proto::RemoveChannelMember {
651 channel_id: channel_id.0,
652 user_id,
653 })
654 .await;
655
656 this.update(cx, |this, cx| {
657 this.outgoing_invites.remove(&(channel_id, user_id));
658 cx.notify();
659 })?;
660 result?;
661 Ok(())
662 })
663 }
664
665 pub fn set_member_role(
666 &mut self,
667 channel_id: ChannelId,
668 user_id: UserId,
669 role: proto::ChannelRole,
670 cx: &mut Context<Self>,
671 ) -> Task<Result<()>> {
672 if !self.outgoing_invites.insert((channel_id, user_id)) {
673 return Task::ready(Err(anyhow!("member request already in progress")));
674 }
675
676 cx.notify();
677 let client = self.client.clone();
678 cx.spawn(async move |this, cx| {
679 let result = client
680 .request(proto::SetChannelMemberRole {
681 channel_id: channel_id.0,
682 user_id,
683 role: role.into(),
684 })
685 .await;
686
687 this.update(cx, |this, cx| {
688 this.outgoing_invites.remove(&(channel_id, user_id));
689 cx.notify();
690 })?;
691
692 result?;
693 Ok(())
694 })
695 }
696
697 pub fn rename(
698 &mut self,
699 channel_id: ChannelId,
700 new_name: &str,
701 cx: &mut Context<Self>,
702 ) -> Task<Result<()>> {
703 let client = self.client.clone();
704 let name = new_name.to_string();
705 cx.spawn(async move |this, cx| {
706 let channel = client
707 .request(proto::RenameChannel {
708 channel_id: channel_id.0,
709 name,
710 })
711 .await?
712 .channel
713 .context("missing channel in response")?;
714 this.update(cx, |this, cx| {
715 let task = this.update_channels(
716 proto::UpdateChannels {
717 channels: vec![channel],
718 ..Default::default()
719 },
720 cx,
721 );
722 assert!(task.is_none());
723
724 // This event is emitted because the collab panel wants to clear the pending edit state
725 // before this frame is rendered. But we can't guarantee that the collab panel's future
726 // will resolve before this flush_effects finishes. Synchronously emitting this event
727 // ensures that the collab panel will observe this creation before the frame complete
728 cx.emit(ChannelEvent::ChannelRenamed(channel_id))
729 })?;
730 Ok(())
731 })
732 }
733
734 pub fn respond_to_channel_invite(
735 &mut self,
736 channel_id: ChannelId,
737 accept: bool,
738 cx: &mut Context<Self>,
739 ) -> Task<Result<()>> {
740 let client = self.client.clone();
741 cx.background_spawn(async move {
742 client
743 .request(proto::RespondToChannelInvite {
744 channel_id: channel_id.0,
745 accept,
746 })
747 .await?;
748 Ok(())
749 })
750 }
751 pub fn fuzzy_search_members(
752 &self,
753 channel_id: ChannelId,
754 query: String,
755 limit: u16,
756 cx: &mut Context<Self>,
757 ) -> Task<Result<Vec<ChannelMembership>>> {
758 let client = self.client.clone();
759 let user_store = self.user_store.downgrade();
760 cx.spawn(async move |_, cx| {
761 let response = client
762 .request(proto::GetChannelMembers {
763 channel_id: channel_id.0,
764 query,
765 limit: limit as u64,
766 })
767 .await?;
768 user_store.update(cx, |user_store, _| {
769 user_store.insert(response.users);
770 response
771 .members
772 .into_iter()
773 .filter_map(|member| {
774 Some(ChannelMembership {
775 user: user_store.get_cached_user(member.user_id)?,
776 role: member.role(),
777 kind: member.kind(),
778 })
779 })
780 .collect()
781 })
782 })
783 }
784
785 pub fn remove_channel(
786 &self,
787 channel_id: ChannelId,
788 ) -> impl Future<Output = Result<()>> + use<> {
789 let client = self.client.clone();
790 async move {
791 client
792 .request(proto::DeleteChannel {
793 channel_id: channel_id.0,
794 })
795 .await?;
796 Ok(())
797 }
798 }
799
800 pub fn has_pending_channel_invite_response(&self, _: &Arc<Channel>) -> bool {
801 false
802 }
803
804 pub fn has_pending_channel_invite(&self, channel_id: ChannelId, user_id: UserId) -> bool {
805 self.outgoing_invites.contains(&(channel_id, user_id))
806 }
807
808 async fn handle_update_channels(
809 this: Entity<Self>,
810 message: TypedEnvelope<proto::UpdateChannels>,
811 cx: AsyncApp,
812 ) -> Result<()> {
813 this.read_with(&cx, |this, _| {
814 this.update_channels_tx
815 .unbounded_send(message.payload)
816 .unwrap();
817 })?;
818 Ok(())
819 }
820
821 async fn handle_update_user_channels(
822 this: Entity<Self>,
823 message: TypedEnvelope<proto::UpdateUserChannels>,
824 mut cx: AsyncApp,
825 ) -> Result<()> {
826 this.update(&mut cx, |this, cx| {
827 for buffer_version in message.payload.observed_channel_buffer_version {
828 let version = language::proto::deserialize_version(&buffer_version.version);
829 this.acknowledge_notes_version(
830 ChannelId(buffer_version.channel_id),
831 buffer_version.epoch,
832 &version,
833 cx,
834 );
835 }
836 for membership in message.payload.channel_memberships {
837 if let Some(role) = ChannelRole::from_i32(membership.role) {
838 this.channel_states
839 .entry(ChannelId(membership.channel_id))
840 .or_default()
841 .set_role(role)
842 }
843 }
844 })
845 }
846
847 fn handle_connect(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
848 self.channel_index.clear();
849 self.channel_invitations.clear();
850 self.channel_participants.clear();
851 self.channel_index.clear();
852 self.outgoing_invites.clear();
853 self.disconnect_channel_buffers_task.take();
854
855 let mut buffer_versions = Vec::new();
856 for buffer in self.opened_buffers.values() {
857 if let OpenEntityHandle::Open(buffer) = buffer
858 && let Some(buffer) = buffer.upgrade()
859 {
860 let channel_buffer = buffer.read(cx);
861 let buffer = channel_buffer.buffer().read(cx);
862 buffer_versions.push(proto::ChannelBufferVersion {
863 channel_id: channel_buffer.channel_id.0,
864 epoch: channel_buffer.epoch(),
865 version: language::proto::serialize_version(&buffer.version()),
866 });
867 }
868 }
869
870 if buffer_versions.is_empty() {
871 return Task::ready(Ok(()));
872 }
873
874 let response = self.client.request(proto::RejoinChannelBuffers {
875 buffers: buffer_versions,
876 });
877
878 cx.spawn(async move |this, cx| {
879 let mut response = response.await?;
880
881 this.update(cx, |this, cx| {
882 this.opened_buffers.retain(|_, buffer| match buffer {
883 OpenEntityHandle::Open(channel_buffer) => {
884 let Some(channel_buffer) = channel_buffer.upgrade() else {
885 return false;
886 };
887
888 channel_buffer.update(cx, |channel_buffer, cx| {
889 let channel_id = channel_buffer.channel_id;
890 if let Some(remote_buffer) = response
891 .buffers
892 .iter_mut()
893 .find(|buffer| buffer.channel_id == channel_id.0)
894 {
895 let channel_id = channel_buffer.channel_id;
896 let remote_version =
897 language::proto::deserialize_version(&remote_buffer.version);
898
899 channel_buffer.replace_collaborators(
900 mem::take(&mut remote_buffer.collaborators),
901 cx,
902 );
903
904 let operations = channel_buffer
905 .buffer()
906 .update(cx, |buffer, cx| {
907 let outgoing_operations =
908 buffer.serialize_ops(Some(remote_version), cx);
909 let incoming_operations =
910 mem::take(&mut remote_buffer.operations)
911 .into_iter()
912 .map(language::proto::deserialize_operation)
913 .collect::<Result<Vec<_>>>()?;
914 buffer.apply_ops(incoming_operations, cx);
915 anyhow::Ok(outgoing_operations)
916 })
917 .log_err();
918
919 if let Some(operations) = operations {
920 channel_buffer.connected(cx);
921 let client = this.client.clone();
922 cx.background_spawn(async move {
923 let operations = operations.await;
924 for chunk in language::proto::split_operations(operations) {
925 client
926 .send(proto::UpdateChannelBuffer {
927 channel_id: channel_id.0,
928 operations: chunk,
929 })
930 .ok();
931 }
932 })
933 .detach();
934 return true;
935 }
936 }
937
938 channel_buffer.disconnect(cx);
939 false
940 })
941 }
942 OpenEntityHandle::Loading(_) => true,
943 });
944 })
945 .ok();
946 anyhow::Ok(())
947 })
948 }
949
950 fn handle_disconnect(&mut self, wait_for_reconnect: bool, cx: &mut Context<Self>) {
951 cx.notify();
952 self.did_subscribe = false;
953 self.disconnect_channel_buffers_task.get_or_insert_with(|| {
954 cx.spawn(async move |this, cx| {
955 if wait_for_reconnect {
956 cx.background_executor().timer(RECONNECT_TIMEOUT).await;
957 }
958
959 if let Some(this) = this.upgrade() {
960 this.update(cx, |this, cx| {
961 for buffer in this.opened_buffers.values() {
962 if let OpenEntityHandle::Open(buffer) = &buffer
963 && let Some(buffer) = buffer.upgrade()
964 {
965 buffer.update(cx, |buffer, cx| buffer.disconnect(cx));
966 }
967 }
968 })
969 .ok();
970 }
971 })
972 });
973 }
974
975 #[cfg(any(test, feature = "test-support"))]
976 pub fn reset(&mut self) {
977 self.channel_invitations.clear();
978 self.channel_index.clear();
979 self.channel_participants.clear();
980 self.outgoing_invites.clear();
981 self.opened_buffers.clear();
982 self.disconnect_channel_buffers_task = None;
983 self.channel_states.clear();
984 }
985
986 pub(crate) fn update_channels(
987 &mut self,
988 payload: proto::UpdateChannels,
989 cx: &mut Context<ChannelStore>,
990 ) -> Option<Task<Result<()>>> {
991 if !payload.remove_channel_invitations.is_empty() {
992 self.channel_invitations
993 .retain(|channel| !payload.remove_channel_invitations.contains(&channel.id.0));
994 }
995 for channel in payload.channel_invitations {
996 match self
997 .channel_invitations
998 .binary_search_by_key(&channel.id, |c| c.id.0)
999 {
1000 Ok(ix) => {
1001 Arc::make_mut(&mut self.channel_invitations[ix]).name = channel.name.into()
1002 }
1003 Err(ix) => self.channel_invitations.insert(
1004 ix,
1005 Arc::new(Channel {
1006 id: ChannelId(channel.id),
1007 visibility: channel.visibility(),
1008 name: channel.name.into(),
1009 parent_path: channel.parent_path.into_iter().map(ChannelId).collect(),
1010 channel_order: channel.channel_order,
1011 }),
1012 ),
1013 }
1014 }
1015
1016 let channels_changed = !payload.channels.is_empty()
1017 || !payload.delete_channels.is_empty()
1018 || !payload.latest_channel_buffer_versions.is_empty();
1019
1020 if channels_changed {
1021 if !payload.delete_channels.is_empty() {
1022 let delete_channels: Vec<ChannelId> =
1023 payload.delete_channels.into_iter().map(ChannelId).collect();
1024 self.channel_index.delete_channels(&delete_channels);
1025 self.channel_participants
1026 .retain(|channel_id, _| !delete_channels.contains(channel_id));
1027
1028 for channel_id in &delete_channels {
1029 let channel_id = *channel_id;
1030 if payload
1031 .channels
1032 .iter()
1033 .any(|channel| channel.id == channel_id.0)
1034 {
1035 continue;
1036 }
1037 if let Some(OpenEntityHandle::Open(buffer)) =
1038 self.opened_buffers.remove(&channel_id)
1039 && let Some(buffer) = buffer.upgrade()
1040 {
1041 buffer.update(cx, ChannelBuffer::disconnect);
1042 }
1043 }
1044 }
1045
1046 let mut index = self.channel_index.bulk_insert();
1047 for channel in payload.channels {
1048 let id = ChannelId(channel.id);
1049 let channel_changed = index.insert(channel);
1050
1051 if channel_changed
1052 && let Some(OpenEntityHandle::Open(buffer)) = self.opened_buffers.get(&id)
1053 && let Some(buffer) = buffer.upgrade()
1054 {
1055 buffer.update(cx, ChannelBuffer::channel_changed);
1056 }
1057 }
1058
1059 for latest_buffer_version in payload.latest_channel_buffer_versions {
1060 let version = language::proto::deserialize_version(&latest_buffer_version.version);
1061 self.channel_states
1062 .entry(ChannelId(latest_buffer_version.channel_id))
1063 .or_default()
1064 .update_latest_notes_version(latest_buffer_version.epoch, &version)
1065 }
1066
1067 self.channels_loaded.0.try_send(true).log_err();
1068 }
1069
1070 cx.notify();
1071 if payload.channel_participants.is_empty() {
1072 return None;
1073 }
1074
1075 let mut all_user_ids = Vec::new();
1076 let channel_participants = payload.channel_participants;
1077 for entry in &channel_participants {
1078 for user_id in entry.participant_user_ids.iter() {
1079 if let Err(ix) = all_user_ids.binary_search(user_id) {
1080 all_user_ids.insert(ix, *user_id);
1081 }
1082 }
1083 }
1084
1085 let users = self
1086 .user_store
1087 .update(cx, |user_store, cx| user_store.get_users(all_user_ids, cx));
1088 Some(cx.spawn(async move |this, cx| {
1089 let users = users.await?;
1090
1091 this.update(cx, |this, cx| {
1092 for entry in &channel_participants {
1093 let mut participants: Vec<_> = entry
1094 .participant_user_ids
1095 .iter()
1096 .filter_map(|user_id| {
1097 users
1098 .binary_search_by_key(&user_id, |user| &user.id)
1099 .ok()
1100 .map(|ix| users[ix].clone())
1101 })
1102 .collect();
1103
1104 participants.sort_by_key(|u| u.id);
1105
1106 this.channel_participants
1107 .insert(ChannelId(entry.channel_id), participants);
1108 }
1109
1110 cx.notify();
1111 })
1112 }))
1113 }
1114}
1115
1116impl ChannelState {
1117 fn set_role(&mut self, role: ChannelRole) {
1118 self.role = Some(role);
1119 }
1120
1121 fn has_channel_buffer_changed(&self) -> bool {
1122 self.latest_notes_version.epoch > self.observed_notes_version.epoch
1123 || (self.latest_notes_version.epoch == self.observed_notes_version.epoch
1124 && self
1125 .latest_notes_version
1126 .version
1127 .changed_since(&self.observed_notes_version.version))
1128 }
1129
1130 fn acknowledge_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1131 if self.observed_notes_version.epoch == epoch {
1132 self.observed_notes_version.version.join(version);
1133 } else {
1134 self.observed_notes_version = NotesVersion {
1135 epoch,
1136 version: version.clone(),
1137 };
1138 }
1139 }
1140
1141 fn update_latest_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1142 if self.latest_notes_version.epoch == epoch {
1143 self.latest_notes_version.version.join(version);
1144 } else {
1145 self.latest_notes_version = NotesVersion {
1146 epoch,
1147 version: version.clone(),
1148 };
1149 }
1150 }
1151}