1mod channel_index;
2
3use crate::channel_buffer::ChannelBuffer;
4use anyhow::{Context as _, Result, anyhow};
5use channel_index::ChannelIndex;
6use client::{ChannelId, Client, ClientSettings, Subscription, User, UserId, UserStore};
7use collections::{HashMap, HashSet};
8use futures::{Future, FutureExt, StreamExt, channel::mpsc, future::Shared};
9use gpui::{
10 App, AppContext as _, AsyncApp, Context, Entity, EventEmitter, Global, SharedString, Task,
11 WeakEntity,
12};
13use language::Capability;
14use postage::{sink::Sink, watch};
15use rpc::{
16 TypedEnvelope,
17 proto::{self, ChannelRole, ChannelVisibility},
18};
19use settings::Settings;
20use std::{mem, sync::Arc, time::Duration};
21use util::{ResultExt, maybe};
22
/// How long [`ChannelStore`] waits after a disconnect (when asked to wait for a
/// reconnect) before it disconnects the channel buffers that are still open.
pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
24
25pub fn init(client: &Arc<Client>, user_store: Entity<UserStore>, cx: &mut App) {
26 let channel_store = cx.new(|cx| ChannelStore::new(client.clone(), user_store.clone(), cx));
27 cx.set_global(GlobalChannelStore(channel_store));
28}
29
/// A version of a channel's notes: an `epoch` plus the `clock::Global` version
/// recorded within that epoch.
///
/// Versions with the same epoch are joined; a different epoch replaces the
/// stored version wholesale (see the `ChannelState` methods).
#[derive(Debug, Clone, Default, PartialEq)]
struct NotesVersion {
    epoch: u64,
    version: clock::Global,
}
35
/// Client-side store of the channels known to the current user, their
/// participants, pending invitations, and any open channel notes buffers.
pub struct ChannelStore {
    /// Index of all known channels, addressable by id and as an ordered list.
    pub channel_index: ChannelIndex,
    /// Channels the user has been invited to; kept sorted by channel id.
    channel_invitations: Vec<Arc<Channel>>,
    /// Participants reported for each channel, sorted by user id.
    channel_participants: HashMap<ChannelId, Vec<Arc<User>>>,
    /// Per-channel notes versions and the user's role.
    channel_states: HashMap<ChannelId, ChannelState>,
    /// `(channel, user)` pairs with a membership request currently in flight.
    outgoing_invites: HashSet<(ChannelId, UserId)>,
    /// Queue feeding incoming `UpdateChannels` messages to `_update_channels`.
    update_channels_tx: mpsc::UnboundedSender<proto::UpdateChannels>,
    /// Notes buffers that are open or still loading, keyed by channel.
    opened_buffers: HashMap<ChannelId, OpenEntityHandle<ChannelBuffer>>,
    client: Arc<Client>,
    /// Whether `SubscribeToChannels` has been sent since the last disconnect.
    did_subscribe: bool,
    /// Watch channel set to `true` once an update that changes channels has been applied.
    channels_loaded: (watch::Sender<bool>, watch::Receiver<bool>),
    user_store: Entity<UserStore>,
    /// Handles for the two RPC message handlers registered in `new`.
    _rpc_subscriptions: [Subscription; 2],
    /// Task reacting to client connection status changes.
    _watch_connection_status: Task<Option<()>>,
    /// Pending task that disconnects open buffers after a disconnect, if any.
    disconnect_channel_buffers_task: Option<Task<()>>,
    /// Task applying queued `UpdateChannels` messages one at a time.
    _update_channels: Task<()>,
}
53
/// A channel as known to the client.
#[derive(Clone, Debug)]
pub struct Channel {
    pub id: ChannelId,
    pub name: SharedString,
    pub visibility: proto::ChannelVisibility,
    /// Path of parent channel ids. The first entry is treated as the root
    /// channel, and an empty path marks this channel as a root channel.
    pub parent_path: Vec<ChannelId>,
    pub channel_order: i32,
}
62
/// Per-channel state tracked by [`ChannelStore`].
#[derive(Default, Debug)]
pub struct ChannelState {
    /// Most recent notes version reported for the channel.
    latest_notes_version: NotesVersion,
    /// Notes version that has been acknowledged as observed.
    observed_notes_version: NotesVersion,
    /// The user's role in the channel, once one has been received.
    role: Option<ChannelRole>,
}
69
70impl Channel {
71 pub fn link(&self, cx: &App) -> String {
72 format!(
73 "{}/channel/{}-{}",
74 ClientSettings::get_global(cx).server_url,
75 Self::slug(&self.name),
76 self.id
77 )
78 }
79
80 pub fn notes_link(&self, heading: Option<String>, cx: &App) -> String {
81 self.link(cx)
82 + "/notes"
83 + &heading
84 .map(|h| format!("#{}", Self::slug(&h)))
85 .unwrap_or_default()
86 }
87
88 pub fn is_root_channel(&self) -> bool {
89 self.parent_path.is_empty()
90 }
91
92 pub fn root_id(&self) -> ChannelId {
93 self.parent_path.first().copied().unwrap_or(self.id)
94 }
95
96 pub fn slug(str: &str) -> String {
97 let slug: String = str
98 .chars()
99 .map(|c| if c.is_alphanumeric() { c } else { '-' })
100 .collect();
101
102 slug.trim_matches(|c| c == '-').to_string()
103 }
104}
105
/// A user's membership in a channel, as returned by
/// [`ChannelStore::fuzzy_search_members`].
#[derive(Debug)]
pub struct ChannelMembership {
    pub user: Arc<User>,
    /// Whether the user is a member or only an invitee.
    pub kind: proto::channel_member::Kind,
    pub role: proto::ChannelRole,
}
112impl ChannelMembership {
113 pub fn sort_key(&self) -> MembershipSortKey<'_> {
114 MembershipSortKey {
115 role_order: match self.role {
116 proto::ChannelRole::Admin => 0,
117 proto::ChannelRole::Member => 1,
118 proto::ChannelRole::Banned => 2,
119 proto::ChannelRole::Talker => 3,
120 proto::ChannelRole::Guest => 4,
121 },
122 kind_order: match self.kind {
123 proto::channel_member::Kind::Member => 0,
124 proto::channel_member::Kind::Invitee => 1,
125 },
126 username_order: &self.user.github_login,
127 }
128 }
129}
130
/// Sort key produced by [`ChannelMembership::sort_key`].
///
/// The derived ordering compares the fields in declaration order, so the field
/// order here is significant: role, then kind, then username.
#[derive(PartialOrd, Ord, PartialEq, Eq)]
pub struct MembershipSortKey<'a> {
    role_order: u8,
    kind_order: u8,
    username_order: &'a str,
}
137
/// Events emitted by [`ChannelStore`].
pub enum ChannelEvent {
    /// Emitted by `create_channel` once the new channel has been added to the store.
    ChannelCreated(ChannelId),
    /// Emitted by `rename` once the renamed channel has been applied to the store.
    ChannelRenamed(ChannelId),
}

impl EventEmitter<ChannelEvent> for ChannelStore {}
144
/// Tracks a per-channel resource that is either already open or still loading.
enum OpenEntityHandle<E> {
    /// The resource was opened; only a weak handle is kept, so it may have
    /// been released since.
    Open(WeakEntity<E>),
    /// The resource is being loaded; the shared task lets several callers wait
    /// for the same load.
    Loading(Shared<Task<Result<Entity<E>, Arc<anyhow::Error>>>>),
}
149
/// Wrapper that lets the [`ChannelStore`] entity be stored as an app global.
struct GlobalChannelStore(Entity<ChannelStore>);

impl Global for GlobalChannelStore {}
153
154impl ChannelStore {
155 pub fn global(cx: &App) -> Entity<Self> {
156 cx.global::<GlobalChannelStore>().0.clone()
157 }
158
    /// Creates a channel store bound to `client`.
    ///
    /// Registers the RPC handlers for `UpdateChannels` and
    /// `UpdateUserChannels`, and spawns two tasks: one that reacts to
    /// connection status changes and one that applies queued channel updates.
    pub fn new(client: Arc<Client>, user_store: Entity<UserStore>, cx: &mut Context<Self>) -> Self {
        let rpc_subscriptions = [
            client.add_message_handler(cx.weak_entity(), Self::handle_update_channels),
            client.add_message_handler(cx.weak_entity(), Self::handle_update_user_channels),
        ];

        let mut connection_status = client.status();
        let (update_channels_tx, mut update_channels_rx) = mpsc::unbounded();
        let watch_connection_status = cx.spawn(async move |this, cx| {
            while let Some(status) = connection_status.next().await {
                // Stop watching once the store itself has been released.
                let this = this.upgrade()?;
                match status {
                    client::Status::Connected { .. } => {
                        // NOTE: a failed `handle_connect` is logged and then ends
                        // this task, because `?` propagates the `None` from `log_err`.
                        this.update(cx, |this, cx| this.handle_connect(cx))
                            .await
                            .log_err()?;
                    }
                    // No reconnect is expected in these states, so buffers are
                    // disconnected without waiting.
                    client::Status::SignedOut | client::Status::UpgradeRequired => {
                        this.update(cx, |this, cx| this.handle_disconnect(false, cx));
                    }
                    // Any other status: give the client `RECONNECT_TIMEOUT` to reconnect.
                    _ => {
                        this.update(cx, |this, cx| this.handle_disconnect(true, cx));
                    }
                }
            }
            Some(())
        });

        Self {
            channel_invitations: Vec::default(),
            channel_index: ChannelIndex::default(),
            channel_participants: Default::default(),
            outgoing_invites: Default::default(),
            opened_buffers: Default::default(),
            update_channels_tx,
            client,
            user_store,
            _rpc_subscriptions: rpc_subscriptions,
            _watch_connection_status: watch_connection_status,
            disconnect_channel_buffers_task: None,
            // Applies queued updates sequentially: each update's follow-up task
            // is awaited before the next update is taken from the queue.
            _update_channels: cx.spawn(async move |this, cx| {
                maybe!(async move {
                    while let Some(update_channels) = update_channels_rx.next().await {
                        if let Some(this) = this.upgrade() {
                            let update_task = this
                                .update(cx, |this, cx| this.update_channels(update_channels, cx));
                            if let Some(update_task) = update_task {
                                update_task.await.log_err();
                            }
                        }
                    }
                    anyhow::Ok(())
                })
                .await
                .log_err();
            }),
            channel_states: Default::default(),
            did_subscribe: false,
            channels_loaded: watch::channel_with(false),
        }
    }
220
221 pub fn initialize(&mut self) {
222 if !self.did_subscribe
223 && self
224 .client
225 .send(proto::SubscribeToChannels {})
226 .log_err()
227 .is_some()
228 {
229 self.did_subscribe = true;
230 }
231 }
232
233 pub fn wait_for_channels(
234 &mut self,
235 timeout: Duration,
236 cx: &mut Context<Self>,
237 ) -> Task<Result<()>> {
238 let mut channels_loaded_rx = self.channels_loaded.1.clone();
239 if *channels_loaded_rx.borrow() {
240 return Task::ready(Ok(()));
241 }
242
243 let mut status_receiver = self.client.status();
244 if status_receiver.borrow().is_connected() {
245 self.initialize();
246 }
247
248 let mut timer = cx.background_executor().timer(timeout).fuse();
249 cx.spawn(async move |this, cx| {
250 loop {
251 futures::select_biased! {
252 channels_loaded = channels_loaded_rx.next().fuse() => {
253 if let Some(true) = channels_loaded {
254 return Ok(());
255 }
256 }
257 status = status_receiver.next().fuse() => {
258 if let Some(status) = status
259 && status.is_connected() {
260 this.update(cx, |this, _cx| {
261 this.initialize();
262 }).ok();
263 }
264 continue;
265 }
266 _ = timer => {
267 return Err(anyhow!("{:?} elapsed without receiving channels", timeout));
268 }
269 }
270 }
271 })
272 }
273
274 pub fn client(&self) -> Arc<Client> {
275 self.client.clone()
276 }
277
278 /// Returns the number of unique channels in the store
279 pub fn channel_count(&self) -> usize {
280 self.channel_index.by_id().len()
281 }
282
283 /// Returns the index of a channel ID in the list of unique channels
284 pub fn index_of_channel(&self, channel_id: ChannelId) -> Option<usize> {
285 self.channel_index
286 .by_id()
287 .keys()
288 .position(|id| *id == channel_id)
289 }
290
291 /// Returns an iterator over all unique channels
292 pub fn channels(&self) -> impl '_ + Iterator<Item = &Arc<Channel>> {
293 self.channel_index.by_id().values()
294 }
295
296 /// Iterate over all entries in the channel DAG
297 pub fn ordered_channels(&self) -> impl '_ + Iterator<Item = (usize, &Arc<Channel>)> {
298 self.channel_index
299 .ordered_channels()
300 .iter()
301 .filter_map(move |id| {
302 let channel = self.channel_index.by_id().get(id)?;
303 Some((channel.parent_path.len(), channel))
304 })
305 }
306
307 pub fn channel_at_index(&self, ix: usize) -> Option<&Arc<Channel>> {
308 let channel_id = self.channel_index.ordered_channels().get(ix)?;
309 self.channel_index.by_id().get(channel_id)
310 }
311
312 pub fn channel_at(&self, ix: usize) -> Option<&Arc<Channel>> {
313 self.channel_index.by_id().values().nth(ix)
314 }
315
316 pub fn has_channel_invitation(&self, channel_id: ChannelId) -> bool {
317 self.channel_invitations
318 .iter()
319 .any(|channel| channel.id == channel_id)
320 }
321
322 pub fn channel_invitations(&self) -> &[Arc<Channel>] {
323 &self.channel_invitations
324 }
325
326 pub fn channel_for_id(&self, channel_id: ChannelId) -> Option<&Arc<Channel>> {
327 self.channel_index.by_id().get(&channel_id)
328 }
329
330 pub fn has_open_channel_buffer(&self, channel_id: ChannelId, _cx: &App) -> bool {
331 if let Some(buffer) = self.opened_buffers.get(&channel_id)
332 && let OpenEntityHandle::Open(buffer) = buffer
333 {
334 return buffer.upgrade().is_some();
335 }
336 false
337 }
338
339 pub fn open_channel_buffer(
340 &mut self,
341 channel_id: ChannelId,
342 cx: &mut Context<Self>,
343 ) -> Task<Result<Entity<ChannelBuffer>>> {
344 let client = self.client.clone();
345 let user_store = self.user_store.clone();
346 let channel_store = cx.entity();
347 self.open_channel_resource(
348 channel_id,
349 "notes",
350 |this| &mut this.opened_buffers,
351 async move |channel, cx| {
352 ChannelBuffer::new(channel, client, user_store, channel_store, cx).await
353 },
354 cx,
355 )
356 }
357
358 pub fn has_channel_buffer_changed(&self, channel_id: ChannelId) -> bool {
359 self.channel_states
360 .get(&channel_id)
361 .is_some_and(|state| state.has_channel_buffer_changed())
362 }
363
364 pub fn acknowledge_notes_version(
365 &mut self,
366 channel_id: ChannelId,
367 epoch: u64,
368 version: &clock::Global,
369 cx: &mut Context<Self>,
370 ) {
371 self.channel_states
372 .entry(channel_id)
373 .or_default()
374 .acknowledge_notes_version(epoch, version);
375 cx.notify()
376 }
377
378 pub fn update_latest_notes_version(
379 &mut self,
380 channel_id: ChannelId,
381 epoch: u64,
382 version: &clock::Global,
383 cx: &mut Context<Self>,
384 ) {
385 self.channel_states
386 .entry(channel_id)
387 .or_default()
388 .update_latest_notes_version(epoch, version);
389 cx.notify()
390 }
391
    /// Asynchronously open a given resource associated with a channel.
    ///
    /// Make sure that the resource is only opened once, even if this method
    /// is called multiple times with the same channel id while the first task
    /// is still running.
    ///
    /// `get_map` selects the map that tracks this kind of resource, and `load`
    /// creates the entity once the channel is known. Channels are awaited for
    /// up to 10 seconds before the channel is looked up. Any failure is
    /// returned with the context `failed to open channel {resource_name}`.
    fn open_channel_resource<T, F>(
        &mut self,
        channel_id: ChannelId,
        resource_name: &'static str,
        get_map: fn(&mut Self) -> &mut HashMap<ChannelId, OpenEntityHandle<T>>,
        load: F,
        cx: &mut Context<Self>,
    ) -> Task<Result<Entity<T>>>
    where
        F: AsyncFnOnce(Arc<Channel>, &mut AsyncApp) -> Result<Entity<T>> + 'static,
        T: 'static,
    {
        let task = loop {
            match get_map(self).get(&channel_id) {
                Some(OpenEntityHandle::Open(entity)) => {
                    if let Some(entity) = entity.upgrade() {
                        break Task::ready(Ok(entity)).shared();
                    } else {
                        // The entity was released: forget the stale handle and
                        // go around again, which now takes the `None` path.
                        get_map(self).remove(&channel_id);
                        continue;
                    }
                }
                // A load is already in flight: share its result.
                Some(OpenEntityHandle::Loading(task)) => break task.clone(),
                None => {}
            }

            let channels_ready = self.wait_for_channels(Duration::from_secs(10), cx);
            let task = cx
                .spawn(async move |this, cx| {
                    channels_ready.await?;
                    let channel = this.read_with(cx, |this, _| {
                        this.channel_for_id(channel_id)
                            .cloned()
                            .ok_or_else(|| Arc::new(anyhow!("no channel for id: {channel_id}")))
                    })??;

                    // The error is wrapped in an `Arc` so the shared task's
                    // output can be cloned for every waiter.
                    load(channel, cx).await.map_err(Arc::new)
                })
                .shared();

            // Register the load before it completes so concurrent callers reuse it.
            get_map(self).insert(channel_id, OpenEntityHandle::Loading(task.clone()));
            let task = cx.spawn({
                async move |this, cx| {
                    let result = task.await;
                    // Replace the `Loading` entry with the outcome: a weak handle
                    // on success, nothing on failure (so a later call retries).
                    this.update(cx, |this, _| match &result {
                        Ok(model) => {
                            get_map(this)
                                .insert(channel_id, OpenEntityHandle::Open(model.downgrade()));
                        }
                        Err(_) => {
                            get_map(this).remove(&channel_id);
                        }
                    })?;
                    result
                }
            });
            break task.shared();
        };
        cx.background_spawn(async move {
            task.await.map_err(|error| {
                anyhow!("{error}").context(format!("failed to open channel {resource_name}"))
            })
        })
    }
461
462 pub fn is_channel_admin(&self, channel_id: ChannelId) -> bool {
463 self.channel_role(channel_id) == proto::ChannelRole::Admin
464 }
465
466 pub fn is_root_channel(&self, channel_id: ChannelId) -> bool {
467 self.channel_index
468 .by_id()
469 .get(&channel_id)
470 .is_some_and(|channel| channel.is_root_channel())
471 }
472
473 pub fn is_public_channel(&self, channel_id: ChannelId) -> bool {
474 self.channel_index
475 .by_id()
476 .get(&channel_id)
477 .is_some_and(|channel| channel.visibility == ChannelVisibility::Public)
478 }
479
480 pub fn channel_capability(&self, channel_id: ChannelId) -> Capability {
481 match self.channel_role(channel_id) {
482 ChannelRole::Admin | ChannelRole::Member => Capability::ReadWrite,
483 _ => Capability::ReadOnly,
484 }
485 }
486
487 pub fn channel_role(&self, channel_id: ChannelId) -> proto::ChannelRole {
488 maybe!({
489 let mut channel = self.channel_for_id(channel_id)?;
490 if !channel.is_root_channel() {
491 channel = self.channel_for_id(channel.root_id())?;
492 }
493 let root_channel_state = self.channel_states.get(&channel.id);
494 root_channel_state?.role
495 })
496 .unwrap_or(proto::ChannelRole::Guest)
497 }
498
499 pub fn channel_participants(&self, channel_id: ChannelId) -> &[Arc<User>] {
500 self.channel_participants
501 .get(&channel_id)
502 .map_or(&[], |v| v.as_slice())
503 }
504
505 pub fn create_channel(
506 &self,
507 name: &str,
508 parent_id: Option<ChannelId>,
509 cx: &mut Context<Self>,
510 ) -> Task<Result<ChannelId>> {
511 let client = self.client.clone();
512 let name = name.trim_start_matches('#').to_owned();
513 cx.spawn(async move |this, cx| {
514 let response = client
515 .request(proto::CreateChannel {
516 name,
517 parent_id: parent_id.map(|cid| cid.0),
518 })
519 .await?;
520
521 let channel = response.channel.context("missing channel in response")?;
522 let channel_id = ChannelId(channel.id);
523
524 this.update(cx, |this, cx| {
525 let task = this.update_channels(
526 proto::UpdateChannels {
527 channels: vec![channel],
528 ..Default::default()
529 },
530 cx,
531 );
532 assert!(task.is_none());
533
534 // This event is emitted because the collab panel wants to clear the pending edit state
535 // before this frame is rendered. But we can't guarantee that the collab panel's future
536 // will resolve before this flush_effects finishes. Synchronously emitting this event
537 // ensures that the collab panel will observe this creation before the frame completes
538 cx.emit(ChannelEvent::ChannelCreated(channel_id));
539 })?;
540
541 Ok(channel_id)
542 })
543 }
544
545 pub fn move_channel(
546 &mut self,
547 channel_id: ChannelId,
548 to: ChannelId,
549 cx: &mut Context<Self>,
550 ) -> Task<Result<()>> {
551 let client = self.client.clone();
552 cx.spawn(async move |_, _| {
553 let _ = client
554 .request(proto::MoveChannel {
555 channel_id: channel_id.0,
556 to: to.0,
557 })
558 .await?;
559 Ok(())
560 })
561 }
562
563 pub fn reorder_channel(
564 &mut self,
565 channel_id: ChannelId,
566 direction: proto::reorder_channel::Direction,
567 cx: &mut Context<Self>,
568 ) -> Task<Result<()>> {
569 let client = self.client.clone();
570 cx.spawn(async move |_, _| {
571 client
572 .request(proto::ReorderChannel {
573 channel_id: channel_id.0,
574 direction: direction.into(),
575 })
576 .await?;
577 Ok(())
578 })
579 }
580
581 pub fn set_channel_visibility(
582 &mut self,
583 channel_id: ChannelId,
584 visibility: ChannelVisibility,
585 cx: &mut Context<Self>,
586 ) -> Task<Result<()>> {
587 let client = self.client.clone();
588 cx.spawn(async move |_, _| {
589 let _ = client
590 .request(proto::SetChannelVisibility {
591 channel_id: channel_id.0,
592 visibility: visibility.into(),
593 })
594 .await?;
595
596 Ok(())
597 })
598 }
599
600 pub fn invite_member(
601 &mut self,
602 channel_id: ChannelId,
603 user_id: UserId,
604 role: proto::ChannelRole,
605 cx: &mut Context<Self>,
606 ) -> Task<Result<()>> {
607 if !self.outgoing_invites.insert((channel_id, user_id)) {
608 return Task::ready(Err(anyhow!("invite request already in progress")));
609 }
610
611 cx.notify();
612 let client = self.client.clone();
613 cx.spawn(async move |this, cx| {
614 let result = client
615 .request(proto::InviteChannelMember {
616 channel_id: channel_id.0,
617 user_id,
618 role: role.into(),
619 })
620 .await;
621
622 this.update(cx, |this, cx| {
623 this.outgoing_invites.remove(&(channel_id, user_id));
624 cx.notify();
625 })?;
626
627 result?;
628
629 Ok(())
630 })
631 }
632
633 pub fn remove_member(
634 &mut self,
635 channel_id: ChannelId,
636 user_id: u64,
637 cx: &mut Context<Self>,
638 ) -> Task<Result<()>> {
639 if !self.outgoing_invites.insert((channel_id, user_id)) {
640 return Task::ready(Err(anyhow!("invite request already in progress")));
641 }
642
643 cx.notify();
644 let client = self.client.clone();
645 cx.spawn(async move |this, cx| {
646 let result = client
647 .request(proto::RemoveChannelMember {
648 channel_id: channel_id.0,
649 user_id,
650 })
651 .await;
652
653 this.update(cx, |this, cx| {
654 this.outgoing_invites.remove(&(channel_id, user_id));
655 cx.notify();
656 })?;
657 result?;
658 Ok(())
659 })
660 }
661
662 pub fn set_member_role(
663 &mut self,
664 channel_id: ChannelId,
665 user_id: UserId,
666 role: proto::ChannelRole,
667 cx: &mut Context<Self>,
668 ) -> Task<Result<()>> {
669 if !self.outgoing_invites.insert((channel_id, user_id)) {
670 return Task::ready(Err(anyhow!("member request already in progress")));
671 }
672
673 cx.notify();
674 let client = self.client.clone();
675 cx.spawn(async move |this, cx| {
676 let result = client
677 .request(proto::SetChannelMemberRole {
678 channel_id: channel_id.0,
679 user_id,
680 role: role.into(),
681 })
682 .await;
683
684 this.update(cx, |this, cx| {
685 this.outgoing_invites.remove(&(channel_id, user_id));
686 cx.notify();
687 })?;
688
689 result?;
690 Ok(())
691 })
692 }
693
694 pub fn rename(
695 &mut self,
696 channel_id: ChannelId,
697 new_name: &str,
698 cx: &mut Context<Self>,
699 ) -> Task<Result<()>> {
700 let client = self.client.clone();
701 let name = new_name.to_string();
702 cx.spawn(async move |this, cx| {
703 let channel = client
704 .request(proto::RenameChannel {
705 channel_id: channel_id.0,
706 name,
707 })
708 .await?
709 .channel
710 .context("missing channel in response")?;
711 this.update(cx, |this, cx| {
712 let task = this.update_channels(
713 proto::UpdateChannels {
714 channels: vec![channel],
715 ..Default::default()
716 },
717 cx,
718 );
719 assert!(task.is_none());
720
721 // This event is emitted because the collab panel wants to clear the pending edit state
722 // before this frame is rendered. But we can't guarantee that the collab panel's future
723 // will resolve before this flush_effects finishes. Synchronously emitting this event
724 // ensures that the collab panel will observe this creation before the frame complete
725 cx.emit(ChannelEvent::ChannelRenamed(channel_id))
726 })?;
727 Ok(())
728 })
729 }
730
731 pub fn respond_to_channel_invite(
732 &mut self,
733 channel_id: ChannelId,
734 accept: bool,
735 cx: &mut Context<Self>,
736 ) -> Task<Result<()>> {
737 let client = self.client.clone();
738 cx.background_spawn(async move {
739 client
740 .request(proto::RespondToChannelInvite {
741 channel_id: channel_id.0,
742 accept,
743 })
744 .await?;
745 Ok(())
746 })
747 }
748 pub fn fuzzy_search_members(
749 &self,
750 channel_id: ChannelId,
751 query: String,
752 limit: u16,
753 cx: &mut Context<Self>,
754 ) -> Task<Result<Vec<ChannelMembership>>> {
755 let client = self.client.clone();
756 let user_store = self.user_store.downgrade();
757 cx.spawn(async move |_, cx| {
758 let response = client
759 .request(proto::GetChannelMembers {
760 channel_id: channel_id.0,
761 query,
762 limit: limit as u64,
763 })
764 .await?;
765 user_store.update(cx, |user_store, _| {
766 user_store.insert(response.users);
767 response
768 .members
769 .into_iter()
770 .filter_map(|member| {
771 Some(ChannelMembership {
772 user: user_store.get_cached_user(member.user_id)?,
773 role: member.role(),
774 kind: member.kind(),
775 })
776 })
777 .collect()
778 })
779 })
780 }
781
782 pub fn remove_channel(
783 &self,
784 channel_id: ChannelId,
785 ) -> impl Future<Output = Result<()>> + use<> {
786 let client = self.client.clone();
787 async move {
788 client
789 .request(proto::DeleteChannel {
790 channel_id: channel_id.0,
791 })
792 .await?;
793 Ok(())
794 }
795 }
796
    /// Always returns `false`: this store does not track pending responses to
    /// channel invitations, and the channel argument is ignored.
    pub fn has_pending_channel_invite_response(&self, _: &Arc<Channel>) -> bool {
        false
    }
800
801 pub fn has_pending_channel_invite(&self, channel_id: ChannelId, user_id: UserId) -> bool {
802 self.outgoing_invites.contains(&(channel_id, user_id))
803 }
804
805 async fn handle_update_channels(
806 this: Entity<Self>,
807 message: TypedEnvelope<proto::UpdateChannels>,
808 cx: AsyncApp,
809 ) -> Result<()> {
810 this.read_with(&cx, |this, _| {
811 this.update_channels_tx
812 .unbounded_send(message.payload)
813 .unwrap();
814 });
815 Ok(())
816 }
817
818 async fn handle_update_user_channels(
819 this: Entity<Self>,
820 message: TypedEnvelope<proto::UpdateUserChannels>,
821 mut cx: AsyncApp,
822 ) -> Result<()> {
823 this.update(&mut cx, |this, cx| {
824 for buffer_version in message.payload.observed_channel_buffer_version {
825 let version = language::proto::deserialize_version(&buffer_version.version);
826 this.acknowledge_notes_version(
827 ChannelId(buffer_version.channel_id),
828 buffer_version.epoch,
829 &version,
830 cx,
831 );
832 }
833 for membership in message.payload.channel_memberships {
834 if let Some(role) = ChannelRole::from_i32(membership.role) {
835 this.channel_states
836 .entry(ChannelId(membership.channel_id))
837 .or_default()
838 .set_role(role)
839 }
840 }
841 });
842 Ok(())
843 }
844
845 fn handle_connect(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
846 self.channel_index.clear();
847 self.channel_invitations.clear();
848 self.channel_participants.clear();
849 self.channel_index.clear();
850 self.outgoing_invites.clear();
851 self.disconnect_channel_buffers_task.take();
852
853 let mut buffer_versions = Vec::new();
854 for buffer in self.opened_buffers.values() {
855 if let OpenEntityHandle::Open(buffer) = buffer
856 && let Some(buffer) = buffer.upgrade()
857 {
858 let channel_buffer = buffer.read(cx);
859 let buffer = channel_buffer.buffer().read(cx);
860 buffer_versions.push(proto::ChannelBufferVersion {
861 channel_id: channel_buffer.channel_id.0,
862 epoch: channel_buffer.epoch(),
863 version: language::proto::serialize_version(&buffer.version()),
864 });
865 }
866 }
867
868 if buffer_versions.is_empty() {
869 return Task::ready(Ok(()));
870 }
871
872 let response = self.client.request(proto::RejoinChannelBuffers {
873 buffers: buffer_versions,
874 });
875
876 cx.spawn(async move |this, cx| {
877 let mut response = response.await?;
878
879 this.update(cx, |this, cx| {
880 this.opened_buffers.retain(|_, buffer| match buffer {
881 OpenEntityHandle::Open(channel_buffer) => {
882 let Some(channel_buffer) = channel_buffer.upgrade() else {
883 return false;
884 };
885
886 channel_buffer.update(cx, |channel_buffer, cx| {
887 let channel_id = channel_buffer.channel_id;
888 if let Some(remote_buffer) = response
889 .buffers
890 .iter_mut()
891 .find(|buffer| buffer.channel_id == channel_id.0)
892 {
893 let channel_id = channel_buffer.channel_id;
894 let remote_version =
895 language::proto::deserialize_version(&remote_buffer.version);
896
897 channel_buffer.replace_collaborators(
898 mem::take(&mut remote_buffer.collaborators),
899 cx,
900 );
901
902 let operations = channel_buffer
903 .buffer()
904 .update(cx, |buffer, cx| {
905 let outgoing_operations =
906 buffer.serialize_ops(Some(remote_version), cx);
907 let incoming_operations =
908 mem::take(&mut remote_buffer.operations)
909 .into_iter()
910 .map(language::proto::deserialize_operation)
911 .collect::<Result<Vec<_>>>()?;
912 buffer.apply_ops(incoming_operations, cx);
913 anyhow::Ok(outgoing_operations)
914 })
915 .log_err();
916
917 if let Some(operations) = operations {
918 channel_buffer.connected(cx);
919 let client = this.client.clone();
920 cx.background_spawn(async move {
921 let operations = operations.await;
922 for chunk in language::proto::split_operations(operations) {
923 client
924 .send(proto::UpdateChannelBuffer {
925 channel_id: channel_id.0,
926 operations: chunk,
927 })
928 .ok();
929 }
930 })
931 .detach();
932 return true;
933 }
934 }
935
936 channel_buffer.disconnect(cx);
937 false
938 })
939 }
940 OpenEntityHandle::Loading(_) => true,
941 });
942 })
943 .ok();
944 anyhow::Ok(())
945 })
946 }
947
948 fn handle_disconnect(&mut self, wait_for_reconnect: bool, cx: &mut Context<Self>) {
949 cx.notify();
950 self.did_subscribe = false;
951 self.disconnect_channel_buffers_task.get_or_insert_with(|| {
952 cx.spawn(async move |this, cx| {
953 if wait_for_reconnect {
954 cx.background_executor().timer(RECONNECT_TIMEOUT).await;
955 }
956
957 if let Some(this) = this.upgrade() {
958 this.update(cx, |this, cx| {
959 for buffer in this.opened_buffers.values() {
960 if let OpenEntityHandle::Open(buffer) = &buffer
961 && let Some(buffer) = buffer.upgrade()
962 {
963 buffer.update(cx, |buffer, cx| buffer.disconnect(cx));
964 }
965 }
966 });
967 }
968 })
969 });
970 }
971
972 #[cfg(any(test, feature = "test-support"))]
973 pub fn reset(&mut self) {
974 self.channel_invitations.clear();
975 self.channel_index.clear();
976 self.channel_participants.clear();
977 self.outgoing_invites.clear();
978 self.opened_buffers.clear();
979 self.disconnect_channel_buffers_task = None;
980 self.channel_states.clear();
981 }
982
    /// Applies an `UpdateChannels` payload to the store.
    ///
    /// Invitations, channel insertions and deletions, and latest notes
    /// versions are applied synchronously. When the payload carries channel
    /// participants, the participating users are fetched through the user
    /// store first, and the returned task records them once loaded.
    ///
    /// Returns `None` when the payload contains no channel participants.
    pub(crate) fn update_channels(
        &mut self,
        payload: proto::UpdateChannels,
        cx: &mut Context<ChannelStore>,
    ) -> Option<Task<Result<()>>> {
        if !payload.remove_channel_invitations.is_empty() {
            self.channel_invitations
                .retain(|channel| !payload.remove_channel_invitations.contains(&channel.id.0));
        }
        // Invitations are kept sorted by id: an existing entry only has its
        // name refreshed, a new one is inserted at its sorted position.
        for channel in payload.channel_invitations {
            match self
                .channel_invitations
                .binary_search_by_key(&channel.id, |c| c.id.0)
            {
                Ok(ix) => {
                    Arc::make_mut(&mut self.channel_invitations[ix]).name = channel.name.into()
                }
                Err(ix) => self.channel_invitations.insert(
                    ix,
                    Arc::new(Channel {
                        id: ChannelId(channel.id),
                        visibility: channel.visibility(),
                        name: channel.name.into(),
                        parent_path: channel.parent_path.into_iter().map(ChannelId).collect(),
                        channel_order: channel.channel_order,
                    }),
                ),
            }
        }

        let channels_changed = !payload.channels.is_empty()
            || !payload.delete_channels.is_empty()
            || !payload.latest_channel_buffer_versions.is_empty();

        if channels_changed {
            if !payload.delete_channels.is_empty() {
                let delete_channels: Vec<ChannelId> =
                    payload.delete_channels.into_iter().map(ChannelId).collect();
                self.channel_index.delete_channels(&delete_channels);
                self.channel_participants
                    .retain(|channel_id, _| !delete_channels.contains(channel_id));

                for channel_id in &delete_channels {
                    let channel_id = *channel_id;
                    // A channel that is deleted and re-sent in the same payload
                    // keeps its open buffer.
                    if payload
                        .channels
                        .iter()
                        .any(|channel| channel.id == channel_id.0)
                    {
                        continue;
                    }
                    if let Some(OpenEntityHandle::Open(buffer)) =
                        self.opened_buffers.remove(&channel_id)
                        && let Some(buffer) = buffer.upgrade()
                    {
                        buffer.update(cx, ChannelBuffer::disconnect);
                    }
                }
            }

            let mut index = self.channel_index.bulk_insert();
            for channel in payload.channels {
                let id = ChannelId(channel.id);
                let channel_changed = index.insert(channel);

                // Let an open notes buffer know that its channel changed.
                if channel_changed
                    && let Some(OpenEntityHandle::Open(buffer)) = self.opened_buffers.get(&id)
                    && let Some(buffer) = buffer.upgrade()
                {
                    buffer.update(cx, ChannelBuffer::channel_changed);
                }
            }

            for latest_buffer_version in payload.latest_channel_buffer_versions {
                let version = language::proto::deserialize_version(&latest_buffer_version.version);
                self.channel_states
                    .entry(ChannelId(latest_buffer_version.channel_id))
                    .or_default()
                    .update_latest_notes_version(latest_buffer_version.epoch, &version)
            }

            // Signals `wait_for_channels` that channels have been received.
            self.channels_loaded.0.try_send(true).log_err();
        }

        cx.notify();
        if payload.channel_participants.is_empty() {
            return None;
        }

        // Collect the distinct participant ids, kept sorted.
        let mut all_user_ids = Vec::new();
        let channel_participants = payload.channel_participants;
        for entry in &channel_participants {
            for user_id in entry.participant_user_ids.iter() {
                if let Err(ix) = all_user_ids.binary_search(user_id) {
                    all_user_ids.insert(ix, *user_id);
                }
            }
        }

        let users = self
            .user_store
            .update(cx, |user_store, cx| user_store.get_users(all_user_ids, cx));
        Some(cx.spawn(async move |this, cx| {
            let users = users.await?;

            this.update(cx, |this, cx| {
                for entry in &channel_participants {
                    // NOTE(review): the binary search assumes `get_users` returns
                    // the users sorted by id; confirm against the user store.
                    let mut participants: Vec<_> = entry
                        .participant_user_ids
                        .iter()
                        .filter_map(|user_id| {
                            users
                                .binary_search_by_key(&user_id, |user| &user.id)
                                .ok()
                                .map(|ix| users[ix].clone())
                        })
                        .collect();

                    participants.sort_by_key(|u| u.id);

                    this.channel_participants
                        .insert(ChannelId(entry.channel_id), participants);
                }

                cx.notify();
            })
        }))
    }
1111}
1112
1113impl ChannelState {
1114 fn set_role(&mut self, role: ChannelRole) {
1115 self.role = Some(role);
1116 }
1117
1118 fn has_channel_buffer_changed(&self) -> bool {
1119 self.latest_notes_version.epoch > self.observed_notes_version.epoch
1120 || (self.latest_notes_version.epoch == self.observed_notes_version.epoch
1121 && self
1122 .latest_notes_version
1123 .version
1124 .changed_since(&self.observed_notes_version.version))
1125 }
1126
1127 fn acknowledge_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1128 if self.observed_notes_version.epoch == epoch {
1129 self.observed_notes_version.version.join(version);
1130 } else {
1131 self.observed_notes_version = NotesVersion {
1132 epoch,
1133 version: version.clone(),
1134 };
1135 }
1136 }
1137
1138 fn update_latest_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1139 if self.latest_notes_version.epoch == epoch {
1140 self.latest_notes_version.version.join(version);
1141 } else {
1142 self.latest_notes_version = NotesVersion {
1143 epoch,
1144 version: version.clone(),
1145 };
1146 }
1147 }
1148}