1mod channel_index;
2
3use crate::{channel_buffer::ChannelBuffer, channel_chat::ChannelChat, ChannelMessage};
4use anyhow::{anyhow, Result};
5use channel_index::ChannelIndex;
6use client::{ChannelId, Client, ClientSettings, Subscription, User, UserId, UserStore};
7use collections::{hash_map, HashMap, HashSet};
8use futures::{channel::mpsc, future::Shared, Future, FutureExt, StreamExt};
9use gpui::{
10 App, AppContext as _, AsyncApp, Context, Entity, EventEmitter, Global, SharedString, Task,
11 WeakEntity,
12};
13use language::Capability;
14use rpc::{
15 proto::{self, ChannelRole, ChannelVisibility},
16 TypedEnvelope,
17};
18use settings::Settings;
19use std::{mem, sync::Arc, time::Duration};
20use util::{maybe, ResultExt};
21
/// How long `handle_disconnect` waits (when asked to wait for a reconnect) before it
/// disconnects the currently opened channel buffers.
pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
23
24pub fn init(client: &Arc<Client>, user_store: Entity<UserStore>, cx: &mut App) {
25 let channel_store = cx.new(|cx| ChannelStore::new(client.clone(), user_store.clone(), cx));
26 cx.set_global(GlobalChannelStore(channel_store));
27}
28
/// Version of a channel's notes buffer, identified by an epoch plus a version within that
/// epoch.
#[derive(Debug, Clone, Default, PartialEq)]
struct NotesVersion {
    /// Epoch of the notes buffer; versions are only joined when their epochs are equal.
    epoch: u64,
    /// Version of the notes buffer within `epoch`.
    version: clock::Global,
}
34
/// Client-side store of channels, invitations, participants and per-channel read state, kept
/// up to date from `UpdateChannels` / `UpdateUserChannels` messages.
pub struct ChannelStore {
    /// Index of all known channels.
    pub channel_index: ChannelIndex,
    /// Channels the current user has been invited to; `update_channels` keeps this ordered by
    /// channel id (it inserts at the position found by binary search).
    channel_invitations: Vec<Arc<Channel>>,
    /// Participants per channel, as last reported by the server.
    channel_participants: HashMap<ChannelId, Vec<Arc<User>>>,
    /// Per-channel state (latest/observed chat message and notes version, role).
    channel_states: HashMap<ChannelId, ChannelState>,
    /// `(channel, user)` pairs with a membership request (invite, removal or role change)
    /// currently in flight.
    outgoing_invites: HashSet<(ChannelId, UserId)>,
    /// Sender side of the queue drained by the `_update_channels` task.
    update_channels_tx: mpsc::UnboundedSender<proto::UpdateChannels>,
    /// Channel buffers that are open or currently being opened.
    opened_buffers: HashMap<ChannelId, OpenEntityHandle<ChannelBuffer>>,
    /// Channel chats that are open or currently being opened.
    opened_chats: HashMap<ChannelId, OpenEntityHandle<ChannelChat>>,
    client: Arc<Client>,
    /// Whether `SubscribeToChannels` has been sent since the last disconnect.
    did_subscribe: bool,
    user_store: Entity<UserStore>,
    /// Keeps the RPC message handlers registered in `new` alive.
    _rpc_subscriptions: [Subscription; 2],
    /// Task reacting to client connection status changes.
    _watch_connection_status: Task<Option<()>>,
    /// Task created by `handle_disconnect` that disconnects open buffers; taken (dropped) by
    /// `handle_connect`.
    disconnect_channel_buffers_task: Option<Task<()>>,
    /// Task applying queued `UpdateChannels` messages one at a time.
    _update_channels: Task<()>,
}
52
/// A single channel as known to the client.
#[derive(Clone, Debug)]
pub struct Channel {
    pub id: ChannelId,
    pub name: SharedString,
    pub visibility: proto::ChannelVisibility,
    /// Ids of this channel's ancestors; empty for a root channel, and the first entry is
    /// treated as the root channel's id (see `root_id`).
    pub parent_path: Vec<ChannelId>,
}
60
/// Per-channel read state and role tracked by the [`ChannelStore`].
#[derive(Default, Debug)]
pub struct ChannelState {
    /// Id of the latest chat message known for the channel.
    latest_chat_message: Option<u64>,
    /// Latest known version of the channel's notes.
    latest_notes_version: NotesVersion,
    /// Notes version that has been acknowledged (observed).
    observed_notes_version: NotesVersion,
    /// Id of the latest chat message that has been acknowledged (observed).
    observed_chat_message: Option<u64>,
    /// Role set from `UpdateUserChannels` channel memberships, if any was received.
    role: Option<ChannelRole>,
}
69
70impl Channel {
71 pub fn link(&self, cx: &App) -> String {
72 format!(
73 "{}/channel/{}-{}",
74 ClientSettings::get_global(cx).server_url,
75 Self::slug(&self.name),
76 self.id
77 )
78 }
79
80 pub fn notes_link(&self, heading: Option<String>, cx: &App) -> String {
81 self.link(cx)
82 + "/notes"
83 + &heading
84 .map(|h| format!("#{}", Self::slug(&h)))
85 .unwrap_or_default()
86 }
87
88 pub fn is_root_channel(&self) -> bool {
89 self.parent_path.is_empty()
90 }
91
92 pub fn root_id(&self) -> ChannelId {
93 self.parent_path.first().copied().unwrap_or(self.id)
94 }
95
96 pub fn slug(str: &str) -> String {
97 let slug: String = str
98 .chars()
99 .map(|c| if c.is_alphanumeric() { c } else { '-' })
100 .collect();
101
102 slug.trim_matches(|c| c == '-').to_string()
103 }
104}
105
/// A user's membership (or pending invitation) in a channel, as returned by
/// `fuzzy_search_members`.
#[derive(Debug)]
pub struct ChannelMembership {
    pub user: Arc<User>,
    /// Whether the user is a member or only an invitee.
    pub kind: proto::channel_member::Kind,
    pub role: proto::ChannelRole,
}
112impl ChannelMembership {
113 pub fn sort_key(&self) -> MembershipSortKey {
114 MembershipSortKey {
115 role_order: match self.role {
116 proto::ChannelRole::Admin => 0,
117 proto::ChannelRole::Member => 1,
118 proto::ChannelRole::Banned => 2,
119 proto::ChannelRole::Talker => 3,
120 proto::ChannelRole::Guest => 4,
121 },
122 kind_order: match self.kind {
123 proto::channel_member::Kind::Member => 0,
124 proto::channel_member::Kind::Invitee => 1,
125 },
126 username_order: self.user.github_login.as_str(),
127 }
128 }
129}
130
/// Sort key produced by `ChannelMembership::sort_key`.
///
/// The derived ordering compares fields in declaration order, so the field order below is
/// significant: role first, then kind, then username.
#[derive(PartialOrd, Ord, PartialEq, Eq)]
pub struct MembershipSortKey<'a> {
    role_order: u8,
    kind_order: u8,
    username_order: &'a str,
}
137
/// Events emitted by the [`ChannelStore`].
pub enum ChannelEvent {
    /// Emitted by `create_channel` once the newly created channel has been applied to the store.
    ChannelCreated(ChannelId),
    /// Emitted by `rename` once the renamed channel has been applied to the store.
    ChannelRenamed(ChannelId),
}

// Allows `ChannelStore` to emit `ChannelEvent`s via `cx.emit`.
impl EventEmitter<ChannelEvent> for ChannelStore {}
144
/// Tracks a per-channel resource (buffer or chat) that is either already open or still being
/// opened.
enum OpenEntityHandle<E> {
    /// The resource finished loading; only a weak handle is kept here.
    Open(WeakEntity<E>),
    /// The resource is still loading; the task is shared so that concurrent openers can all
    /// await the same load.
    Loading(Shared<Task<Result<Entity<E>, Arc<anyhow::Error>>>>),
}
149
/// Newtype wrapper used to store the [`ChannelStore`] entity as an app global.
struct GlobalChannelStore(Entity<ChannelStore>);

// Marks the wrapper as usable with `cx.set_global` / `cx.global`.
impl Global for GlobalChannelStore {}
153
154impl ChannelStore {
155 pub fn global(cx: &App) -> Entity<Self> {
156 cx.global::<GlobalChannelStore>().0.clone()
157 }
158
159 pub fn new(client: Arc<Client>, user_store: Entity<UserStore>, cx: &mut Context<Self>) -> Self {
160 let rpc_subscriptions = [
161 client.add_message_handler(cx.weak_entity(), Self::handle_update_channels),
162 client.add_message_handler(cx.weak_entity(), Self::handle_update_user_channels),
163 ];
164
165 let mut connection_status = client.status();
166 let (update_channels_tx, mut update_channels_rx) = mpsc::unbounded();
167 let watch_connection_status = cx.spawn(async move |this, cx| {
168 while let Some(status) = connection_status.next().await {
169 let this = this.upgrade()?;
170 match status {
171 client::Status::Connected { .. } => {
172 this.update(cx, |this, cx| this.handle_connect(cx))
173 .ok()?
174 .await
175 .log_err()?;
176 }
177 client::Status::SignedOut | client::Status::UpgradeRequired => {
178 this.update(cx, |this, cx| this.handle_disconnect(false, cx))
179 .ok();
180 }
181 _ => {
182 this.update(cx, |this, cx| this.handle_disconnect(true, cx))
183 .ok();
184 }
185 }
186 }
187 Some(())
188 });
189
190 Self {
191 channel_invitations: Vec::default(),
192 channel_index: ChannelIndex::default(),
193 channel_participants: Default::default(),
194 outgoing_invites: Default::default(),
195 opened_buffers: Default::default(),
196 opened_chats: Default::default(),
197 update_channels_tx,
198 client,
199 user_store,
200 _rpc_subscriptions: rpc_subscriptions,
201 _watch_connection_status: watch_connection_status,
202 disconnect_channel_buffers_task: None,
203 _update_channels: cx.spawn(async move |this, cx| {
204 maybe!(async move {
205 while let Some(update_channels) = update_channels_rx.next().await {
206 if let Some(this) = this.upgrade() {
207 let update_task = this
208 .update(cx, |this, cx| this.update_channels(update_channels, cx))?;
209 if let Some(update_task) = update_task {
210 update_task.await.log_err();
211 }
212 }
213 }
214 anyhow::Ok(())
215 })
216 .await
217 .log_err();
218 }),
219 channel_states: Default::default(),
220 did_subscribe: false,
221 }
222 }
223
224 pub fn initialize(&mut self) {
225 if !self.did_subscribe
226 && self
227 .client
228 .send(proto::SubscribeToChannels {})
229 .log_err()
230 .is_some()
231 {
232 self.did_subscribe = true;
233 }
234 }
235
236 pub fn client(&self) -> Arc<Client> {
237 self.client.clone()
238 }
239
240 /// Returns the number of unique channels in the store
241 pub fn channel_count(&self) -> usize {
242 self.channel_index.by_id().len()
243 }
244
245 /// Returns the index of a channel ID in the list of unique channels
246 pub fn index_of_channel(&self, channel_id: ChannelId) -> Option<usize> {
247 self.channel_index
248 .by_id()
249 .keys()
250 .position(|id| *id == channel_id)
251 }
252
253 /// Returns an iterator over all unique channels
254 pub fn channels(&self) -> impl '_ + Iterator<Item = &Arc<Channel>> {
255 self.channel_index.by_id().values()
256 }
257
258 /// Iterate over all entries in the channel DAG
259 pub fn ordered_channels(&self) -> impl '_ + Iterator<Item = (usize, &Arc<Channel>)> {
260 self.channel_index
261 .ordered_channels()
262 .iter()
263 .filter_map(move |id| {
264 let channel = self.channel_index.by_id().get(id)?;
265 Some((channel.parent_path.len(), channel))
266 })
267 }
268
269 pub fn channel_at_index(&self, ix: usize) -> Option<&Arc<Channel>> {
270 let channel_id = self.channel_index.ordered_channels().get(ix)?;
271 self.channel_index.by_id().get(channel_id)
272 }
273
274 pub fn channel_at(&self, ix: usize) -> Option<&Arc<Channel>> {
275 self.channel_index.by_id().values().nth(ix)
276 }
277
278 pub fn has_channel_invitation(&self, channel_id: ChannelId) -> bool {
279 self.channel_invitations
280 .iter()
281 .any(|channel| channel.id == channel_id)
282 }
283
284 pub fn channel_invitations(&self) -> &[Arc<Channel>] {
285 &self.channel_invitations
286 }
287
288 pub fn channel_for_id(&self, channel_id: ChannelId) -> Option<&Arc<Channel>> {
289 self.channel_index.by_id().get(&channel_id)
290 }
291
292 pub fn has_open_channel_buffer(&self, channel_id: ChannelId, _cx: &App) -> bool {
293 if let Some(buffer) = self.opened_buffers.get(&channel_id) {
294 if let OpenEntityHandle::Open(buffer) = buffer {
295 return buffer.upgrade().is_some();
296 }
297 }
298 false
299 }
300
301 pub fn open_channel_buffer(
302 &mut self,
303 channel_id: ChannelId,
304 cx: &mut Context<Self>,
305 ) -> Task<Result<Entity<ChannelBuffer>>> {
306 let client = self.client.clone();
307 let user_store = self.user_store.clone();
308 let channel_store = cx.entity();
309 self.open_channel_resource(
310 channel_id,
311 |this| &mut this.opened_buffers,
312 async move |channel, cx| {
313 ChannelBuffer::new(channel, client, user_store, channel_store, cx).await
314 },
315 cx,
316 )
317 }
318
319 pub fn fetch_channel_messages(
320 &self,
321 message_ids: Vec<u64>,
322 cx: &mut Context<Self>,
323 ) -> Task<Result<Vec<ChannelMessage>>> {
324 let request = if message_ids.is_empty() {
325 None
326 } else {
327 Some(
328 self.client
329 .request(proto::GetChannelMessagesById { message_ids }),
330 )
331 };
332 cx.spawn(async move |this, cx| {
333 if let Some(request) = request {
334 let response = request.await?;
335 let this = this
336 .upgrade()
337 .ok_or_else(|| anyhow!("channel store dropped"))?;
338 let user_store = this.update(cx, |this, _| this.user_store.clone())?;
339 ChannelMessage::from_proto_vec(response.messages, &user_store, cx).await
340 } else {
341 Ok(Vec::new())
342 }
343 })
344 }
345
346 pub fn has_channel_buffer_changed(&self, channel_id: ChannelId) -> bool {
347 self.channel_states
348 .get(&channel_id)
349 .is_some_and(|state| state.has_channel_buffer_changed())
350 }
351
352 pub fn has_new_messages(&self, channel_id: ChannelId) -> bool {
353 self.channel_states
354 .get(&channel_id)
355 .is_some_and(|state| state.has_new_messages())
356 }
357
358 pub fn set_acknowledged_message_id(&mut self, channel_id: ChannelId, message_id: Option<u64>) {
359 if let Some(state) = self.channel_states.get_mut(&channel_id) {
360 state.latest_chat_message = message_id;
361 }
362 }
363
364 pub fn last_acknowledge_message_id(&self, channel_id: ChannelId) -> Option<u64> {
365 self.channel_states.get(&channel_id).and_then(|state| {
366 if let Some(last_message_id) = state.latest_chat_message {
367 if state
368 .last_acknowledged_message_id()
369 .is_some_and(|id| id < last_message_id)
370 {
371 return state.last_acknowledged_message_id();
372 }
373 }
374
375 None
376 })
377 }
378
379 pub fn acknowledge_message_id(
380 &mut self,
381 channel_id: ChannelId,
382 message_id: u64,
383 cx: &mut Context<Self>,
384 ) {
385 self.channel_states
386 .entry(channel_id)
387 .or_default()
388 .acknowledge_message_id(message_id);
389 cx.notify();
390 }
391
392 pub fn update_latest_message_id(
393 &mut self,
394 channel_id: ChannelId,
395 message_id: u64,
396 cx: &mut Context<Self>,
397 ) {
398 self.channel_states
399 .entry(channel_id)
400 .or_default()
401 .update_latest_message_id(message_id);
402 cx.notify();
403 }
404
405 pub fn acknowledge_notes_version(
406 &mut self,
407 channel_id: ChannelId,
408 epoch: u64,
409 version: &clock::Global,
410 cx: &mut Context<Self>,
411 ) {
412 self.channel_states
413 .entry(channel_id)
414 .or_default()
415 .acknowledge_notes_version(epoch, version);
416 cx.notify()
417 }
418
419 pub fn update_latest_notes_version(
420 &mut self,
421 channel_id: ChannelId,
422 epoch: u64,
423 version: &clock::Global,
424 cx: &mut Context<Self>,
425 ) {
426 self.channel_states
427 .entry(channel_id)
428 .or_default()
429 .update_latest_notes_version(epoch, version);
430 cx.notify()
431 }
432
433 pub fn open_channel_chat(
434 &mut self,
435 channel_id: ChannelId,
436 cx: &mut Context<Self>,
437 ) -> Task<Result<Entity<ChannelChat>>> {
438 let client = self.client.clone();
439 let user_store = self.user_store.clone();
440 let this = cx.entity();
441 self.open_channel_resource(
442 channel_id,
443 |this| &mut this.opened_chats,
444 async move |channel, cx| ChannelChat::new(channel, this, user_store, client, cx).await,
445 cx,
446 )
447 }
448
    /// Asynchronously open a given resource associated with a channel.
    ///
    /// Make sure that the resource is only opened once, even if this method
    /// is called multiple times with the same channel id while the first task
    /// is still running.
    ///
    /// `get_map` selects which map of handles (buffers or chats) tracks the resource, and
    /// `load` performs the actual opening once the channel has been resolved. The returned
    /// task fails if the channel id is unknown or if `load` fails.
    fn open_channel_resource<T, F>(
        &mut self,
        channel_id: ChannelId,
        get_map: fn(&mut Self) -> &mut HashMap<ChannelId, OpenEntityHandle<T>>,
        load: F,
        cx: &mut Context<Self>,
    ) -> Task<Result<Entity<T>>>
    where
        F: AsyncFnOnce(Arc<Channel>, &mut AsyncApp) -> Result<Entity<T>> + 'static,
        T: 'static,
    {
        // Loop so that a stale `Open` entry can be removed and the lookup retried, which then
        // takes the `Vacant` path.
        let task = loop {
            match get_map(self).entry(channel_id) {
                hash_map::Entry::Occupied(e) => match e.get() {
                    OpenEntityHandle::Open(entity) => {
                        if let Some(entity) = entity.upgrade() {
                            // Already open and still alive: resolve immediately.
                            break Task::ready(Ok(entity)).shared();
                        } else {
                            // The entity was dropped; forget the stale handle and retry.
                            get_map(self).remove(&channel_id);
                            continue;
                        }
                    }
                    OpenEntityHandle::Loading(task) => {
                        // A load is already in flight: share it.
                        break task.clone();
                    }
                },
                hash_map::Entry::Vacant(e) => {
                    let task = cx
                        .spawn(async move |this, cx| {
                            let channel = this.update(cx, |this, _| {
                                this.channel_for_id(channel_id).cloned().ok_or_else(|| {
                                    Arc::new(anyhow!("no channel for id: {}", channel_id))
                                })
                            })??;

                            load(channel, cx).await.map_err(Arc::new)
                        })
                        .shared();

                    e.insert(OpenEntityHandle::Loading(task.clone()));
                    // Once loading finishes, replace the `Loading` entry with a weak `Open`
                    // handle, or drop the entry entirely on failure.
                    cx.spawn({
                        let task = task.clone();
                        async move |this, cx| {
                            let result = task.await;
                            this.update(cx, |this, _| match result {
                                Ok(model) => {
                                    get_map(this).insert(
                                        channel_id,
                                        OpenEntityHandle::Open(model.downgrade()),
                                    );
                                }
                                Err(_) => {
                                    get_map(this).remove(&channel_id);
                                }
                            })
                            .ok();
                        }
                    })
                    .detach();
                    break task;
                }
            }
        };
        // Convert the shared `Arc<anyhow::Error>` into a plain `anyhow::Error` for the caller.
        cx.background_spawn(async move { task.await.map_err(|error| anyhow!("{}", error)) })
    }
519
520 pub fn is_channel_admin(&self, channel_id: ChannelId) -> bool {
521 self.channel_role(channel_id) == proto::ChannelRole::Admin
522 }
523
524 pub fn is_root_channel(&self, channel_id: ChannelId) -> bool {
525 self.channel_index
526 .by_id()
527 .get(&channel_id)
528 .map_or(false, |channel| channel.is_root_channel())
529 }
530
531 pub fn is_public_channel(&self, channel_id: ChannelId) -> bool {
532 self.channel_index
533 .by_id()
534 .get(&channel_id)
535 .map_or(false, |channel| {
536 channel.visibility == ChannelVisibility::Public
537 })
538 }
539
540 pub fn channel_capability(&self, channel_id: ChannelId) -> Capability {
541 match self.channel_role(channel_id) {
542 ChannelRole::Admin | ChannelRole::Member => Capability::ReadWrite,
543 _ => Capability::ReadOnly,
544 }
545 }
546
547 pub fn channel_role(&self, channel_id: ChannelId) -> proto::ChannelRole {
548 maybe!({
549 let mut channel = self.channel_for_id(channel_id)?;
550 if !channel.is_root_channel() {
551 channel = self.channel_for_id(channel.root_id())?;
552 }
553 let root_channel_state = self.channel_states.get(&channel.id);
554 root_channel_state?.role
555 })
556 .unwrap_or(proto::ChannelRole::Guest)
557 }
558
559 pub fn channel_participants(&self, channel_id: ChannelId) -> &[Arc<User>] {
560 self.channel_participants
561 .get(&channel_id)
562 .map_or(&[], |v| v.as_slice())
563 }
564
565 pub fn create_channel(
566 &self,
567 name: &str,
568 parent_id: Option<ChannelId>,
569 cx: &mut Context<Self>,
570 ) -> Task<Result<ChannelId>> {
571 let client = self.client.clone();
572 let name = name.trim_start_matches('#').to_owned();
573 cx.spawn(async move |this, cx| {
574 let response = client
575 .request(proto::CreateChannel {
576 name,
577 parent_id: parent_id.map(|cid| cid.0),
578 })
579 .await?;
580
581 let channel = response
582 .channel
583 .ok_or_else(|| anyhow!("missing channel in response"))?;
584 let channel_id = ChannelId(channel.id);
585
586 this.update(cx, |this, cx| {
587 let task = this.update_channels(
588 proto::UpdateChannels {
589 channels: vec![channel],
590 ..Default::default()
591 },
592 cx,
593 );
594 assert!(task.is_none());
595
596 // This event is emitted because the collab panel wants to clear the pending edit state
597 // before this frame is rendered. But we can't guarantee that the collab panel's future
598 // will resolve before this flush_effects finishes. Synchronously emitting this event
599 // ensures that the collab panel will observe this creation before the frame completes
600 cx.emit(ChannelEvent::ChannelCreated(channel_id));
601 })?;
602
603 Ok(channel_id)
604 })
605 }
606
607 pub fn move_channel(
608 &mut self,
609 channel_id: ChannelId,
610 to: ChannelId,
611 cx: &mut Context<Self>,
612 ) -> Task<Result<()>> {
613 let client = self.client.clone();
614 cx.spawn(async move |_, _| {
615 let _ = client
616 .request(proto::MoveChannel {
617 channel_id: channel_id.0,
618 to: to.0,
619 })
620 .await?;
621
622 Ok(())
623 })
624 }
625
626 pub fn set_channel_visibility(
627 &mut self,
628 channel_id: ChannelId,
629 visibility: ChannelVisibility,
630 cx: &mut Context<Self>,
631 ) -> Task<Result<()>> {
632 let client = self.client.clone();
633 cx.spawn(async move |_, _| {
634 let _ = client
635 .request(proto::SetChannelVisibility {
636 channel_id: channel_id.0,
637 visibility: visibility.into(),
638 })
639 .await?;
640
641 Ok(())
642 })
643 }
644
645 pub fn invite_member(
646 &mut self,
647 channel_id: ChannelId,
648 user_id: UserId,
649 role: proto::ChannelRole,
650 cx: &mut Context<Self>,
651 ) -> Task<Result<()>> {
652 if !self.outgoing_invites.insert((channel_id, user_id)) {
653 return Task::ready(Err(anyhow!("invite request already in progress")));
654 }
655
656 cx.notify();
657 let client = self.client.clone();
658 cx.spawn(async move |this, cx| {
659 let result = client
660 .request(proto::InviteChannelMember {
661 channel_id: channel_id.0,
662 user_id,
663 role: role.into(),
664 })
665 .await;
666
667 this.update(cx, |this, cx| {
668 this.outgoing_invites.remove(&(channel_id, user_id));
669 cx.notify();
670 })?;
671
672 result?;
673
674 Ok(())
675 })
676 }
677
678 pub fn remove_member(
679 &mut self,
680 channel_id: ChannelId,
681 user_id: u64,
682 cx: &mut Context<Self>,
683 ) -> Task<Result<()>> {
684 if !self.outgoing_invites.insert((channel_id, user_id)) {
685 return Task::ready(Err(anyhow!("invite request already in progress")));
686 }
687
688 cx.notify();
689 let client = self.client.clone();
690 cx.spawn(async move |this, cx| {
691 let result = client
692 .request(proto::RemoveChannelMember {
693 channel_id: channel_id.0,
694 user_id,
695 })
696 .await;
697
698 this.update(cx, |this, cx| {
699 this.outgoing_invites.remove(&(channel_id, user_id));
700 cx.notify();
701 })?;
702 result?;
703 Ok(())
704 })
705 }
706
707 pub fn set_member_role(
708 &mut self,
709 channel_id: ChannelId,
710 user_id: UserId,
711 role: proto::ChannelRole,
712 cx: &mut Context<Self>,
713 ) -> Task<Result<()>> {
714 if !self.outgoing_invites.insert((channel_id, user_id)) {
715 return Task::ready(Err(anyhow!("member request already in progress")));
716 }
717
718 cx.notify();
719 let client = self.client.clone();
720 cx.spawn(async move |this, cx| {
721 let result = client
722 .request(proto::SetChannelMemberRole {
723 channel_id: channel_id.0,
724 user_id,
725 role: role.into(),
726 })
727 .await;
728
729 this.update(cx, |this, cx| {
730 this.outgoing_invites.remove(&(channel_id, user_id));
731 cx.notify();
732 })?;
733
734 result?;
735 Ok(())
736 })
737 }
738
739 pub fn rename(
740 &mut self,
741 channel_id: ChannelId,
742 new_name: &str,
743 cx: &mut Context<Self>,
744 ) -> Task<Result<()>> {
745 let client = self.client.clone();
746 let name = new_name.to_string();
747 cx.spawn(async move |this, cx| {
748 let channel = client
749 .request(proto::RenameChannel {
750 channel_id: channel_id.0,
751 name,
752 })
753 .await?
754 .channel
755 .ok_or_else(|| anyhow!("missing channel in response"))?;
756 this.update(cx, |this, cx| {
757 let task = this.update_channels(
758 proto::UpdateChannels {
759 channels: vec![channel],
760 ..Default::default()
761 },
762 cx,
763 );
764 assert!(task.is_none());
765
766 // This event is emitted because the collab panel wants to clear the pending edit state
767 // before this frame is rendered. But we can't guarantee that the collab panel's future
768 // will resolve before this flush_effects finishes. Synchronously emitting this event
769 // ensures that the collab panel will observe this creation before the frame complete
770 cx.emit(ChannelEvent::ChannelRenamed(channel_id))
771 })?;
772 Ok(())
773 })
774 }
775
776 pub fn respond_to_channel_invite(
777 &mut self,
778 channel_id: ChannelId,
779 accept: bool,
780 cx: &mut Context<Self>,
781 ) -> Task<Result<()>> {
782 let client = self.client.clone();
783 cx.background_spawn(async move {
784 client
785 .request(proto::RespondToChannelInvite {
786 channel_id: channel_id.0,
787 accept,
788 })
789 .await?;
790 Ok(())
791 })
792 }
793 pub fn fuzzy_search_members(
794 &self,
795 channel_id: ChannelId,
796 query: String,
797 limit: u16,
798 cx: &mut Context<Self>,
799 ) -> Task<Result<Vec<ChannelMembership>>> {
800 let client = self.client.clone();
801 let user_store = self.user_store.downgrade();
802 cx.spawn(async move |_, cx| {
803 let response = client
804 .request(proto::GetChannelMembers {
805 channel_id: channel_id.0,
806 query,
807 limit: limit as u64,
808 })
809 .await?;
810 user_store.update(cx, |user_store, _| {
811 user_store.insert(response.users);
812 response
813 .members
814 .into_iter()
815 .filter_map(|member| {
816 Some(ChannelMembership {
817 user: user_store.get_cached_user(member.user_id)?,
818 role: member.role(),
819 kind: member.kind(),
820 })
821 })
822 .collect()
823 })
824 })
825 }
826
827 pub fn remove_channel(&self, channel_id: ChannelId) -> impl Future<Output = Result<()>> {
828 let client = self.client.clone();
829 async move {
830 client
831 .request(proto::DeleteChannel {
832 channel_id: channel_id.0,
833 })
834 .await?;
835 Ok(())
836 }
837 }
838
839 pub fn has_pending_channel_invite_response(&self, _: &Arc<Channel>) -> bool {
840 false
841 }
842
843 pub fn has_pending_channel_invite(&self, channel_id: ChannelId, user_id: UserId) -> bool {
844 self.outgoing_invites.contains(&(channel_id, user_id))
845 }
846
847 async fn handle_update_channels(
848 this: Entity<Self>,
849 message: TypedEnvelope<proto::UpdateChannels>,
850 mut cx: AsyncApp,
851 ) -> Result<()> {
852 this.update(&mut cx, |this, _| {
853 this.update_channels_tx
854 .unbounded_send(message.payload)
855 .unwrap();
856 })?;
857 Ok(())
858 }
859
860 async fn handle_update_user_channels(
861 this: Entity<Self>,
862 message: TypedEnvelope<proto::UpdateUserChannels>,
863 mut cx: AsyncApp,
864 ) -> Result<()> {
865 this.update(&mut cx, |this, cx| {
866 for buffer_version in message.payload.observed_channel_buffer_version {
867 let version = language::proto::deserialize_version(&buffer_version.version);
868 this.acknowledge_notes_version(
869 ChannelId(buffer_version.channel_id),
870 buffer_version.epoch,
871 &version,
872 cx,
873 );
874 }
875 for message_id in message.payload.observed_channel_message_id {
876 this.acknowledge_message_id(
877 ChannelId(message_id.channel_id),
878 message_id.message_id,
879 cx,
880 );
881 }
882 for membership in message.payload.channel_memberships {
883 if let Some(role) = ChannelRole::from_i32(membership.role) {
884 this.channel_states
885 .entry(ChannelId(membership.channel_id))
886 .or_default()
887 .set_role(role)
888 }
889 }
890 })
891 }
892
893 fn handle_connect(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
894 self.channel_index.clear();
895 self.channel_invitations.clear();
896 self.channel_participants.clear();
897 self.channel_index.clear();
898 self.outgoing_invites.clear();
899 self.disconnect_channel_buffers_task.take();
900
901 for chat in self.opened_chats.values() {
902 if let OpenEntityHandle::Open(chat) = chat {
903 if let Some(chat) = chat.upgrade() {
904 chat.update(cx, |chat, cx| {
905 chat.rejoin(cx);
906 });
907 }
908 }
909 }
910
911 let mut buffer_versions = Vec::new();
912 for buffer in self.opened_buffers.values() {
913 if let OpenEntityHandle::Open(buffer) = buffer {
914 if let Some(buffer) = buffer.upgrade() {
915 let channel_buffer = buffer.read(cx);
916 let buffer = channel_buffer.buffer().read(cx);
917 buffer_versions.push(proto::ChannelBufferVersion {
918 channel_id: channel_buffer.channel_id.0,
919 epoch: channel_buffer.epoch(),
920 version: language::proto::serialize_version(&buffer.version()),
921 });
922 }
923 }
924 }
925
926 if buffer_versions.is_empty() {
927 return Task::ready(Ok(()));
928 }
929
930 let response = self.client.request(proto::RejoinChannelBuffers {
931 buffers: buffer_versions,
932 });
933
934 cx.spawn(async move |this, cx| {
935 let mut response = response.await?;
936
937 this.update(cx, |this, cx| {
938 this.opened_buffers.retain(|_, buffer| match buffer {
939 OpenEntityHandle::Open(channel_buffer) => {
940 let Some(channel_buffer) = channel_buffer.upgrade() else {
941 return false;
942 };
943
944 channel_buffer.update(cx, |channel_buffer, cx| {
945 let channel_id = channel_buffer.channel_id;
946 if let Some(remote_buffer) = response
947 .buffers
948 .iter_mut()
949 .find(|buffer| buffer.channel_id == channel_id.0)
950 {
951 let channel_id = channel_buffer.channel_id;
952 let remote_version =
953 language::proto::deserialize_version(&remote_buffer.version);
954
955 channel_buffer.replace_collaborators(
956 mem::take(&mut remote_buffer.collaborators),
957 cx,
958 );
959
960 let operations = channel_buffer
961 .buffer()
962 .update(cx, |buffer, cx| {
963 let outgoing_operations =
964 buffer.serialize_ops(Some(remote_version), cx);
965 let incoming_operations =
966 mem::take(&mut remote_buffer.operations)
967 .into_iter()
968 .map(language::proto::deserialize_operation)
969 .collect::<Result<Vec<_>>>()?;
970 buffer.apply_ops(incoming_operations, cx);
971 anyhow::Ok(outgoing_operations)
972 })
973 .log_err();
974
975 if let Some(operations) = operations {
976 let client = this.client.clone();
977 cx.background_spawn(async move {
978 let operations = operations.await;
979 for chunk in language::proto::split_operations(operations) {
980 client
981 .send(proto::UpdateChannelBuffer {
982 channel_id: channel_id.0,
983 operations: chunk,
984 })
985 .ok();
986 }
987 })
988 .detach();
989 return true;
990 }
991 }
992
993 channel_buffer.disconnect(cx);
994 false
995 })
996 }
997 OpenEntityHandle::Loading(_) => true,
998 });
999 })
1000 .ok();
1001 anyhow::Ok(())
1002 })
1003 }
1004
1005 fn handle_disconnect(&mut self, wait_for_reconnect: bool, cx: &mut Context<Self>) {
1006 cx.notify();
1007 self.did_subscribe = false;
1008 self.disconnect_channel_buffers_task.get_or_insert_with(|| {
1009 cx.spawn(async move |this, cx| {
1010 if wait_for_reconnect {
1011 cx.background_executor().timer(RECONNECT_TIMEOUT).await;
1012 }
1013
1014 if let Some(this) = this.upgrade() {
1015 this.update(cx, |this, cx| {
1016 for (_, buffer) in this.opened_buffers.drain() {
1017 if let OpenEntityHandle::Open(buffer) = buffer {
1018 if let Some(buffer) = buffer.upgrade() {
1019 buffer.update(cx, |buffer, cx| buffer.disconnect(cx));
1020 }
1021 }
1022 }
1023 })
1024 .ok();
1025 }
1026 })
1027 });
1028 }
1029
    /// Applies an `UpdateChannels` payload to the store and notifies observers.
    ///
    /// Invitations, channels, latest notes versions and latest message ids are applied
    /// synchronously. If the payload carries channel participants, the corresponding users are
    /// requested from the user store and a task is returned that records the participants
    /// once the users are available; otherwise `None` is returned.
    pub(crate) fn update_channels(
        &mut self,
        payload: proto::UpdateChannels,
        cx: &mut Context<ChannelStore>,
    ) -> Option<Task<Result<()>>> {
        if !payload.remove_channel_invitations.is_empty() {
            self.channel_invitations
                .retain(|channel| !payload.remove_channel_invitations.contains(&channel.id.0));
        }
        // Insert or update invitations, keeping the list ordered by channel id.
        for channel in payload.channel_invitations {
            match self
                .channel_invitations
                .binary_search_by_key(&channel.id, |c| c.id.0)
            {
                // Already invited: only the name is refreshed.
                Ok(ix) => {
                    Arc::make_mut(&mut self.channel_invitations[ix]).name = channel.name.into()
                }
                Err(ix) => self.channel_invitations.insert(
                    ix,
                    Arc::new(Channel {
                        id: ChannelId(channel.id),
                        visibility: channel.visibility(),
                        name: channel.name.into(),
                        parent_path: channel.parent_path.into_iter().map(ChannelId).collect(),
                    }),
                ),
            }
        }

        let channels_changed = !payload.channels.is_empty()
            || !payload.delete_channels.is_empty()
            || !payload.latest_channel_message_ids.is_empty()
            || !payload.latest_channel_buffer_versions.is_empty();

        if channels_changed {
            if !payload.delete_channels.is_empty() {
                let delete_channels: Vec<ChannelId> =
                    payload.delete_channels.into_iter().map(ChannelId).collect();
                self.channel_index.delete_channels(&delete_channels);
                self.channel_participants
                    .retain(|channel_id, _| !delete_channels.contains(channel_id));

                for channel_id in &delete_channels {
                    let channel_id = *channel_id;
                    // A channel that is deleted and re-added in the same payload keeps its
                    // open buffer.
                    if payload
                        .channels
                        .iter()
                        .any(|channel| channel.id == channel_id.0)
                    {
                        continue;
                    }
                    if let Some(OpenEntityHandle::Open(buffer)) =
                        self.opened_buffers.remove(&channel_id)
                    {
                        if let Some(buffer) = buffer.upgrade() {
                            buffer.update(cx, ChannelBuffer::disconnect);
                        }
                    }
                }
            }

            let mut index = self.channel_index.bulk_insert();
            for channel in payload.channels {
                let id = ChannelId(channel.id);
                let channel_changed = index.insert(channel);

                // Let an open buffer for this channel know that its channel changed.
                if channel_changed {
                    if let Some(OpenEntityHandle::Open(buffer)) = self.opened_buffers.get(&id) {
                        if let Some(buffer) = buffer.upgrade() {
                            buffer.update(cx, ChannelBuffer::channel_changed);
                        }
                    }
                }
            }

            for latest_buffer_version in payload.latest_channel_buffer_versions {
                let version = language::proto::deserialize_version(&latest_buffer_version.version);
                self.channel_states
                    .entry(ChannelId(latest_buffer_version.channel_id))
                    .or_default()
                    .update_latest_notes_version(latest_buffer_version.epoch, &version)
            }

            for latest_channel_message in payload.latest_channel_message_ids {
                self.channel_states
                    .entry(ChannelId(latest_channel_message.channel_id))
                    .or_default()
                    .update_latest_message_id(latest_channel_message.message_id);
            }
        }

        cx.notify();
        if payload.channel_participants.is_empty() {
            return None;
        }

        // Collect the distinct participant user ids, kept sorted via binary-search insertion.
        let mut all_user_ids = Vec::new();
        let channel_participants = payload.channel_participants;
        for entry in &channel_participants {
            for user_id in entry.participant_user_ids.iter() {
                if let Err(ix) = all_user_ids.binary_search(user_id) {
                    all_user_ids.insert(ix, *user_id);
                }
            }
        }

        let users = self
            .user_store
            .update(cx, |user_store, cx| user_store.get_users(all_user_ids, cx));
        Some(cx.spawn(async move |this, cx| {
            let users = users.await?;

            this.update(cx, |this, cx| {
                for entry in &channel_participants {
                    // NOTE(review): the binary search presumably relies on `get_users`
                    // returning users ordered by id — confirm. Ids that are not found are
                    // silently skipped.
                    let mut participants: Vec<_> = entry
                        .participant_user_ids
                        .iter()
                        .filter_map(|user_id| {
                            users
                                .binary_search_by_key(&user_id, |user| &user.id)
                                .ok()
                                .map(|ix| users[ix].clone())
                        })
                        .collect();

                    participants.sort_by_key(|u| u.id);

                    this.channel_participants
                        .insert(ChannelId(entry.channel_id), participants);
                }

                cx.notify();
            })
        }))
    }
1165}
1166
1167impl ChannelState {
1168 fn set_role(&mut self, role: ChannelRole) {
1169 self.role = Some(role);
1170 }
1171
1172 fn has_channel_buffer_changed(&self) -> bool {
1173 self.latest_notes_version.epoch > self.observed_notes_version.epoch
1174 || (self.latest_notes_version.epoch == self.observed_notes_version.epoch
1175 && self
1176 .latest_notes_version
1177 .version
1178 .changed_since(&self.observed_notes_version.version))
1179 }
1180
1181 fn has_new_messages(&self) -> bool {
1182 let latest_message_id = self.latest_chat_message;
1183 let observed_message_id = self.observed_chat_message;
1184
1185 latest_message_id.is_some_and(|latest_message_id| {
1186 latest_message_id > observed_message_id.unwrap_or_default()
1187 })
1188 }
1189
1190 fn last_acknowledged_message_id(&self) -> Option<u64> {
1191 self.observed_chat_message
1192 }
1193
1194 fn acknowledge_message_id(&mut self, message_id: u64) {
1195 let observed = self.observed_chat_message.get_or_insert(message_id);
1196 *observed = (*observed).max(message_id);
1197 }
1198
1199 fn update_latest_message_id(&mut self, message_id: u64) {
1200 self.latest_chat_message =
1201 Some(message_id.max(self.latest_chat_message.unwrap_or_default()));
1202 }
1203
1204 fn acknowledge_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1205 if self.observed_notes_version.epoch == epoch {
1206 self.observed_notes_version.version.join(version);
1207 } else {
1208 self.observed_notes_version = NotesVersion {
1209 epoch,
1210 version: version.clone(),
1211 };
1212 }
1213 }
1214
1215 fn update_latest_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1216 if self.latest_notes_version.epoch == epoch {
1217 self.latest_notes_version.version.join(version);
1218 } else {
1219 self.latest_notes_version = NotesVersion {
1220 epoch,
1221 version: version.clone(),
1222 };
1223 }
1224 }
1225}