1mod channel_index;
2
3use crate::{ChannelMessage, channel_buffer::ChannelBuffer, channel_chat::ChannelChat};
4use anyhow::{Context as _, Result, anyhow};
5use channel_index::ChannelIndex;
6use client::{ChannelId, Client, ClientSettings, Subscription, User, UserId, UserStore};
7use collections::{HashMap, HashSet, hash_map};
8use futures::{Future, FutureExt, StreamExt, channel::mpsc, future::Shared};
9use gpui::{
10 App, AppContext as _, AsyncApp, Context, Entity, EventEmitter, Global, SharedString, Task,
11 WeakEntity,
12};
13use language::Capability;
14use rpc::{
15 TypedEnvelope,
16 proto::{self, ChannelRole, ChannelVisibility},
17};
18use settings::Settings;
19use std::{mem, sync::Arc, time::Duration};
20use util::{ResultExt, maybe};
21
/// How long `ChannelStore::handle_disconnect` waits (when asked to wait for a
/// reconnect) before disconnecting the currently open channel buffers.
pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
23
24pub fn init(client: &Arc<Client>, user_store: Entity<UserStore>, cx: &mut App) {
25 let channel_store = cx.new(|cx| ChannelStore::new(client.clone(), user_store.clone(), cx));
26 cx.set_global(GlobalChannelStore(channel_store));
27}
28
/// The version of a channel's notes: a `clock::Global` version qualified by an epoch.
///
/// `ChannelState::has_channel_buffer_changed` compares the epoch first and only
/// compares `version` when both epochs are equal.
#[derive(Debug, Clone, Default, PartialEq)]
struct NotesVersion {
    epoch: u64,
    version: clock::Global,
}
34
/// Client-side store of channels, channel invitations, participants and the
/// channel buffers/chats that are currently open.
pub struct ChannelStore {
    /// Index of all known channels (looked up by id and in display order).
    pub channel_index: ChannelIndex,
    /// Pending invitations; `update_channels` inserts into this list using a binary search by id.
    channel_invitations: Vec<Arc<Channel>>,
    /// Users reported as participants of each channel.
    channel_participants: HashMap<ChannelId, Vec<Arc<User>>>,
    /// Per-channel role and latest/observed chat and notes state.
    channel_states: HashMap<ChannelId, ChannelState>,
    /// `(channel, user)` pairs that have an invite/remove/role request in flight.
    outgoing_invites: HashSet<(ChannelId, UserId)>,
    /// Sender feeding incoming `UpdateChannels` payloads to the `_update_channels` task.
    update_channels_tx: mpsc::UnboundedSender<proto::UpdateChannels>,
    /// Channel buffers that are open or currently loading, by channel.
    opened_buffers: HashMap<ChannelId, OpenEntityHandle<ChannelBuffer>>,
    /// Channel chats that are open or currently loading, by channel.
    opened_chats: HashMap<ChannelId, OpenEntityHandle<ChannelChat>>,
    client: Arc<Client>,
    /// Set once `SubscribeToChannels` was sent successfully; cleared in `handle_disconnect`.
    did_subscribe: bool,
    user_store: Entity<UserStore>,
    /// Keeps the two RPC message handlers registered in `new` alive.
    _rpc_subscriptions: [Subscription; 2],
    /// Task that reacts to client connection status changes.
    _watch_connection_status: Task<Option<()>>,
    /// Pending task that disconnects open channel buffers after a disconnect.
    disconnect_channel_buffers_task: Option<Task<()>>,
    /// Task that applies queued `UpdateChannels` payloads one at a time.
    _update_channels: Task<()>,
}
52
/// A single channel as known to the client.
#[derive(Clone, Debug)]
pub struct Channel {
    pub id: ChannelId,
    pub name: SharedString,
    pub visibility: proto::ChannelVisibility,
    /// Ids of this channel's ancestors; empty for a root channel. The first
    /// entry is treated as the root channel's id (see `root_id`).
    pub parent_path: Vec<ChannelId>,
    pub channel_order: i32,
}
61
/// Per-channel bookkeeping: the newest known chat message / notes version, the
/// ones that have been observed (acknowledged), and the user's role.
#[derive(Default, Debug)]
pub struct ChannelState {
    /// Highest chat message id reported for the channel, if any.
    latest_chat_message: Option<u64>,
    /// Newest notes version reported for the channel.
    latest_notes_version: NotesVersion,
    /// Notes version that has been acknowledged.
    observed_notes_version: NotesVersion,
    /// Highest chat message id that has been acknowledged, if any.
    observed_chat_message: Option<u64>,
    /// Role set from `UpdateUserChannels` memberships; `None` until one is received.
    role: Option<ChannelRole>,
}
70
71impl Channel {
72 pub fn link(&self, cx: &App) -> String {
73 format!(
74 "{}/channel/{}-{}",
75 ClientSettings::get_global(cx).server_url,
76 Self::slug(&self.name),
77 self.id
78 )
79 }
80
81 pub fn notes_link(&self, heading: Option<String>, cx: &App) -> String {
82 self.link(cx)
83 + "/notes"
84 + &heading
85 .map(|h| format!("#{}", Self::slug(&h)))
86 .unwrap_or_default()
87 }
88
89 pub fn is_root_channel(&self) -> bool {
90 self.parent_path.is_empty()
91 }
92
93 pub fn root_id(&self) -> ChannelId {
94 self.parent_path.first().copied().unwrap_or(self.id)
95 }
96
97 pub fn slug(str: &str) -> String {
98 let slug: String = str
99 .chars()
100 .map(|c| if c.is_alphanumeric() { c } else { '-' })
101 .collect();
102
103 slug.trim_matches(|c| c == '-').to_string()
104 }
105}
106
/// A user's membership in a channel, as built by `ChannelStore::fuzzy_search_members`.
#[derive(Debug)]
pub struct ChannelMembership {
    pub user: Arc<User>,
    /// Whether the user is a member or only an invitee.
    pub kind: proto::channel_member::Kind,
    pub role: proto::ChannelRole,
}
113impl ChannelMembership {
114 pub fn sort_key(&self) -> MembershipSortKey<'_> {
115 MembershipSortKey {
116 role_order: match self.role {
117 proto::ChannelRole::Admin => 0,
118 proto::ChannelRole::Member => 1,
119 proto::ChannelRole::Banned => 2,
120 proto::ChannelRole::Talker => 3,
121 proto::ChannelRole::Guest => 4,
122 },
123 kind_order: match self.kind {
124 proto::channel_member::Kind::Member => 0,
125 proto::channel_member::Kind::Invitee => 1,
126 },
127 username_order: self.user.github_login.as_str(),
128 }
129 }
130}
131
/// Sort key produced by `ChannelMembership::sort_key`.
///
/// The derived ordering compares the fields in declaration order: role first,
/// then kind, then username.
#[derive(PartialOrd, Ord, PartialEq, Eq)]
pub struct MembershipSortKey<'a> {
    role_order: u8,
    kind_order: u8,
    username_order: &'a str,
}
138
/// Events emitted by [`ChannelStore`].
pub enum ChannelEvent {
    /// Emitted by `create_channel` after the new channel was added to the store.
    ChannelCreated(ChannelId),
    /// Emitted by `rename` after the renamed channel was applied to the store.
    ChannelRenamed(ChannelId),
}

impl EventEmitter<ChannelEvent> for ChannelStore {}
145
/// State of a per-channel resource (buffer or chat) tracked by the store.
enum OpenEntityHandle<E> {
    /// The resource finished loading; only a weak handle is kept, so the
    /// entity may already have been dropped.
    Open(WeakEntity<E>),
    /// The resource is still loading; the shared task can be awaited by
    /// several callers.
    Loading(Shared<Task<Result<Entity<E>, Arc<anyhow::Error>>>>),
}
150
/// Wrapper that lets the [`ChannelStore`] entity be stored as a gpui global
/// (set in `init`, read in `ChannelStore::global`).
struct GlobalChannelStore(Entity<ChannelStore>);

impl Global for GlobalChannelStore {}
154
155impl ChannelStore {
156 pub fn global(cx: &App) -> Entity<Self> {
157 cx.global::<GlobalChannelStore>().0.clone()
158 }
159
    /// Creates a channel store backed by `client` and `user_store`.
    ///
    /// Registers the `UpdateChannels` and `UpdateUserChannels` message handlers and
    /// spawns two tasks: one that reacts to connection status changes and one that
    /// applies queued `UpdateChannels` payloads one at a time.
    pub fn new(client: Arc<Client>, user_store: Entity<UserStore>, cx: &mut Context<Self>) -> Self {
        let rpc_subscriptions = [
            client.add_message_handler(cx.weak_entity(), Self::handle_update_channels),
            client.add_message_handler(cx.weak_entity(), Self::handle_update_user_channels),
        ];

        let mut connection_status = client.status();
        let (update_channels_tx, mut update_channels_rx) = mpsc::unbounded();
        let watch_connection_status = cx.spawn(async move |this, cx| {
            while let Some(status) = connection_status.next().await {
                // Stop watching once the store itself has been dropped.
                let this = this.upgrade()?;
                match status {
                    client::Status::Connected { .. } => {
                        // NOTE(review): `log_err()?` ends this task when `handle_connect`
                        // fails, so later status changes are no longer observed —
                        // confirm this is intended.
                        this.update(cx, |this, cx| this.handle_connect(cx))
                            .ok()?
                            .await
                            .log_err()?;
                    }
                    // Not expected to reconnect: disconnect buffers without waiting.
                    client::Status::SignedOut | client::Status::UpgradeRequired => {
                        this.update(cx, |this, cx| this.handle_disconnect(false, cx))
                            .ok();
                    }
                    // Any other status: wait for a possible reconnect first.
                    _ => {
                        this.update(cx, |this, cx| this.handle_disconnect(true, cx))
                            .ok();
                    }
                }
            }
            Some(())
        });

        Self {
            channel_invitations: Vec::default(),
            channel_index: ChannelIndex::default(),
            channel_participants: Default::default(),
            outgoing_invites: Default::default(),
            opened_buffers: Default::default(),
            opened_chats: Default::default(),
            update_channels_tx,
            client,
            user_store,
            _rpc_subscriptions: rpc_subscriptions,
            _watch_connection_status: watch_connection_status,
            disconnect_channel_buffers_task: None,
            _update_channels: cx.spawn(async move |this, cx| {
                maybe!(async move {
                    // Apply updates strictly in arrival order: each update's follow-up
                    // task is awaited before the next payload is taken from the queue.
                    while let Some(update_channels) = update_channels_rx.next().await {
                        if let Some(this) = this.upgrade() {
                            let update_task = this
                                .update(cx, |this, cx| this.update_channels(update_channels, cx))?;
                            if let Some(update_task) = update_task {
                                update_task.await.log_err();
                            }
                        }
                    }
                    anyhow::Ok(())
                })
                .await
                .log_err();
            }),
            channel_states: Default::default(),
            did_subscribe: false,
        }
    }
224
225 pub fn initialize(&mut self) {
226 if !self.did_subscribe
227 && self
228 .client
229 .send(proto::SubscribeToChannels {})
230 .log_err()
231 .is_some()
232 {
233 self.did_subscribe = true;
234 }
235 }
236
237 pub fn client(&self) -> Arc<Client> {
238 self.client.clone()
239 }
240
241 /// Returns the number of unique channels in the store
242 pub fn channel_count(&self) -> usize {
243 self.channel_index.by_id().len()
244 }
245
246 /// Returns the index of a channel ID in the list of unique channels
247 pub fn index_of_channel(&self, channel_id: ChannelId) -> Option<usize> {
248 self.channel_index
249 .by_id()
250 .keys()
251 .position(|id| *id == channel_id)
252 }
253
254 /// Returns an iterator over all unique channels
255 pub fn channels(&self) -> impl '_ + Iterator<Item = &Arc<Channel>> {
256 self.channel_index.by_id().values()
257 }
258
259 /// Iterate over all entries in the channel DAG
260 pub fn ordered_channels(&self) -> impl '_ + Iterator<Item = (usize, &Arc<Channel>)> {
261 self.channel_index
262 .ordered_channels()
263 .iter()
264 .filter_map(move |id| {
265 let channel = self.channel_index.by_id().get(id)?;
266 Some((channel.parent_path.len(), channel))
267 })
268 }
269
270 pub fn channel_at_index(&self, ix: usize) -> Option<&Arc<Channel>> {
271 let channel_id = self.channel_index.ordered_channels().get(ix)?;
272 self.channel_index.by_id().get(channel_id)
273 }
274
275 pub fn channel_at(&self, ix: usize) -> Option<&Arc<Channel>> {
276 self.channel_index.by_id().values().nth(ix)
277 }
278
279 pub fn has_channel_invitation(&self, channel_id: ChannelId) -> bool {
280 self.channel_invitations
281 .iter()
282 .any(|channel| channel.id == channel_id)
283 }
284
285 pub fn channel_invitations(&self) -> &[Arc<Channel>] {
286 &self.channel_invitations
287 }
288
289 pub fn channel_for_id(&self, channel_id: ChannelId) -> Option<&Arc<Channel>> {
290 self.channel_index.by_id().get(&channel_id)
291 }
292
293 pub fn has_open_channel_buffer(&self, channel_id: ChannelId, _cx: &App) -> bool {
294 if let Some(buffer) = self.opened_buffers.get(&channel_id) {
295 if let OpenEntityHandle::Open(buffer) = buffer {
296 return buffer.upgrade().is_some();
297 }
298 }
299 false
300 }
301
302 pub fn open_channel_buffer(
303 &mut self,
304 channel_id: ChannelId,
305 cx: &mut Context<Self>,
306 ) -> Task<Result<Entity<ChannelBuffer>>> {
307 let client = self.client.clone();
308 let user_store = self.user_store.clone();
309 let channel_store = cx.entity();
310 self.open_channel_resource(
311 channel_id,
312 |this| &mut this.opened_buffers,
313 async move |channel, cx| {
314 ChannelBuffer::new(channel, client, user_store, channel_store, cx).await
315 },
316 cx,
317 )
318 }
319
320 pub fn fetch_channel_messages(
321 &self,
322 message_ids: Vec<u64>,
323 cx: &mut Context<Self>,
324 ) -> Task<Result<Vec<ChannelMessage>>> {
325 let request = if message_ids.is_empty() {
326 None
327 } else {
328 Some(
329 self.client
330 .request(proto::GetChannelMessagesById { message_ids }),
331 )
332 };
333 cx.spawn(async move |this, cx| {
334 if let Some(request) = request {
335 let response = request.await?;
336 let this = this.upgrade().context("channel store dropped")?;
337 let user_store = this.read_with(cx, |this, _| this.user_store.clone())?;
338 ChannelMessage::from_proto_vec(response.messages, &user_store, cx).await
339 } else {
340 Ok(Vec::new())
341 }
342 })
343 }
344
345 pub fn has_channel_buffer_changed(&self, channel_id: ChannelId) -> bool {
346 self.channel_states
347 .get(&channel_id)
348 .is_some_and(|state| state.has_channel_buffer_changed())
349 }
350
351 pub fn has_new_messages(&self, channel_id: ChannelId) -> bool {
352 self.channel_states
353 .get(&channel_id)
354 .is_some_and(|state| state.has_new_messages())
355 }
356
357 pub fn set_acknowledged_message_id(&mut self, channel_id: ChannelId, message_id: Option<u64>) {
358 if let Some(state) = self.channel_states.get_mut(&channel_id) {
359 state.latest_chat_message = message_id;
360 }
361 }
362
363 pub fn last_acknowledge_message_id(&self, channel_id: ChannelId) -> Option<u64> {
364 self.channel_states.get(&channel_id).and_then(|state| {
365 if let Some(last_message_id) = state.latest_chat_message {
366 if state
367 .last_acknowledged_message_id()
368 .is_some_and(|id| id < last_message_id)
369 {
370 return state.last_acknowledged_message_id();
371 }
372 }
373
374 None
375 })
376 }
377
378 pub fn acknowledge_message_id(
379 &mut self,
380 channel_id: ChannelId,
381 message_id: u64,
382 cx: &mut Context<Self>,
383 ) {
384 self.channel_states
385 .entry(channel_id)
386 .or_default()
387 .acknowledge_message_id(message_id);
388 cx.notify();
389 }
390
391 pub fn update_latest_message_id(
392 &mut self,
393 channel_id: ChannelId,
394 message_id: u64,
395 cx: &mut Context<Self>,
396 ) {
397 self.channel_states
398 .entry(channel_id)
399 .or_default()
400 .update_latest_message_id(message_id);
401 cx.notify();
402 }
403
404 pub fn acknowledge_notes_version(
405 &mut self,
406 channel_id: ChannelId,
407 epoch: u64,
408 version: &clock::Global,
409 cx: &mut Context<Self>,
410 ) {
411 self.channel_states
412 .entry(channel_id)
413 .or_default()
414 .acknowledge_notes_version(epoch, version);
415 cx.notify()
416 }
417
418 pub fn update_latest_notes_version(
419 &mut self,
420 channel_id: ChannelId,
421 epoch: u64,
422 version: &clock::Global,
423 cx: &mut Context<Self>,
424 ) {
425 self.channel_states
426 .entry(channel_id)
427 .or_default()
428 .update_latest_notes_version(epoch, version);
429 cx.notify()
430 }
431
432 pub fn open_channel_chat(
433 &mut self,
434 channel_id: ChannelId,
435 cx: &mut Context<Self>,
436 ) -> Task<Result<Entity<ChannelChat>>> {
437 let client = self.client.clone();
438 let user_store = self.user_store.clone();
439 let this = cx.entity();
440 self.open_channel_resource(
441 channel_id,
442 |this| &mut this.opened_chats,
443 async move |channel, cx| ChannelChat::new(channel, this, user_store, client, cx).await,
444 cx,
445 )
446 }
447
    /// Asynchronously open a given resource associated with a channel.
    ///
    /// Make sure that the resource is only opened once, even if this method
    /// is called multiple times with the same channel id while the first task
    /// is still running.
    ///
    /// `get_map` selects which handle map on the store tracks the resource, and
    /// `load` performs the actual load once the channel has been looked up.
    /// The returned task fails if the channel is unknown or `load` fails.
    fn open_channel_resource<T, F>(
        &mut self,
        channel_id: ChannelId,
        get_map: fn(&mut Self) -> &mut HashMap<ChannelId, OpenEntityHandle<T>>,
        load: F,
        cx: &mut Context<Self>,
    ) -> Task<Result<Entity<T>>>
    where
        F: AsyncFnOnce(Arc<Channel>, &mut AsyncApp) -> Result<Entity<T>> + 'static,
        T: 'static,
    {
        // The loop runs at most twice: a stale `Open` entry is removed and the
        // lookup is retried, which then hits the vacant branch.
        let task = loop {
            match get_map(self).entry(channel_id) {
                hash_map::Entry::Occupied(e) => match e.get() {
                    OpenEntityHandle::Open(entity) => {
                        if let Some(entity) = entity.upgrade() {
                            // Already open and still alive: resolve immediately.
                            break Task::ready(Ok(entity)).shared();
                        } else {
                            // The entity was dropped; forget the stale handle and retry.
                            get_map(self).remove(&channel_id);
                            continue;
                        }
                    }
                    OpenEntityHandle::Loading(task) => {
                        // A load is in flight: share its result.
                        break task.clone();
                    }
                },
                hash_map::Entry::Vacant(e) => {
                    let task = cx
                        .spawn(async move |this, cx| {
                            let channel = this.read_with(cx, |this, _| {
                                this.channel_for_id(channel_id).cloned().ok_or_else(|| {
                                    Arc::new(anyhow!("no channel for id: {channel_id}"))
                                })
                            })??;

                            // Errors are wrapped in `Arc` so the shared task's output is cloneable.
                            load(channel, cx).await.map_err(Arc::new)
                        })
                        .shared();

                    e.insert(OpenEntityHandle::Loading(task.clone()));
                    // Once loading finishes, replace the `Loading` entry with a weak
                    // `Open` handle, or drop the entry so a later call can retry.
                    cx.spawn({
                        let task = task.clone();
                        async move |this, cx| {
                            let result = task.await;
                            this.update(cx, |this, _| match result {
                                Ok(model) => {
                                    get_map(this).insert(
                                        channel_id,
                                        OpenEntityHandle::Open(model.downgrade()),
                                    );
                                }
                                Err(_) => {
                                    get_map(this).remove(&channel_id);
                                }
                            })
                            .ok();
                        }
                    })
                    .detach();
                    break task;
                }
            }
        };
        // Convert the shared `Arc<anyhow::Error>` back into a plain `anyhow::Error`.
        cx.background_spawn(async move { task.await.map_err(|error| anyhow!("{error}")) })
    }
518
519 pub fn is_channel_admin(&self, channel_id: ChannelId) -> bool {
520 self.channel_role(channel_id) == proto::ChannelRole::Admin
521 }
522
523 pub fn is_root_channel(&self, channel_id: ChannelId) -> bool {
524 self.channel_index
525 .by_id()
526 .get(&channel_id)
527 .map_or(false, |channel| channel.is_root_channel())
528 }
529
530 pub fn is_public_channel(&self, channel_id: ChannelId) -> bool {
531 self.channel_index
532 .by_id()
533 .get(&channel_id)
534 .map_or(false, |channel| {
535 channel.visibility == ChannelVisibility::Public
536 })
537 }
538
539 pub fn channel_capability(&self, channel_id: ChannelId) -> Capability {
540 match self.channel_role(channel_id) {
541 ChannelRole::Admin | ChannelRole::Member => Capability::ReadWrite,
542 _ => Capability::ReadOnly,
543 }
544 }
545
546 pub fn channel_role(&self, channel_id: ChannelId) -> proto::ChannelRole {
547 maybe!({
548 let mut channel = self.channel_for_id(channel_id)?;
549 if !channel.is_root_channel() {
550 channel = self.channel_for_id(channel.root_id())?;
551 }
552 let root_channel_state = self.channel_states.get(&channel.id);
553 root_channel_state?.role
554 })
555 .unwrap_or(proto::ChannelRole::Guest)
556 }
557
558 pub fn channel_participants(&self, channel_id: ChannelId) -> &[Arc<User>] {
559 self.channel_participants
560 .get(&channel_id)
561 .map_or(&[], |v| v.as_slice())
562 }
563
564 pub fn create_channel(
565 &self,
566 name: &str,
567 parent_id: Option<ChannelId>,
568 cx: &mut Context<Self>,
569 ) -> Task<Result<ChannelId>> {
570 let client = self.client.clone();
571 let name = name.trim_start_matches('#').to_owned();
572 cx.spawn(async move |this, cx| {
573 let response = client
574 .request(proto::CreateChannel {
575 name,
576 parent_id: parent_id.map(|cid| cid.0),
577 })
578 .await?;
579
580 let channel = response.channel.context("missing channel in response")?;
581 let channel_id = ChannelId(channel.id);
582
583 this.update(cx, |this, cx| {
584 let task = this.update_channels(
585 proto::UpdateChannels {
586 channels: vec![channel],
587 ..Default::default()
588 },
589 cx,
590 );
591 assert!(task.is_none());
592
593 // This event is emitted because the collab panel wants to clear the pending edit state
594 // before this frame is rendered. But we can't guarantee that the collab panel's future
595 // will resolve before this flush_effects finishes. Synchronously emitting this event
596 // ensures that the collab panel will observe this creation before the frame completes
597 cx.emit(ChannelEvent::ChannelCreated(channel_id));
598 })?;
599
600 Ok(channel_id)
601 })
602 }
603
604 pub fn move_channel(
605 &mut self,
606 channel_id: ChannelId,
607 to: ChannelId,
608 cx: &mut Context<Self>,
609 ) -> Task<Result<()>> {
610 let client = self.client.clone();
611 cx.spawn(async move |_, _| {
612 let _ = client
613 .request(proto::MoveChannel {
614 channel_id: channel_id.0,
615 to: to.0,
616 })
617 .await?;
618 Ok(())
619 })
620 }
621
622 pub fn reorder_channel(
623 &mut self,
624 channel_id: ChannelId,
625 direction: proto::reorder_channel::Direction,
626 cx: &mut Context<Self>,
627 ) -> Task<Result<()>> {
628 let client = self.client.clone();
629 cx.spawn(async move |_, _| {
630 client
631 .request(proto::ReorderChannel {
632 channel_id: channel_id.0,
633 direction: direction.into(),
634 })
635 .await?;
636 Ok(())
637 })
638 }
639
640 pub fn set_channel_visibility(
641 &mut self,
642 channel_id: ChannelId,
643 visibility: ChannelVisibility,
644 cx: &mut Context<Self>,
645 ) -> Task<Result<()>> {
646 let client = self.client.clone();
647 cx.spawn(async move |_, _| {
648 let _ = client
649 .request(proto::SetChannelVisibility {
650 channel_id: channel_id.0,
651 visibility: visibility.into(),
652 })
653 .await?;
654
655 Ok(())
656 })
657 }
658
659 pub fn invite_member(
660 &mut self,
661 channel_id: ChannelId,
662 user_id: UserId,
663 role: proto::ChannelRole,
664 cx: &mut Context<Self>,
665 ) -> Task<Result<()>> {
666 if !self.outgoing_invites.insert((channel_id, user_id)) {
667 return Task::ready(Err(anyhow!("invite request already in progress")));
668 }
669
670 cx.notify();
671 let client = self.client.clone();
672 cx.spawn(async move |this, cx| {
673 let result = client
674 .request(proto::InviteChannelMember {
675 channel_id: channel_id.0,
676 user_id,
677 role: role.into(),
678 })
679 .await;
680
681 this.update(cx, |this, cx| {
682 this.outgoing_invites.remove(&(channel_id, user_id));
683 cx.notify();
684 })?;
685
686 result?;
687
688 Ok(())
689 })
690 }
691
692 pub fn remove_member(
693 &mut self,
694 channel_id: ChannelId,
695 user_id: u64,
696 cx: &mut Context<Self>,
697 ) -> Task<Result<()>> {
698 if !self.outgoing_invites.insert((channel_id, user_id)) {
699 return Task::ready(Err(anyhow!("invite request already in progress")));
700 }
701
702 cx.notify();
703 let client = self.client.clone();
704 cx.spawn(async move |this, cx| {
705 let result = client
706 .request(proto::RemoveChannelMember {
707 channel_id: channel_id.0,
708 user_id,
709 })
710 .await;
711
712 this.update(cx, |this, cx| {
713 this.outgoing_invites.remove(&(channel_id, user_id));
714 cx.notify();
715 })?;
716 result?;
717 Ok(())
718 })
719 }
720
721 pub fn set_member_role(
722 &mut self,
723 channel_id: ChannelId,
724 user_id: UserId,
725 role: proto::ChannelRole,
726 cx: &mut Context<Self>,
727 ) -> Task<Result<()>> {
728 if !self.outgoing_invites.insert((channel_id, user_id)) {
729 return Task::ready(Err(anyhow!("member request already in progress")));
730 }
731
732 cx.notify();
733 let client = self.client.clone();
734 cx.spawn(async move |this, cx| {
735 let result = client
736 .request(proto::SetChannelMemberRole {
737 channel_id: channel_id.0,
738 user_id,
739 role: role.into(),
740 })
741 .await;
742
743 this.update(cx, |this, cx| {
744 this.outgoing_invites.remove(&(channel_id, user_id));
745 cx.notify();
746 })?;
747
748 result?;
749 Ok(())
750 })
751 }
752
753 pub fn rename(
754 &mut self,
755 channel_id: ChannelId,
756 new_name: &str,
757 cx: &mut Context<Self>,
758 ) -> Task<Result<()>> {
759 let client = self.client.clone();
760 let name = new_name.to_string();
761 cx.spawn(async move |this, cx| {
762 let channel = client
763 .request(proto::RenameChannel {
764 channel_id: channel_id.0,
765 name,
766 })
767 .await?
768 .channel
769 .context("missing channel in response")?;
770 this.update(cx, |this, cx| {
771 let task = this.update_channels(
772 proto::UpdateChannels {
773 channels: vec![channel],
774 ..Default::default()
775 },
776 cx,
777 );
778 assert!(task.is_none());
779
780 // This event is emitted because the collab panel wants to clear the pending edit state
781 // before this frame is rendered. But we can't guarantee that the collab panel's future
782 // will resolve before this flush_effects finishes. Synchronously emitting this event
783 // ensures that the collab panel will observe this creation before the frame complete
784 cx.emit(ChannelEvent::ChannelRenamed(channel_id))
785 })?;
786 Ok(())
787 })
788 }
789
790 pub fn respond_to_channel_invite(
791 &mut self,
792 channel_id: ChannelId,
793 accept: bool,
794 cx: &mut Context<Self>,
795 ) -> Task<Result<()>> {
796 let client = self.client.clone();
797 cx.background_spawn(async move {
798 client
799 .request(proto::RespondToChannelInvite {
800 channel_id: channel_id.0,
801 accept,
802 })
803 .await?;
804 Ok(())
805 })
806 }
807 pub fn fuzzy_search_members(
808 &self,
809 channel_id: ChannelId,
810 query: String,
811 limit: u16,
812 cx: &mut Context<Self>,
813 ) -> Task<Result<Vec<ChannelMembership>>> {
814 let client = self.client.clone();
815 let user_store = self.user_store.downgrade();
816 cx.spawn(async move |_, cx| {
817 let response = client
818 .request(proto::GetChannelMembers {
819 channel_id: channel_id.0,
820 query,
821 limit: limit as u64,
822 })
823 .await?;
824 user_store.update(cx, |user_store, _| {
825 user_store.insert(response.users);
826 response
827 .members
828 .into_iter()
829 .filter_map(|member| {
830 Some(ChannelMembership {
831 user: user_store.get_cached_user(member.user_id)?,
832 role: member.role(),
833 kind: member.kind(),
834 })
835 })
836 .collect()
837 })
838 })
839 }
840
841 pub fn remove_channel(
842 &self,
843 channel_id: ChannelId,
844 ) -> impl Future<Output = Result<()>> + use<> {
845 let client = self.client.clone();
846 async move {
847 client
848 .request(proto::DeleteChannel {
849 channel_id: channel_id.0,
850 })
851 .await?;
852 Ok(())
853 }
854 }
855
856 pub fn has_pending_channel_invite_response(&self, _: &Arc<Channel>) -> bool {
857 false
858 }
859
860 pub fn has_pending_channel_invite(&self, channel_id: ChannelId, user_id: UserId) -> bool {
861 self.outgoing_invites.contains(&(channel_id, user_id))
862 }
863
864 async fn handle_update_channels(
865 this: Entity<Self>,
866 message: TypedEnvelope<proto::UpdateChannels>,
867 mut cx: AsyncApp,
868 ) -> Result<()> {
869 this.read_with(&mut cx, |this, _| {
870 this.update_channels_tx
871 .unbounded_send(message.payload)
872 .unwrap();
873 })?;
874 Ok(())
875 }
876
877 async fn handle_update_user_channels(
878 this: Entity<Self>,
879 message: TypedEnvelope<proto::UpdateUserChannels>,
880 mut cx: AsyncApp,
881 ) -> Result<()> {
882 this.update(&mut cx, |this, cx| {
883 for buffer_version in message.payload.observed_channel_buffer_version {
884 let version = language::proto::deserialize_version(&buffer_version.version);
885 this.acknowledge_notes_version(
886 ChannelId(buffer_version.channel_id),
887 buffer_version.epoch,
888 &version,
889 cx,
890 );
891 }
892 for message_id in message.payload.observed_channel_message_id {
893 this.acknowledge_message_id(
894 ChannelId(message_id.channel_id),
895 message_id.message_id,
896 cx,
897 );
898 }
899 for membership in message.payload.channel_memberships {
900 if let Some(role) = ChannelRole::from_i32(membership.role) {
901 this.channel_states
902 .entry(ChannelId(membership.channel_id))
903 .or_default()
904 .set_role(role)
905 }
906 }
907 })
908 }
909
910 fn handle_connect(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
911 self.channel_index.clear();
912 self.channel_invitations.clear();
913 self.channel_participants.clear();
914 self.channel_index.clear();
915 self.outgoing_invites.clear();
916 self.disconnect_channel_buffers_task.take();
917
918 for chat in self.opened_chats.values() {
919 if let OpenEntityHandle::Open(chat) = chat {
920 if let Some(chat) = chat.upgrade() {
921 chat.update(cx, |chat, cx| {
922 chat.rejoin(cx);
923 });
924 }
925 }
926 }
927
928 let mut buffer_versions = Vec::new();
929 for buffer in self.opened_buffers.values() {
930 if let OpenEntityHandle::Open(buffer) = buffer {
931 if let Some(buffer) = buffer.upgrade() {
932 let channel_buffer = buffer.read(cx);
933 let buffer = channel_buffer.buffer().read(cx);
934 buffer_versions.push(proto::ChannelBufferVersion {
935 channel_id: channel_buffer.channel_id.0,
936 epoch: channel_buffer.epoch(),
937 version: language::proto::serialize_version(&buffer.version()),
938 });
939 }
940 }
941 }
942
943 if buffer_versions.is_empty() {
944 return Task::ready(Ok(()));
945 }
946
947 let response = self.client.request(proto::RejoinChannelBuffers {
948 buffers: buffer_versions,
949 });
950
951 cx.spawn(async move |this, cx| {
952 let mut response = response.await?;
953
954 this.update(cx, |this, cx| {
955 this.opened_buffers.retain(|_, buffer| match buffer {
956 OpenEntityHandle::Open(channel_buffer) => {
957 let Some(channel_buffer) = channel_buffer.upgrade() else {
958 return false;
959 };
960
961 channel_buffer.update(cx, |channel_buffer, cx| {
962 let channel_id = channel_buffer.channel_id;
963 if let Some(remote_buffer) = response
964 .buffers
965 .iter_mut()
966 .find(|buffer| buffer.channel_id == channel_id.0)
967 {
968 let channel_id = channel_buffer.channel_id;
969 let remote_version =
970 language::proto::deserialize_version(&remote_buffer.version);
971
972 channel_buffer.replace_collaborators(
973 mem::take(&mut remote_buffer.collaborators),
974 cx,
975 );
976
977 let operations = channel_buffer
978 .buffer()
979 .update(cx, |buffer, cx| {
980 let outgoing_operations =
981 buffer.serialize_ops(Some(remote_version), cx);
982 let incoming_operations =
983 mem::take(&mut remote_buffer.operations)
984 .into_iter()
985 .map(language::proto::deserialize_operation)
986 .collect::<Result<Vec<_>>>()?;
987 buffer.apply_ops(incoming_operations, cx);
988 anyhow::Ok(outgoing_operations)
989 })
990 .log_err();
991
992 if let Some(operations) = operations {
993 channel_buffer.connected(cx);
994 let client = this.client.clone();
995 cx.background_spawn(async move {
996 let operations = operations.await;
997 for chunk in language::proto::split_operations(operations) {
998 client
999 .send(proto::UpdateChannelBuffer {
1000 channel_id: channel_id.0,
1001 operations: chunk,
1002 })
1003 .ok();
1004 }
1005 })
1006 .detach();
1007 return true;
1008 }
1009 }
1010
1011 channel_buffer.disconnect(cx);
1012 false
1013 })
1014 }
1015 OpenEntityHandle::Loading(_) => true,
1016 });
1017 })
1018 .ok();
1019 anyhow::Ok(())
1020 })
1021 }
1022
1023 fn handle_disconnect(&mut self, wait_for_reconnect: bool, cx: &mut Context<Self>) {
1024 cx.notify();
1025 self.did_subscribe = false;
1026 self.disconnect_channel_buffers_task.get_or_insert_with(|| {
1027 cx.spawn(async move |this, cx| {
1028 if wait_for_reconnect {
1029 cx.background_executor().timer(RECONNECT_TIMEOUT).await;
1030 }
1031
1032 if let Some(this) = this.upgrade() {
1033 this.update(cx, |this, cx| {
1034 for (_, buffer) in &this.opened_buffers {
1035 if let OpenEntityHandle::Open(buffer) = &buffer {
1036 if let Some(buffer) = buffer.upgrade() {
1037 buffer.update(cx, |buffer, cx| buffer.disconnect(cx));
1038 }
1039 }
1040 }
1041 })
1042 .ok();
1043 }
1044 })
1045 });
1046 }
1047
1048 #[cfg(any(test, feature = "test-support"))]
1049 pub fn reset(&mut self) {
1050 self.channel_invitations.clear();
1051 self.channel_index.clear();
1052 self.channel_participants.clear();
1053 self.outgoing_invites.clear();
1054 self.opened_buffers.clear();
1055 self.opened_chats.clear();
1056 self.disconnect_channel_buffers_task = None;
1057 self.channel_states.clear();
1058 }
1059
    /// Applies an `UpdateChannels` payload to the store.
    ///
    /// Invitations, channels, deletions and latest message/notes versions are
    /// applied synchronously and observers are notified. If the payload
    /// contains channel participants, their users are fetched through the user
    /// store and a task is returned that stores the participants once loaded;
    /// otherwise `None` is returned.
    pub(crate) fn update_channels(
        &mut self,
        payload: proto::UpdateChannels,
        cx: &mut Context<ChannelStore>,
    ) -> Option<Task<Result<()>>> {
        if !payload.remove_channel_invitations.is_empty() {
            self.channel_invitations
                .retain(|channel| !payload.remove_channel_invitations.contains(&channel.id.0));
        }
        // The binary search assumes `channel_invitations` is sorted by id;
        // inserting at the returned index preserves that order.
        for channel in payload.channel_invitations {
            match self
                .channel_invitations
                .binary_search_by_key(&channel.id, |c| c.id.0)
            {
                // Existing invitation: only the name is refreshed.
                Ok(ix) => {
                    Arc::make_mut(&mut self.channel_invitations[ix]).name = channel.name.into()
                }
                Err(ix) => self.channel_invitations.insert(
                    ix,
                    Arc::new(Channel {
                        id: ChannelId(channel.id),
                        visibility: channel.visibility(),
                        name: channel.name.into(),
                        parent_path: channel.parent_path.into_iter().map(ChannelId).collect(),
                        channel_order: channel.channel_order,
                    }),
                ),
            }
        }

        let channels_changed = !payload.channels.is_empty()
            || !payload.delete_channels.is_empty()
            || !payload.latest_channel_message_ids.is_empty()
            || !payload.latest_channel_buffer_versions.is_empty();

        if channels_changed {
            if !payload.delete_channels.is_empty() {
                let delete_channels: Vec<ChannelId> =
                    payload.delete_channels.into_iter().map(ChannelId).collect();
                self.channel_index.delete_channels(&delete_channels);
                self.channel_participants
                    .retain(|channel_id, _| !delete_channels.contains(channel_id));

                for channel_id in &delete_channels {
                    let channel_id = *channel_id;
                    // A channel that is deleted and re-added by the same payload
                    // keeps its open buffer.
                    if payload
                        .channels
                        .iter()
                        .any(|channel| channel.id == channel_id.0)
                    {
                        continue;
                    }
                    if let Some(OpenEntityHandle::Open(buffer)) =
                        self.opened_buffers.remove(&channel_id)
                    {
                        if let Some(buffer) = buffer.upgrade() {
                            buffer.update(cx, ChannelBuffer::disconnect);
                        }
                    }
                }
            }

            let mut index = self.channel_index.bulk_insert();
            for channel in payload.channels {
                let id = ChannelId(channel.id);
                let channel_changed = index.insert(channel);

                // Let an open buffer for this channel know that the channel changed.
                if channel_changed {
                    if let Some(OpenEntityHandle::Open(buffer)) = self.opened_buffers.get(&id) {
                        if let Some(buffer) = buffer.upgrade() {
                            buffer.update(cx, ChannelBuffer::channel_changed);
                        }
                    }
                }
            }

            for latest_buffer_version in payload.latest_channel_buffer_versions {
                let version = language::proto::deserialize_version(&latest_buffer_version.version);
                self.channel_states
                    .entry(ChannelId(latest_buffer_version.channel_id))
                    .or_default()
                    .update_latest_notes_version(latest_buffer_version.epoch, &version)
            }

            for latest_channel_message in payload.latest_channel_message_ids {
                self.channel_states
                    .entry(ChannelId(latest_channel_message.channel_id))
                    .or_default()
                    .update_latest_message_id(latest_channel_message.message_id);
            }
        }

        cx.notify();
        if payload.channel_participants.is_empty() {
            return None;
        }

        // Collect the participant user ids as a sorted, de-duplicated list.
        let mut all_user_ids = Vec::new();
        let channel_participants = payload.channel_participants;
        for entry in &channel_participants {
            for user_id in entry.participant_user_ids.iter() {
                if let Err(ix) = all_user_ids.binary_search(user_id) {
                    all_user_ids.insert(ix, *user_id);
                }
            }
        }

        let users = self
            .user_store
            .update(cx, |user_store, cx| user_store.get_users(all_user_ids, cx));
        Some(cx.spawn(async move |this, cx| {
            let users = users.await?;

            this.update(cx, |this, cx| {
                for entry in &channel_participants {
                    let mut participants: Vec<_> = entry
                        .participant_user_ids
                        .iter()
                        .filter_map(|user_id| {
                            // NOTE(review): this lookup assumes `users` is sorted by
                            // id — confirm against `UserStore::get_users`.
                            users
                                .binary_search_by_key(&user_id, |user| &user.id)
                                .ok()
                                .map(|ix| users[ix].clone())
                        })
                        .collect();

                    participants.sort_by_key(|u| u.id);

                    this.channel_participants
                        .insert(ChannelId(entry.channel_id), participants);
                }

                cx.notify();
            })
        }))
    }
1196}
1197
1198impl ChannelState {
1199 fn set_role(&mut self, role: ChannelRole) {
1200 self.role = Some(role);
1201 }
1202
1203 fn has_channel_buffer_changed(&self) -> bool {
1204 self.latest_notes_version.epoch > self.observed_notes_version.epoch
1205 || (self.latest_notes_version.epoch == self.observed_notes_version.epoch
1206 && self
1207 .latest_notes_version
1208 .version
1209 .changed_since(&self.observed_notes_version.version))
1210 }
1211
1212 fn has_new_messages(&self) -> bool {
1213 let latest_message_id = self.latest_chat_message;
1214 let observed_message_id = self.observed_chat_message;
1215
1216 latest_message_id.is_some_and(|latest_message_id| {
1217 latest_message_id > observed_message_id.unwrap_or_default()
1218 })
1219 }
1220
1221 fn last_acknowledged_message_id(&self) -> Option<u64> {
1222 self.observed_chat_message
1223 }
1224
1225 fn acknowledge_message_id(&mut self, message_id: u64) {
1226 let observed = self.observed_chat_message.get_or_insert(message_id);
1227 *observed = (*observed).max(message_id);
1228 }
1229
1230 fn update_latest_message_id(&mut self, message_id: u64) {
1231 self.latest_chat_message =
1232 Some(message_id.max(self.latest_chat_message.unwrap_or_default()));
1233 }
1234
1235 fn acknowledge_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1236 if self.observed_notes_version.epoch == epoch {
1237 self.observed_notes_version.version.join(version);
1238 } else {
1239 self.observed_notes_version = NotesVersion {
1240 epoch,
1241 version: version.clone(),
1242 };
1243 }
1244 }
1245
1246 fn update_latest_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1247 if self.latest_notes_version.epoch == epoch {
1248 self.latest_notes_version.version.join(version);
1249 } else {
1250 self.latest_notes_version = NotesVersion {
1251 epoch,
1252 version: version.clone(),
1253 };
1254 }
1255 }
1256}