1mod channel_index;
2
3use crate::{channel_buffer::ChannelBuffer, channel_chat::ChannelChat, ChannelMessage};
4use anyhow::{anyhow, Result};
5use channel_index::ChannelIndex;
6use client::{ChannelId, Client, ClientSettings, Subscription, User, UserId, UserStore};
7use collections::{hash_map, HashMap, HashSet};
8use futures::{channel::mpsc, future::Shared, Future, FutureExt, StreamExt};
9use gpui::{
10 App, AppContext as _, AsyncApp, Context, Entity, EventEmitter, Global, SharedString, Task,
11 WeakEntity,
12};
13use language::Capability;
14use rpc::{
15 proto::{self, ChannelRole, ChannelVisibility},
16 TypedEnvelope,
17};
18use settings::Settings;
19use std::{mem, sync::Arc, time::Duration};
20use util::{maybe, ResultExt};
21
22pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
23
24pub fn init(client: &Arc<Client>, user_store: Entity<UserStore>, cx: &mut App) {
25 let channel_store = cx.new(|cx| ChannelStore::new(client.clone(), user_store.clone(), cx));
26 cx.set_global(GlobalChannelStore(channel_store));
27}
28
29#[derive(Debug, Clone, Default, PartialEq)]
30struct NotesVersion {
31 epoch: u64,
32 version: clock::Global,
33}
34
35pub struct ChannelStore {
36 pub channel_index: ChannelIndex,
37 channel_invitations: Vec<Arc<Channel>>,
38 channel_participants: HashMap<ChannelId, Vec<Arc<User>>>,
39 channel_states: HashMap<ChannelId, ChannelState>,
40 outgoing_invites: HashSet<(ChannelId, UserId)>,
41 update_channels_tx: mpsc::UnboundedSender<proto::UpdateChannels>,
42 opened_buffers: HashMap<ChannelId, OpenEntityHandle<ChannelBuffer>>,
43 opened_chats: HashMap<ChannelId, OpenEntityHandle<ChannelChat>>,
44 client: Arc<Client>,
45 did_subscribe: bool,
46 user_store: Entity<UserStore>,
47 _rpc_subscriptions: [Subscription; 2],
48 _watch_connection_status: Task<Option<()>>,
49 disconnect_channel_buffers_task: Option<Task<()>>,
50 _update_channels: Task<()>,
51}
52
53#[derive(Clone, Debug)]
54pub struct Channel {
55 pub id: ChannelId,
56 pub name: SharedString,
57 pub visibility: proto::ChannelVisibility,
58 pub parent_path: Vec<ChannelId>,
59}
60
61#[derive(Default, Debug)]
62pub struct ChannelState {
63 latest_chat_message: Option<u64>,
64 latest_notes_version: NotesVersion,
65 observed_notes_version: NotesVersion,
66 observed_chat_message: Option<u64>,
67 role: Option<ChannelRole>,
68}
69
70impl Channel {
71 pub fn link(&self, cx: &App) -> String {
72 format!(
73 "{}/channel/{}-{}",
74 ClientSettings::get_global(cx).server_url,
75 Self::slug(&self.name),
76 self.id
77 )
78 }
79
80 pub fn notes_link(&self, heading: Option<String>, cx: &App) -> String {
81 self.link(cx)
82 + "/notes"
83 + &heading
84 .map(|h| format!("#{}", Self::slug(&h)))
85 .unwrap_or_default()
86 }
87
88 pub fn is_root_channel(&self) -> bool {
89 self.parent_path.is_empty()
90 }
91
92 pub fn root_id(&self) -> ChannelId {
93 self.parent_path.first().copied().unwrap_or(self.id)
94 }
95
96 pub fn slug(str: &str) -> String {
97 let slug: String = str
98 .chars()
99 .map(|c| if c.is_alphanumeric() { c } else { '-' })
100 .collect();
101
102 slug.trim_matches(|c| c == '-').to_string()
103 }
104}
105
106#[derive(Debug)]
107pub struct ChannelMembership {
108 pub user: Arc<User>,
109 pub kind: proto::channel_member::Kind,
110 pub role: proto::ChannelRole,
111}
112impl ChannelMembership {
113 pub fn sort_key(&self) -> MembershipSortKey {
114 MembershipSortKey {
115 role_order: match self.role {
116 proto::ChannelRole::Admin => 0,
117 proto::ChannelRole::Member => 1,
118 proto::ChannelRole::Banned => 2,
119 proto::ChannelRole::Talker => 3,
120 proto::ChannelRole::Guest => 4,
121 },
122 kind_order: match self.kind {
123 proto::channel_member::Kind::Member => 0,
124 proto::channel_member::Kind::Invitee => 1,
125 },
126 username_order: self.user.github_login.as_str(),
127 }
128 }
129}
130
131#[derive(PartialOrd, Ord, PartialEq, Eq)]
132pub struct MembershipSortKey<'a> {
133 role_order: u8,
134 kind_order: u8,
135 username_order: &'a str,
136}
137
138pub enum ChannelEvent {
139 ChannelCreated(ChannelId),
140 ChannelRenamed(ChannelId),
141}
142
143impl EventEmitter<ChannelEvent> for ChannelStore {}
144
145enum OpenEntityHandle<E> {
146 Open(WeakEntity<E>),
147 Loading(Shared<Task<Result<Entity<E>, Arc<anyhow::Error>>>>),
148}
149
150struct GlobalChannelStore(Entity<ChannelStore>);
151
152impl Global for GlobalChannelStore {}
153
154impl ChannelStore {
155 pub fn global(cx: &App) -> Entity<Self> {
156 cx.global::<GlobalChannelStore>().0.clone()
157 }
158
159 pub fn new(client: Arc<Client>, user_store: Entity<UserStore>, cx: &mut Context<Self>) -> Self {
160 let rpc_subscriptions = [
161 client.add_message_handler(cx.weak_entity(), Self::handle_update_channels),
162 client.add_message_handler(cx.weak_entity(), Self::handle_update_user_channels),
163 ];
164
165 let mut connection_status = client.status();
166 let (update_channels_tx, mut update_channels_rx) = mpsc::unbounded();
167 let watch_connection_status = cx.spawn(|this, mut cx| async move {
168 while let Some(status) = connection_status.next().await {
169 let this = this.upgrade()?;
170 match status {
171 client::Status::Connected { .. } => {
172 this.update(&mut cx, |this, cx| this.handle_connect(cx))
173 .ok()?
174 .await
175 .log_err()?;
176 }
177 client::Status::SignedOut | client::Status::UpgradeRequired => {
178 this.update(&mut cx, |this, cx| this.handle_disconnect(false, cx))
179 .ok();
180 }
181 _ => {
182 this.update(&mut cx, |this, cx| this.handle_disconnect(true, cx))
183 .ok();
184 }
185 }
186 }
187 Some(())
188 });
189
190 Self {
191 channel_invitations: Vec::default(),
192 channel_index: ChannelIndex::default(),
193 channel_participants: Default::default(),
194 outgoing_invites: Default::default(),
195 opened_buffers: Default::default(),
196 opened_chats: Default::default(),
197 update_channels_tx,
198 client,
199 user_store,
200 _rpc_subscriptions: rpc_subscriptions,
201 _watch_connection_status: watch_connection_status,
202 disconnect_channel_buffers_task: None,
203 _update_channels: cx.spawn(|this, mut cx| async move {
204 maybe!(async move {
205 while let Some(update_channels) = update_channels_rx.next().await {
206 if let Some(this) = this.upgrade() {
207 let update_task = this.update(&mut cx, |this, cx| {
208 this.update_channels(update_channels, cx)
209 })?;
210 if let Some(update_task) = update_task {
211 update_task.await.log_err();
212 }
213 }
214 }
215 anyhow::Ok(())
216 })
217 .await
218 .log_err();
219 }),
220 channel_states: Default::default(),
221 did_subscribe: false,
222 }
223 }
224
225 pub fn initialize(&mut self) {
226 if !self.did_subscribe
227 && self
228 .client
229 .send(proto::SubscribeToChannels {})
230 .log_err()
231 .is_some()
232 {
233 self.did_subscribe = true;
234 }
235 }
236
237 pub fn client(&self) -> Arc<Client> {
238 self.client.clone()
239 }
240
241 /// Returns the number of unique channels in the store
242 pub fn channel_count(&self) -> usize {
243 self.channel_index.by_id().len()
244 }
245
246 /// Returns the index of a channel ID in the list of unique channels
247 pub fn index_of_channel(&self, channel_id: ChannelId) -> Option<usize> {
248 self.channel_index
249 .by_id()
250 .keys()
251 .position(|id| *id == channel_id)
252 }
253
254 /// Returns an iterator over all unique channels
255 pub fn channels(&self) -> impl '_ + Iterator<Item = &Arc<Channel>> {
256 self.channel_index.by_id().values()
257 }
258
259 /// Iterate over all entries in the channel DAG
260 pub fn ordered_channels(&self) -> impl '_ + Iterator<Item = (usize, &Arc<Channel>)> {
261 self.channel_index
262 .ordered_channels()
263 .iter()
264 .filter_map(move |id| {
265 let channel = self.channel_index.by_id().get(id)?;
266 Some((channel.parent_path.len(), channel))
267 })
268 }
269
270 pub fn channel_at_index(&self, ix: usize) -> Option<&Arc<Channel>> {
271 let channel_id = self.channel_index.ordered_channels().get(ix)?;
272 self.channel_index.by_id().get(channel_id)
273 }
274
275 pub fn channel_at(&self, ix: usize) -> Option<&Arc<Channel>> {
276 self.channel_index.by_id().values().nth(ix)
277 }
278
279 pub fn has_channel_invitation(&self, channel_id: ChannelId) -> bool {
280 self.channel_invitations
281 .iter()
282 .any(|channel| channel.id == channel_id)
283 }
284
285 pub fn channel_invitations(&self) -> &[Arc<Channel>] {
286 &self.channel_invitations
287 }
288
289 pub fn channel_for_id(&self, channel_id: ChannelId) -> Option<&Arc<Channel>> {
290 self.channel_index.by_id().get(&channel_id)
291 }
292
293 pub fn has_open_channel_buffer(&self, channel_id: ChannelId, _cx: &App) -> bool {
294 if let Some(buffer) = self.opened_buffers.get(&channel_id) {
295 if let OpenEntityHandle::Open(buffer) = buffer {
296 return buffer.upgrade().is_some();
297 }
298 }
299 false
300 }
301
302 pub fn open_channel_buffer(
303 &mut self,
304 channel_id: ChannelId,
305 cx: &mut Context<Self>,
306 ) -> Task<Result<Entity<ChannelBuffer>>> {
307 let client = self.client.clone();
308 let user_store = self.user_store.clone();
309 let channel_store = cx.entity();
310 self.open_channel_resource(
311 channel_id,
312 |this| &mut this.opened_buffers,
313 |channel, cx| ChannelBuffer::new(channel, client, user_store, channel_store, cx),
314 cx,
315 )
316 }
317
318 pub fn fetch_channel_messages(
319 &self,
320 message_ids: Vec<u64>,
321 cx: &mut Context<Self>,
322 ) -> Task<Result<Vec<ChannelMessage>>> {
323 let request = if message_ids.is_empty() {
324 None
325 } else {
326 Some(
327 self.client
328 .request(proto::GetChannelMessagesById { message_ids }),
329 )
330 };
331 cx.spawn(|this, mut cx| async move {
332 if let Some(request) = request {
333 let response = request.await?;
334 let this = this
335 .upgrade()
336 .ok_or_else(|| anyhow!("channel store dropped"))?;
337 let user_store = this.update(&mut cx, |this, _| this.user_store.clone())?;
338 ChannelMessage::from_proto_vec(response.messages, &user_store, &mut cx).await
339 } else {
340 Ok(Vec::new())
341 }
342 })
343 }
344
345 pub fn has_channel_buffer_changed(&self, channel_id: ChannelId) -> bool {
346 self.channel_states
347 .get(&channel_id)
348 .is_some_and(|state| state.has_channel_buffer_changed())
349 }
350
351 pub fn has_new_messages(&self, channel_id: ChannelId) -> bool {
352 self.channel_states
353 .get(&channel_id)
354 .is_some_and(|state| state.has_new_messages())
355 }
356
357 pub fn set_acknowledged_message_id(&mut self, channel_id: ChannelId, message_id: Option<u64>) {
358 if let Some(state) = self.channel_states.get_mut(&channel_id) {
359 state.latest_chat_message = message_id;
360 }
361 }
362
363 pub fn last_acknowledge_message_id(&self, channel_id: ChannelId) -> Option<u64> {
364 self.channel_states.get(&channel_id).and_then(|state| {
365 if let Some(last_message_id) = state.latest_chat_message {
366 if state
367 .last_acknowledged_message_id()
368 .is_some_and(|id| id < last_message_id)
369 {
370 return state.last_acknowledged_message_id();
371 }
372 }
373
374 None
375 })
376 }
377
378 pub fn acknowledge_message_id(
379 &mut self,
380 channel_id: ChannelId,
381 message_id: u64,
382 cx: &mut Context<Self>,
383 ) {
384 self.channel_states
385 .entry(channel_id)
386 .or_default()
387 .acknowledge_message_id(message_id);
388 cx.notify();
389 }
390
391 pub fn update_latest_message_id(
392 &mut self,
393 channel_id: ChannelId,
394 message_id: u64,
395 cx: &mut Context<Self>,
396 ) {
397 self.channel_states
398 .entry(channel_id)
399 .or_default()
400 .update_latest_message_id(message_id);
401 cx.notify();
402 }
403
404 pub fn acknowledge_notes_version(
405 &mut self,
406 channel_id: ChannelId,
407 epoch: u64,
408 version: &clock::Global,
409 cx: &mut Context<Self>,
410 ) {
411 self.channel_states
412 .entry(channel_id)
413 .or_default()
414 .acknowledge_notes_version(epoch, version);
415 cx.notify()
416 }
417
418 pub fn update_latest_notes_version(
419 &mut self,
420 channel_id: ChannelId,
421 epoch: u64,
422 version: &clock::Global,
423 cx: &mut Context<Self>,
424 ) {
425 self.channel_states
426 .entry(channel_id)
427 .or_default()
428 .update_latest_notes_version(epoch, version);
429 cx.notify()
430 }
431
432 pub fn open_channel_chat(
433 &mut self,
434 channel_id: ChannelId,
435 cx: &mut Context<Self>,
436 ) -> Task<Result<Entity<ChannelChat>>> {
437 let client = self.client.clone();
438 let user_store = self.user_store.clone();
439 let this = cx.entity();
440 self.open_channel_resource(
441 channel_id,
442 |this| &mut this.opened_chats,
443 |channel, cx| ChannelChat::new(channel, this, user_store, client, cx),
444 cx,
445 )
446 }
447
448 /// Asynchronously open a given resource associated with a channel.
449 ///
450 /// Make sure that the resource is only opened once, even if this method
451 /// is called multiple times with the same channel id while the first task
452 /// is still running.
453 fn open_channel_resource<T, F, Fut>(
454 &mut self,
455 channel_id: ChannelId,
456 get_map: fn(&mut Self) -> &mut HashMap<ChannelId, OpenEntityHandle<T>>,
457 load: F,
458 cx: &mut Context<Self>,
459 ) -> Task<Result<Entity<T>>>
460 where
461 F: 'static + FnOnce(Arc<Channel>, AsyncApp) -> Fut,
462 Fut: Future<Output = Result<Entity<T>>>,
463 T: 'static,
464 {
465 let task = loop {
466 match get_map(self).entry(channel_id) {
467 hash_map::Entry::Occupied(e) => match e.get() {
468 OpenEntityHandle::Open(entity) => {
469 if let Some(entity) = entity.upgrade() {
470 break Task::ready(Ok(entity)).shared();
471 } else {
472 get_map(self).remove(&channel_id);
473 continue;
474 }
475 }
476 OpenEntityHandle::Loading(task) => {
477 break task.clone();
478 }
479 },
480 hash_map::Entry::Vacant(e) => {
481 let task = cx
482 .spawn(move |this, mut cx| async move {
483 let channel = this.update(&mut cx, |this, _| {
484 this.channel_for_id(channel_id).cloned().ok_or_else(|| {
485 Arc::new(anyhow!("no channel for id: {}", channel_id))
486 })
487 })??;
488
489 load(channel, cx).await.map_err(Arc::new)
490 })
491 .shared();
492
493 e.insert(OpenEntityHandle::Loading(task.clone()));
494 cx.spawn({
495 let task = task.clone();
496 move |this, mut cx| async move {
497 let result = task.await;
498 this.update(&mut cx, |this, _| match result {
499 Ok(model) => {
500 get_map(this).insert(
501 channel_id,
502 OpenEntityHandle::Open(model.downgrade()),
503 );
504 }
505 Err(_) => {
506 get_map(this).remove(&channel_id);
507 }
508 })
509 .ok();
510 }
511 })
512 .detach();
513 break task;
514 }
515 }
516 };
517 cx.background_executor()
518 .spawn(async move { task.await.map_err(|error| anyhow!("{}", error)) })
519 }
520
521 pub fn is_channel_admin(&self, channel_id: ChannelId) -> bool {
522 self.channel_role(channel_id) == proto::ChannelRole::Admin
523 }
524
525 pub fn is_root_channel(&self, channel_id: ChannelId) -> bool {
526 self.channel_index
527 .by_id()
528 .get(&channel_id)
529 .map_or(false, |channel| channel.is_root_channel())
530 }
531
532 pub fn is_public_channel(&self, channel_id: ChannelId) -> bool {
533 self.channel_index
534 .by_id()
535 .get(&channel_id)
536 .map_or(false, |channel| {
537 channel.visibility == ChannelVisibility::Public
538 })
539 }
540
541 pub fn channel_capability(&self, channel_id: ChannelId) -> Capability {
542 match self.channel_role(channel_id) {
543 ChannelRole::Admin | ChannelRole::Member => Capability::ReadWrite,
544 _ => Capability::ReadOnly,
545 }
546 }
547
548 pub fn channel_role(&self, channel_id: ChannelId) -> proto::ChannelRole {
549 maybe!({
550 let mut channel = self.channel_for_id(channel_id)?;
551 if !channel.is_root_channel() {
552 channel = self.channel_for_id(channel.root_id())?;
553 }
554 let root_channel_state = self.channel_states.get(&channel.id);
555 root_channel_state?.role
556 })
557 .unwrap_or(proto::ChannelRole::Guest)
558 }
559
560 pub fn channel_participants(&self, channel_id: ChannelId) -> &[Arc<User>] {
561 self.channel_participants
562 .get(&channel_id)
563 .map_or(&[], |v| v.as_slice())
564 }
565
566 pub fn create_channel(
567 &self,
568 name: &str,
569 parent_id: Option<ChannelId>,
570 cx: &mut Context<Self>,
571 ) -> Task<Result<ChannelId>> {
572 let client = self.client.clone();
573 let name = name.trim_start_matches('#').to_owned();
574 cx.spawn(move |this, mut cx| async move {
575 let response = client
576 .request(proto::CreateChannel {
577 name,
578 parent_id: parent_id.map(|cid| cid.0),
579 })
580 .await?;
581
582 let channel = response
583 .channel
584 .ok_or_else(|| anyhow!("missing channel in response"))?;
585 let channel_id = ChannelId(channel.id);
586
587 this.update(&mut cx, |this, cx| {
588 let task = this.update_channels(
589 proto::UpdateChannels {
590 channels: vec![channel],
591 ..Default::default()
592 },
593 cx,
594 );
595 assert!(task.is_none());
596
597 // This event is emitted because the collab panel wants to clear the pending edit state
598 // before this frame is rendered. But we can't guarantee that the collab panel's future
599 // will resolve before this flush_effects finishes. Synchronously emitting this event
600 // ensures that the collab panel will observe this creation before the frame completes
601 cx.emit(ChannelEvent::ChannelCreated(channel_id));
602 })?;
603
604 Ok(channel_id)
605 })
606 }
607
608 pub fn move_channel(
609 &mut self,
610 channel_id: ChannelId,
611 to: ChannelId,
612 cx: &mut Context<Self>,
613 ) -> Task<Result<()>> {
614 let client = self.client.clone();
615 cx.spawn(move |_, _| async move {
616 let _ = client
617 .request(proto::MoveChannel {
618 channel_id: channel_id.0,
619 to: to.0,
620 })
621 .await?;
622
623 Ok(())
624 })
625 }
626
627 pub fn set_channel_visibility(
628 &mut self,
629 channel_id: ChannelId,
630 visibility: ChannelVisibility,
631 cx: &mut Context<Self>,
632 ) -> Task<Result<()>> {
633 let client = self.client.clone();
634 cx.spawn(move |_, _| async move {
635 let _ = client
636 .request(proto::SetChannelVisibility {
637 channel_id: channel_id.0,
638 visibility: visibility.into(),
639 })
640 .await?;
641
642 Ok(())
643 })
644 }
645
646 pub fn invite_member(
647 &mut self,
648 channel_id: ChannelId,
649 user_id: UserId,
650 role: proto::ChannelRole,
651 cx: &mut Context<Self>,
652 ) -> Task<Result<()>> {
653 if !self.outgoing_invites.insert((channel_id, user_id)) {
654 return Task::ready(Err(anyhow!("invite request already in progress")));
655 }
656
657 cx.notify();
658 let client = self.client.clone();
659 cx.spawn(move |this, mut cx| async move {
660 let result = client
661 .request(proto::InviteChannelMember {
662 channel_id: channel_id.0,
663 user_id,
664 role: role.into(),
665 })
666 .await;
667
668 this.update(&mut cx, |this, cx| {
669 this.outgoing_invites.remove(&(channel_id, user_id));
670 cx.notify();
671 })?;
672
673 result?;
674
675 Ok(())
676 })
677 }
678
679 pub fn remove_member(
680 &mut self,
681 channel_id: ChannelId,
682 user_id: u64,
683 cx: &mut Context<Self>,
684 ) -> Task<Result<()>> {
685 if !self.outgoing_invites.insert((channel_id, user_id)) {
686 return Task::ready(Err(anyhow!("invite request already in progress")));
687 }
688
689 cx.notify();
690 let client = self.client.clone();
691 cx.spawn(move |this, mut cx| async move {
692 let result = client
693 .request(proto::RemoveChannelMember {
694 channel_id: channel_id.0,
695 user_id,
696 })
697 .await;
698
699 this.update(&mut cx, |this, cx| {
700 this.outgoing_invites.remove(&(channel_id, user_id));
701 cx.notify();
702 })?;
703 result?;
704 Ok(())
705 })
706 }
707
708 pub fn set_member_role(
709 &mut self,
710 channel_id: ChannelId,
711 user_id: UserId,
712 role: proto::ChannelRole,
713 cx: &mut Context<Self>,
714 ) -> Task<Result<()>> {
715 if !self.outgoing_invites.insert((channel_id, user_id)) {
716 return Task::ready(Err(anyhow!("member request already in progress")));
717 }
718
719 cx.notify();
720 let client = self.client.clone();
721 cx.spawn(move |this, mut cx| async move {
722 let result = client
723 .request(proto::SetChannelMemberRole {
724 channel_id: channel_id.0,
725 user_id,
726 role: role.into(),
727 })
728 .await;
729
730 this.update(&mut cx, |this, cx| {
731 this.outgoing_invites.remove(&(channel_id, user_id));
732 cx.notify();
733 })?;
734
735 result?;
736 Ok(())
737 })
738 }
739
740 pub fn rename(
741 &mut self,
742 channel_id: ChannelId,
743 new_name: &str,
744 cx: &mut Context<Self>,
745 ) -> Task<Result<()>> {
746 let client = self.client.clone();
747 let name = new_name.to_string();
748 cx.spawn(move |this, mut cx| async move {
749 let channel = client
750 .request(proto::RenameChannel {
751 channel_id: channel_id.0,
752 name,
753 })
754 .await?
755 .channel
756 .ok_or_else(|| anyhow!("missing channel in response"))?;
757 this.update(&mut cx, |this, cx| {
758 let task = this.update_channels(
759 proto::UpdateChannels {
760 channels: vec![channel],
761 ..Default::default()
762 },
763 cx,
764 );
765 assert!(task.is_none());
766
767 // This event is emitted because the collab panel wants to clear the pending edit state
768 // before this frame is rendered. But we can't guarantee that the collab panel's future
769 // will resolve before this flush_effects finishes. Synchronously emitting this event
770 // ensures that the collab panel will observe this creation before the frame complete
771 cx.emit(ChannelEvent::ChannelRenamed(channel_id))
772 })?;
773 Ok(())
774 })
775 }
776
777 pub fn respond_to_channel_invite(
778 &mut self,
779 channel_id: ChannelId,
780 accept: bool,
781 cx: &mut Context<Self>,
782 ) -> Task<Result<()>> {
783 let client = self.client.clone();
784 cx.background_executor().spawn(async move {
785 client
786 .request(proto::RespondToChannelInvite {
787 channel_id: channel_id.0,
788 accept,
789 })
790 .await?;
791 Ok(())
792 })
793 }
794 pub fn fuzzy_search_members(
795 &self,
796 channel_id: ChannelId,
797 query: String,
798 limit: u16,
799 cx: &mut Context<Self>,
800 ) -> Task<Result<Vec<ChannelMembership>>> {
801 let client = self.client.clone();
802 let user_store = self.user_store.downgrade();
803 cx.spawn(move |_, mut cx| async move {
804 let response = client
805 .request(proto::GetChannelMembers {
806 channel_id: channel_id.0,
807 query,
808 limit: limit as u64,
809 })
810 .await?;
811 user_store.update(&mut cx, |user_store, _| {
812 user_store.insert(response.users);
813 response
814 .members
815 .into_iter()
816 .filter_map(|member| {
817 Some(ChannelMembership {
818 user: user_store.get_cached_user(member.user_id)?,
819 role: member.role(),
820 kind: member.kind(),
821 })
822 })
823 .collect()
824 })
825 })
826 }
827
828 pub fn remove_channel(&self, channel_id: ChannelId) -> impl Future<Output = Result<()>> {
829 let client = self.client.clone();
830 async move {
831 client
832 .request(proto::DeleteChannel {
833 channel_id: channel_id.0,
834 })
835 .await?;
836 Ok(())
837 }
838 }
839
840 pub fn has_pending_channel_invite_response(&self, _: &Arc<Channel>) -> bool {
841 false
842 }
843
844 pub fn has_pending_channel_invite(&self, channel_id: ChannelId, user_id: UserId) -> bool {
845 self.outgoing_invites.contains(&(channel_id, user_id))
846 }
847
848 async fn handle_update_channels(
849 this: Entity<Self>,
850 message: TypedEnvelope<proto::UpdateChannels>,
851 mut cx: AsyncApp,
852 ) -> Result<()> {
853 this.update(&mut cx, |this, _| {
854 this.update_channels_tx
855 .unbounded_send(message.payload)
856 .unwrap();
857 })?;
858 Ok(())
859 }
860
861 async fn handle_update_user_channels(
862 this: Entity<Self>,
863 message: TypedEnvelope<proto::UpdateUserChannels>,
864 mut cx: AsyncApp,
865 ) -> Result<()> {
866 this.update(&mut cx, |this, cx| {
867 for buffer_version in message.payload.observed_channel_buffer_version {
868 let version = language::proto::deserialize_version(&buffer_version.version);
869 this.acknowledge_notes_version(
870 ChannelId(buffer_version.channel_id),
871 buffer_version.epoch,
872 &version,
873 cx,
874 );
875 }
876 for message_id in message.payload.observed_channel_message_id {
877 this.acknowledge_message_id(
878 ChannelId(message_id.channel_id),
879 message_id.message_id,
880 cx,
881 );
882 }
883 for membership in message.payload.channel_memberships {
884 if let Some(role) = ChannelRole::from_i32(membership.role) {
885 this.channel_states
886 .entry(ChannelId(membership.channel_id))
887 .or_default()
888 .set_role(role)
889 }
890 }
891 })
892 }
893
894 fn handle_connect(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
895 self.channel_index.clear();
896 self.channel_invitations.clear();
897 self.channel_participants.clear();
898 self.channel_index.clear();
899 self.outgoing_invites.clear();
900 self.disconnect_channel_buffers_task.take();
901
902 for chat in self.opened_chats.values() {
903 if let OpenEntityHandle::Open(chat) = chat {
904 if let Some(chat) = chat.upgrade() {
905 chat.update(cx, |chat, cx| {
906 chat.rejoin(cx);
907 });
908 }
909 }
910 }
911
912 let mut buffer_versions = Vec::new();
913 for buffer in self.opened_buffers.values() {
914 if let OpenEntityHandle::Open(buffer) = buffer {
915 if let Some(buffer) = buffer.upgrade() {
916 let channel_buffer = buffer.read(cx);
917 let buffer = channel_buffer.buffer().read(cx);
918 buffer_versions.push(proto::ChannelBufferVersion {
919 channel_id: channel_buffer.channel_id.0,
920 epoch: channel_buffer.epoch(),
921 version: language::proto::serialize_version(&buffer.version()),
922 });
923 }
924 }
925 }
926
927 if buffer_versions.is_empty() {
928 return Task::ready(Ok(()));
929 }
930
931 let response = self.client.request(proto::RejoinChannelBuffers {
932 buffers: buffer_versions,
933 });
934
935 cx.spawn(|this, mut cx| async move {
936 let mut response = response.await?;
937
938 this.update(&mut cx, |this, cx| {
939 this.opened_buffers.retain(|_, buffer| match buffer {
940 OpenEntityHandle::Open(channel_buffer) => {
941 let Some(channel_buffer) = channel_buffer.upgrade() else {
942 return false;
943 };
944
945 channel_buffer.update(cx, |channel_buffer, cx| {
946 let channel_id = channel_buffer.channel_id;
947 if let Some(remote_buffer) = response
948 .buffers
949 .iter_mut()
950 .find(|buffer| buffer.channel_id == channel_id.0)
951 {
952 let channel_id = channel_buffer.channel_id;
953 let remote_version =
954 language::proto::deserialize_version(&remote_buffer.version);
955
956 channel_buffer.replace_collaborators(
957 mem::take(&mut remote_buffer.collaborators),
958 cx,
959 );
960
961 let operations = channel_buffer
962 .buffer()
963 .update(cx, |buffer, cx| {
964 let outgoing_operations =
965 buffer.serialize_ops(Some(remote_version), cx);
966 let incoming_operations =
967 mem::take(&mut remote_buffer.operations)
968 .into_iter()
969 .map(language::proto::deserialize_operation)
970 .collect::<Result<Vec<_>>>()?;
971 buffer.apply_ops(incoming_operations, cx);
972 anyhow::Ok(outgoing_operations)
973 })
974 .log_err();
975
976 if let Some(operations) = operations {
977 let client = this.client.clone();
978 cx.background_executor()
979 .spawn(async move {
980 let operations = operations.await;
981 for chunk in
982 language::proto::split_operations(operations)
983 {
984 client
985 .send(proto::UpdateChannelBuffer {
986 channel_id: channel_id.0,
987 operations: chunk,
988 })
989 .ok();
990 }
991 })
992 .detach();
993 return true;
994 }
995 }
996
997 channel_buffer.disconnect(cx);
998 false
999 })
1000 }
1001 OpenEntityHandle::Loading(_) => true,
1002 });
1003 })
1004 .ok();
1005 anyhow::Ok(())
1006 })
1007 }
1008
1009 fn handle_disconnect(&mut self, wait_for_reconnect: bool, cx: &mut Context<Self>) {
1010 cx.notify();
1011 self.did_subscribe = false;
1012 self.disconnect_channel_buffers_task.get_or_insert_with(|| {
1013 cx.spawn(move |this, mut cx| async move {
1014 if wait_for_reconnect {
1015 cx.background_executor().timer(RECONNECT_TIMEOUT).await;
1016 }
1017
1018 if let Some(this) = this.upgrade() {
1019 this.update(&mut cx, |this, cx| {
1020 for (_, buffer) in this.opened_buffers.drain() {
1021 if let OpenEntityHandle::Open(buffer) = buffer {
1022 if let Some(buffer) = buffer.upgrade() {
1023 buffer.update(cx, |buffer, cx| buffer.disconnect(cx));
1024 }
1025 }
1026 }
1027 })
1028 .ok();
1029 }
1030 })
1031 });
1032 }
1033
1034 pub(crate) fn update_channels(
1035 &mut self,
1036 payload: proto::UpdateChannels,
1037 cx: &mut Context<ChannelStore>,
1038 ) -> Option<Task<Result<()>>> {
1039 if !payload.remove_channel_invitations.is_empty() {
1040 self.channel_invitations
1041 .retain(|channel| !payload.remove_channel_invitations.contains(&channel.id.0));
1042 }
1043 for channel in payload.channel_invitations {
1044 match self
1045 .channel_invitations
1046 .binary_search_by_key(&channel.id, |c| c.id.0)
1047 {
1048 Ok(ix) => {
1049 Arc::make_mut(&mut self.channel_invitations[ix]).name = channel.name.into()
1050 }
1051 Err(ix) => self.channel_invitations.insert(
1052 ix,
1053 Arc::new(Channel {
1054 id: ChannelId(channel.id),
1055 visibility: channel.visibility(),
1056 name: channel.name.into(),
1057 parent_path: channel.parent_path.into_iter().map(ChannelId).collect(),
1058 }),
1059 ),
1060 }
1061 }
1062
1063 let channels_changed = !payload.channels.is_empty()
1064 || !payload.delete_channels.is_empty()
1065 || !payload.latest_channel_message_ids.is_empty()
1066 || !payload.latest_channel_buffer_versions.is_empty();
1067
1068 if channels_changed {
1069 if !payload.delete_channels.is_empty() {
1070 let delete_channels: Vec<ChannelId> =
1071 payload.delete_channels.into_iter().map(ChannelId).collect();
1072 self.channel_index.delete_channels(&delete_channels);
1073 self.channel_participants
1074 .retain(|channel_id, _| !delete_channels.contains(channel_id));
1075
1076 for channel_id in &delete_channels {
1077 let channel_id = *channel_id;
1078 if payload
1079 .channels
1080 .iter()
1081 .any(|channel| channel.id == channel_id.0)
1082 {
1083 continue;
1084 }
1085 if let Some(OpenEntityHandle::Open(buffer)) =
1086 self.opened_buffers.remove(&channel_id)
1087 {
1088 if let Some(buffer) = buffer.upgrade() {
1089 buffer.update(cx, ChannelBuffer::disconnect);
1090 }
1091 }
1092 }
1093 }
1094
1095 let mut index = self.channel_index.bulk_insert();
1096 for channel in payload.channels {
1097 let id = ChannelId(channel.id);
1098 let channel_changed = index.insert(channel);
1099
1100 if channel_changed {
1101 if let Some(OpenEntityHandle::Open(buffer)) = self.opened_buffers.get(&id) {
1102 if let Some(buffer) = buffer.upgrade() {
1103 buffer.update(cx, ChannelBuffer::channel_changed);
1104 }
1105 }
1106 }
1107 }
1108
1109 for latest_buffer_version in payload.latest_channel_buffer_versions {
1110 let version = language::proto::deserialize_version(&latest_buffer_version.version);
1111 self.channel_states
1112 .entry(ChannelId(latest_buffer_version.channel_id))
1113 .or_default()
1114 .update_latest_notes_version(latest_buffer_version.epoch, &version)
1115 }
1116
1117 for latest_channel_message in payload.latest_channel_message_ids {
1118 self.channel_states
1119 .entry(ChannelId(latest_channel_message.channel_id))
1120 .or_default()
1121 .update_latest_message_id(latest_channel_message.message_id);
1122 }
1123 }
1124
1125 cx.notify();
1126 if payload.channel_participants.is_empty() {
1127 return None;
1128 }
1129
1130 let mut all_user_ids = Vec::new();
1131 let channel_participants = payload.channel_participants;
1132 for entry in &channel_participants {
1133 for user_id in entry.participant_user_ids.iter() {
1134 if let Err(ix) = all_user_ids.binary_search(user_id) {
1135 all_user_ids.insert(ix, *user_id);
1136 }
1137 }
1138 }
1139
1140 let users = self
1141 .user_store
1142 .update(cx, |user_store, cx| user_store.get_users(all_user_ids, cx));
1143 Some(cx.spawn(|this, mut cx| async move {
1144 let users = users.await?;
1145
1146 this.update(&mut cx, |this, cx| {
1147 for entry in &channel_participants {
1148 let mut participants: Vec<_> = entry
1149 .participant_user_ids
1150 .iter()
1151 .filter_map(|user_id| {
1152 users
1153 .binary_search_by_key(&user_id, |user| &user.id)
1154 .ok()
1155 .map(|ix| users[ix].clone())
1156 })
1157 .collect();
1158
1159 participants.sort_by_key(|u| u.id);
1160
1161 this.channel_participants
1162 .insert(ChannelId(entry.channel_id), participants);
1163 }
1164
1165 cx.notify();
1166 })
1167 }))
1168 }
1169}
1170
1171impl ChannelState {
1172 fn set_role(&mut self, role: ChannelRole) {
1173 self.role = Some(role);
1174 }
1175
1176 fn has_channel_buffer_changed(&self) -> bool {
1177 self.latest_notes_version.epoch > self.observed_notes_version.epoch
1178 || (self.latest_notes_version.epoch == self.observed_notes_version.epoch
1179 && self
1180 .latest_notes_version
1181 .version
1182 .changed_since(&self.observed_notes_version.version))
1183 }
1184
1185 fn has_new_messages(&self) -> bool {
1186 let latest_message_id = self.latest_chat_message;
1187 let observed_message_id = self.observed_chat_message;
1188
1189 latest_message_id.is_some_and(|latest_message_id| {
1190 latest_message_id > observed_message_id.unwrap_or_default()
1191 })
1192 }
1193
1194 fn last_acknowledged_message_id(&self) -> Option<u64> {
1195 self.observed_chat_message
1196 }
1197
1198 fn acknowledge_message_id(&mut self, message_id: u64) {
1199 let observed = self.observed_chat_message.get_or_insert(message_id);
1200 *observed = (*observed).max(message_id);
1201 }
1202
1203 fn update_latest_message_id(&mut self, message_id: u64) {
1204 self.latest_chat_message =
1205 Some(message_id.max(self.latest_chat_message.unwrap_or_default()));
1206 }
1207
1208 fn acknowledge_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1209 if self.observed_notes_version.epoch == epoch {
1210 self.observed_notes_version.version.join(version);
1211 } else {
1212 self.observed_notes_version = NotesVersion {
1213 epoch,
1214 version: version.clone(),
1215 };
1216 }
1217 }
1218
1219 fn update_latest_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1220 if self.latest_notes_version.epoch == epoch {
1221 self.latest_notes_version.version.join(version);
1222 } else {
1223 self.latest_notes_version = NotesVersion {
1224 epoch,
1225 version: version.clone(),
1226 };
1227 }
1228 }
1229}