1mod channel_index;
2
3use crate::{ChannelMessage, channel_buffer::ChannelBuffer, channel_chat::ChannelChat};
4use anyhow::{Result, anyhow};
5use channel_index::ChannelIndex;
6use client::{ChannelId, Client, ClientSettings, Subscription, User, UserId, UserStore};
7use collections::{HashMap, HashSet, hash_map};
8use futures::{Future, FutureExt, StreamExt, channel::mpsc, future::Shared};
9use gpui::{
10 App, AppContext as _, AsyncApp, Context, Entity, EventEmitter, Global, SharedString, Task,
11 WeakEntity,
12};
13use language::Capability;
14use rpc::{
15 TypedEnvelope,
16 proto::{self, ChannelRole, ChannelVisibility},
17};
18use settings::Settings;
19use std::{mem, sync::Arc, time::Duration};
20use util::{ResultExt, maybe};
21
/// Grace period used by `ChannelStore::handle_disconnect`: when a reconnect may
/// still happen, open channel buffers are only disconnected after this delay.
pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
23
24pub fn init(client: &Arc<Client>, user_store: Entity<UserStore>, cx: &mut App) {
25 let channel_store = cx.new(|cx| ChannelStore::new(client.clone(), user_store.clone(), cx));
26 cx.set_global(GlobalChannelStore(channel_store));
27}
28
/// A version of a channel's notes buffer: an epoch plus a version vector.
///
/// Versions are only merged (`join`ed) within the same epoch; a different epoch
/// replaces the stored value wholesale (see `ChannelState`).
#[derive(Debug, Clone, Default, PartialEq)]
struct NotesVersion {
    epoch: u64,
    version: clock::Global,
}
34
/// Client-side state for channels: the channel index, invitations, participants,
/// per-channel read state, and the currently opened channel buffers and chats.
pub struct ChannelStore {
    /// Index of all known channels (by id and in display order).
    pub channel_index: ChannelIndex,
    /// Channels the current user has been invited to, kept sorted by channel id.
    channel_invitations: Vec<Arc<Channel>>,
    /// Users currently participating in each channel.
    channel_participants: HashMap<ChannelId, Vec<Arc<User>>>,
    /// Per-channel role and latest/observed message and notes versions.
    channel_states: HashMap<ChannelId, ChannelState>,
    /// `(channel, user)` pairs that have a membership request in flight.
    outgoing_invites: HashSet<(ChannelId, UserId)>,
    /// Sender side of the queue drained by the `_update_channels` task.
    update_channels_tx: mpsc::UnboundedSender<proto::UpdateChannels>,
    /// Channel buffers that are open or currently being opened.
    opened_buffers: HashMap<ChannelId, OpenEntityHandle<ChannelBuffer>>,
    /// Channel chats that are open or currently being opened.
    opened_chats: HashMap<ChannelId, OpenEntityHandle<ChannelChat>>,
    client: Arc<Client>,
    /// Whether `SubscribeToChannels` has been sent since the last disconnect.
    did_subscribe: bool,
    user_store: Entity<UserStore>,
    /// Keeps the RPC message handlers registered in `new` alive.
    _rpc_subscriptions: [Subscription; 2],
    /// Task reacting to client connection status changes.
    _watch_connection_status: Task<Option<()>>,
    /// Pending task that disconnects open channel buffers after a disconnect;
    /// taken (dropped) again in `handle_connect`.
    disconnect_channel_buffers_task: Option<Task<()>>,
    /// Task applying queued `UpdateChannels` payloads one at a time.
    _update_channels: Task<()>,
}
52
/// A single channel as known to the client.
#[derive(Clone, Debug)]
pub struct Channel {
    pub id: ChannelId,
    pub name: SharedString,
    pub visibility: proto::ChannelVisibility,
    /// Ids of the channels above this one; empty for a root channel, and the
    /// first entry is treated as the root (see `root_id`).
    pub parent_path: Vec<ChannelId>,
}
60
/// Per-channel read state and role of the current user.
#[derive(Default, Debug)]
pub struct ChannelState {
    /// Highest chat message id reported for the channel so far.
    latest_chat_message: Option<u64>,
    /// Most recent notes version reported for the channel.
    latest_notes_version: NotesVersion,
    /// Notes version that has been acknowledged as seen.
    observed_notes_version: NotesVersion,
    /// Highest chat message id that has been acknowledged as seen.
    observed_chat_message: Option<u64>,
    /// Role of the current user in this channel, if one has been received.
    role: Option<ChannelRole>,
}
69
70impl Channel {
71 pub fn link(&self, cx: &App) -> String {
72 format!(
73 "{}/channel/{}-{}",
74 ClientSettings::get_global(cx).server_url,
75 Self::slug(&self.name),
76 self.id
77 )
78 }
79
80 pub fn notes_link(&self, heading: Option<String>, cx: &App) -> String {
81 self.link(cx)
82 + "/notes"
83 + &heading
84 .map(|h| format!("#{}", Self::slug(&h)))
85 .unwrap_or_default()
86 }
87
    /// Returns `true` when this channel has no ancestors (`parent_path` is empty).
    pub fn is_root_channel(&self) -> bool {
        self.parent_path.is_empty()
    }
91
92 pub fn root_id(&self) -> ChannelId {
93 self.parent_path.first().copied().unwrap_or(self.id)
94 }
95
96 pub fn slug(str: &str) -> String {
97 let slug: String = str
98 .chars()
99 .map(|c| if c.is_alphanumeric() { c } else { '-' })
100 .collect();
101
102 slug.trim_matches(|c| c == '-').to_string()
103 }
104}
105
/// A user's relationship to a channel, as built by
/// `ChannelStore::fuzzy_search_members`.
#[derive(Debug)]
pub struct ChannelMembership {
    pub user: Arc<User>,
    /// Whether the user is a member or only an invitee.
    pub kind: proto::channel_member::Kind,
    pub role: proto::ChannelRole,
}
112impl ChannelMembership {
113 pub fn sort_key(&self) -> MembershipSortKey {
114 MembershipSortKey {
115 role_order: match self.role {
116 proto::ChannelRole::Admin => 0,
117 proto::ChannelRole::Member => 1,
118 proto::ChannelRole::Banned => 2,
119 proto::ChannelRole::Talker => 3,
120 proto::ChannelRole::Guest => 4,
121 },
122 kind_order: match self.kind {
123 proto::channel_member::Kind::Member => 0,
124 proto::channel_member::Kind::Invitee => 1,
125 },
126 username_order: self.user.github_login.as_str(),
127 }
128 }
129}
130
/// Sort key returned by `ChannelMembership::sort_key`.
///
/// The derived ordering compares fields in declaration order, so the field
/// order below is significant: role, then kind, then username.
#[derive(PartialOrd, Ord, PartialEq, Eq)]
pub struct MembershipSortKey<'a> {
    role_order: u8,
    kind_order: u8,
    username_order: &'a str,
}
137
/// Events emitted by [`ChannelStore`].
pub enum ChannelEvent {
    /// Emitted by `create_channel` once the new channel has been inserted.
    ChannelCreated(ChannelId),
    /// Emitted by `rename` once the renamed channel has been applied.
    ChannelRenamed(ChannelId),
}

impl EventEmitter<ChannelEvent> for ChannelStore {}
144
/// Tracks a channel resource (buffer or chat) that is open or being opened.
enum OpenEntityHandle<E> {
    /// The resource finished loading; only a weak handle is kept, so it may
    /// already have been dropped.
    Open(WeakEntity<E>),
    /// The resource is still loading; the shared task lets several callers
    /// await the same load.
    Loading(Shared<Task<Result<Entity<E>, Arc<anyhow::Error>>>>),
}
149
/// Wrapper that stores the [`ChannelStore`] entity as an application global
/// (set in `init`, read in `ChannelStore::global`).
struct GlobalChannelStore(Entity<ChannelStore>);

impl Global for GlobalChannelStore {}
153
154impl ChannelStore {
155 pub fn global(cx: &App) -> Entity<Self> {
156 cx.global::<GlobalChannelStore>().0.clone()
157 }
158
    /// Creates a channel store bound to `client` and `user_store`.
    ///
    /// Registers the message handlers for `UpdateChannels` and
    /// `UpdateUserChannels`, and spawns two tasks that are stored on the
    /// returned value: one reacting to connection status changes and one
    /// applying queued `UpdateChannels` payloads one at a time.
    pub fn new(client: Arc<Client>, user_store: Entity<UserStore>, cx: &mut Context<Self>) -> Self {
        let rpc_subscriptions = [
            client.add_message_handler(cx.weak_entity(), Self::handle_update_channels),
            client.add_message_handler(cx.weak_entity(), Self::handle_update_user_channels),
        ];

        let mut connection_status = client.status();
        let (update_channels_tx, mut update_channels_rx) = mpsc::unbounded();
        let watch_connection_status = cx.spawn(async move |this, cx| {
            while let Some(status) = connection_status.next().await {
                // Stop watching once the store itself has been dropped.
                let this = this.upgrade()?;
                match status {
                    client::Status::Connected { .. } => {
                        // Any failure while handling the connect ends this task
                        // (each `?` returns `None`).
                        this.update(cx, |this, cx| this.handle_connect(cx))
                            .ok()?
                            .await
                            .log_err()?;
                    }
                    // No reconnect is awaited for these states, so buffers are
                    // disconnected without the reconnect grace period.
                    client::Status::SignedOut | client::Status::UpgradeRequired => {
                        this.update(cx, |this, cx| this.handle_disconnect(false, cx))
                            .ok();
                    }
                    // Every other status is handled as a disconnect that waits
                    // for a possible reconnect first.
                    _ => {
                        this.update(cx, |this, cx| this.handle_disconnect(true, cx))
                            .ok();
                    }
                }
            }
            Some(())
        });

        Self {
            channel_invitations: Vec::default(),
            channel_index: ChannelIndex::default(),
            channel_participants: Default::default(),
            outgoing_invites: Default::default(),
            opened_buffers: Default::default(),
            opened_chats: Default::default(),
            update_channels_tx,
            client,
            user_store,
            _rpc_subscriptions: rpc_subscriptions,
            _watch_connection_status: watch_connection_status,
            disconnect_channel_buffers_task: None,
            _update_channels: cx.spawn(async move |this, cx| {
                maybe!(async move {
                    // Apply payloads strictly in arrival order: the follow-up
                    // task of one update is awaited before the next is taken.
                    while let Some(update_channels) = update_channels_rx.next().await {
                        if let Some(this) = this.upgrade() {
                            let update_task = this
                                .update(cx, |this, cx| this.update_channels(update_channels, cx))?;
                            if let Some(update_task) = update_task {
                                update_task.await.log_err();
                            }
                        }
                    }
                    anyhow::Ok(())
                })
                .await
                .log_err();
            }),
            channel_states: Default::default(),
            did_subscribe: false,
        }
    }
223
224 pub fn initialize(&mut self) {
225 if !self.did_subscribe
226 && self
227 .client
228 .send(proto::SubscribeToChannels {})
229 .log_err()
230 .is_some()
231 {
232 self.did_subscribe = true;
233 }
234 }
235
236 pub fn client(&self) -> Arc<Client> {
237 self.client.clone()
238 }
239
    /// Returns the number of unique channels in the store.
    pub fn channel_count(&self) -> usize {
        self.channel_index.by_id().len()
    }
244
245 /// Returns the index of a channel ID in the list of unique channels
246 pub fn index_of_channel(&self, channel_id: ChannelId) -> Option<usize> {
247 self.channel_index
248 .by_id()
249 .keys()
250 .position(|id| *id == channel_id)
251 }
252
    /// Returns an iterator over all unique channels, in the iteration order of
    /// the by-id index.
    pub fn channels(&self) -> impl '_ + Iterator<Item = &Arc<Channel>> {
        self.channel_index.by_id().values()
    }
257
258 /// Iterate over all entries in the channel DAG
259 pub fn ordered_channels(&self) -> impl '_ + Iterator<Item = (usize, &Arc<Channel>)> {
260 self.channel_index
261 .ordered_channels()
262 .iter()
263 .filter_map(move |id| {
264 let channel = self.channel_index.by_id().get(id)?;
265 Some((channel.parent_path.len(), channel))
266 })
267 }
268
269 pub fn channel_at_index(&self, ix: usize) -> Option<&Arc<Channel>> {
270 let channel_id = self.channel_index.ordered_channels().get(ix)?;
271 self.channel_index.by_id().get(channel_id)
272 }
273
    /// Returns the `ix`-th channel in the iteration order of the by-id index
    /// (unlike `channel_at_index`, which uses the ordered channel list).
    pub fn channel_at(&self, ix: usize) -> Option<&Arc<Channel>> {
        self.channel_index.by_id().values().nth(ix)
    }
277
278 pub fn has_channel_invitation(&self, channel_id: ChannelId) -> bool {
279 self.channel_invitations
280 .iter()
281 .any(|channel| channel.id == channel_id)
282 }
283
284 pub fn channel_invitations(&self) -> &[Arc<Channel>] {
285 &self.channel_invitations
286 }
287
    /// Looks up a channel by id in the channel index.
    pub fn channel_for_id(&self, channel_id: ChannelId) -> Option<&Arc<Channel>> {
        self.channel_index.by_id().get(&channel_id)
    }
291
292 pub fn has_open_channel_buffer(&self, channel_id: ChannelId, _cx: &App) -> bool {
293 if let Some(buffer) = self.opened_buffers.get(&channel_id) {
294 if let OpenEntityHandle::Open(buffer) = buffer {
295 return buffer.upgrade().is_some();
296 }
297 }
298 false
299 }
300
301 pub fn open_channel_buffer(
302 &mut self,
303 channel_id: ChannelId,
304 cx: &mut Context<Self>,
305 ) -> Task<Result<Entity<ChannelBuffer>>> {
306 let client = self.client.clone();
307 let user_store = self.user_store.clone();
308 let channel_store = cx.entity();
309 self.open_channel_resource(
310 channel_id,
311 |this| &mut this.opened_buffers,
312 async move |channel, cx| {
313 ChannelBuffer::new(channel, client, user_store, channel_store, cx).await
314 },
315 cx,
316 )
317 }
318
319 pub fn fetch_channel_messages(
320 &self,
321 message_ids: Vec<u64>,
322 cx: &mut Context<Self>,
323 ) -> Task<Result<Vec<ChannelMessage>>> {
324 let request = if message_ids.is_empty() {
325 None
326 } else {
327 Some(
328 self.client
329 .request(proto::GetChannelMessagesById { message_ids }),
330 )
331 };
332 cx.spawn(async move |this, cx| {
333 if let Some(request) = request {
334 let response = request.await?;
335 let this = this
336 .upgrade()
337 .ok_or_else(|| anyhow!("channel store dropped"))?;
338 let user_store = this.update(cx, |this, _| this.user_store.clone())?;
339 ChannelMessage::from_proto_vec(response.messages, &user_store, cx).await
340 } else {
341 Ok(Vec::new())
342 }
343 })
344 }
345
346 pub fn has_channel_buffer_changed(&self, channel_id: ChannelId) -> bool {
347 self.channel_states
348 .get(&channel_id)
349 .is_some_and(|state| state.has_channel_buffer_changed())
350 }
351
352 pub fn has_new_messages(&self, channel_id: ChannelId) -> bool {
353 self.channel_states
354 .get(&channel_id)
355 .is_some_and(|state| state.has_new_messages())
356 }
357
    /// Overwrites the stored `latest_chat_message` of `channel_id` with
    /// `message_id`; does nothing when the channel has no recorded state.
    ///
    /// NOTE(review): despite the name, this writes `latest_chat_message`, not
    /// `observed_chat_message` — confirm this is intended with the callers.
    pub fn set_acknowledged_message_id(&mut self, channel_id: ChannelId, message_id: Option<u64>) {
        if let Some(state) = self.channel_states.get_mut(&channel_id) {
            state.latest_chat_message = message_id;
        }
    }
363
364 pub fn last_acknowledge_message_id(&self, channel_id: ChannelId) -> Option<u64> {
365 self.channel_states.get(&channel_id).and_then(|state| {
366 if let Some(last_message_id) = state.latest_chat_message {
367 if state
368 .last_acknowledged_message_id()
369 .is_some_and(|id| id < last_message_id)
370 {
371 return state.last_acknowledged_message_id();
372 }
373 }
374
375 None
376 })
377 }
378
379 pub fn acknowledge_message_id(
380 &mut self,
381 channel_id: ChannelId,
382 message_id: u64,
383 cx: &mut Context<Self>,
384 ) {
385 self.channel_states
386 .entry(channel_id)
387 .or_default()
388 .acknowledge_message_id(message_id);
389 cx.notify();
390 }
391
392 pub fn update_latest_message_id(
393 &mut self,
394 channel_id: ChannelId,
395 message_id: u64,
396 cx: &mut Context<Self>,
397 ) {
398 self.channel_states
399 .entry(channel_id)
400 .or_default()
401 .update_latest_message_id(message_id);
402 cx.notify();
403 }
404
405 pub fn acknowledge_notes_version(
406 &mut self,
407 channel_id: ChannelId,
408 epoch: u64,
409 version: &clock::Global,
410 cx: &mut Context<Self>,
411 ) {
412 self.channel_states
413 .entry(channel_id)
414 .or_default()
415 .acknowledge_notes_version(epoch, version);
416 cx.notify()
417 }
418
419 pub fn update_latest_notes_version(
420 &mut self,
421 channel_id: ChannelId,
422 epoch: u64,
423 version: &clock::Global,
424 cx: &mut Context<Self>,
425 ) {
426 self.channel_states
427 .entry(channel_id)
428 .or_default()
429 .update_latest_notes_version(epoch, version);
430 cx.notify()
431 }
432
433 pub fn open_channel_chat(
434 &mut self,
435 channel_id: ChannelId,
436 cx: &mut Context<Self>,
437 ) -> Task<Result<Entity<ChannelChat>>> {
438 let client = self.client.clone();
439 let user_store = self.user_store.clone();
440 let this = cx.entity();
441 self.open_channel_resource(
442 channel_id,
443 |this| &mut this.opened_chats,
444 async move |channel, cx| ChannelChat::new(channel, this, user_store, client, cx).await,
445 cx,
446 )
447 }
448
    /// Asynchronously open a given resource associated with a channel.
    ///
    /// Make sure that the resource is only opened once, even if this method
    /// is called multiple times with the same channel id while the first task
    /// is still running.
    ///
    /// `get_map` selects which map of handles (buffers or chats) to use, and
    /// `load` performs the actual opening once the channel has been resolved.
    /// The returned task fails if the channel id is unknown or if `load` fails.
    fn open_channel_resource<T, F>(
        &mut self,
        channel_id: ChannelId,
        get_map: fn(&mut Self) -> &mut HashMap<ChannelId, OpenEntityHandle<T>>,
        load: F,
        cx: &mut Context<Self>,
    ) -> Task<Result<Entity<T>>>
    where
        F: AsyncFnOnce(Arc<Channel>, &mut AsyncApp) -> Result<Entity<T>> + 'static,
        T: 'static,
    {
        let task = loop {
            match get_map(self).entry(channel_id) {
                hash_map::Entry::Occupied(e) => match e.get() {
                    OpenEntityHandle::Open(entity) => {
                        if let Some(entity) = entity.upgrade() {
                            // Already open and alive: hand it back immediately.
                            break Task::ready(Ok(entity)).shared();
                        } else {
                            // The entity was dropped; forget the stale handle
                            // and retry, which takes the vacant branch.
                            get_map(self).remove(&channel_id);
                            continue;
                        }
                    }
                    OpenEntityHandle::Loading(task) => {
                        // A load is in progress: share its result.
                        break task.clone();
                    }
                },
                hash_map::Entry::Vacant(e) => {
                    let task = cx
                        .spawn(async move |this, cx| {
                            let channel = this.update(cx, |this, _| {
                                this.channel_for_id(channel_id).cloned().ok_or_else(|| {
                                    Arc::new(anyhow!("no channel for id: {}", channel_id))
                                })
                            })??;

                            load(channel, cx).await.map_err(Arc::new)
                        })
                        .shared();

                    e.insert(OpenEntityHandle::Loading(task.clone()));
                    // Once loading finishes, replace the `Loading` entry with a
                    // weak `Open` handle, or drop the entry on failure so that
                    // a later call can try again.
                    cx.spawn({
                        let task = task.clone();
                        async move |this, cx| {
                            let result = task.await;
                            this.update(cx, |this, _| match result {
                                Ok(model) => {
                                    get_map(this).insert(
                                        channel_id,
                                        OpenEntityHandle::Open(model.downgrade()),
                                    );
                                }
                                Err(_) => {
                                    get_map(this).remove(&channel_id);
                                }
                            })
                            .ok();
                        }
                    })
                    .detach();
                    break task;
                }
            }
        };
        // The shared task yields `Arc<anyhow::Error>`; convert it back into a
        // plain `anyhow::Error` for the caller.
        cx.background_spawn(async move { task.await.map_err(|error| anyhow!("{}", error)) })
    }
519
520 pub fn is_channel_admin(&self, channel_id: ChannelId) -> bool {
521 self.channel_role(channel_id) == proto::ChannelRole::Admin
522 }
523
524 pub fn is_root_channel(&self, channel_id: ChannelId) -> bool {
525 self.channel_index
526 .by_id()
527 .get(&channel_id)
528 .map_or(false, |channel| channel.is_root_channel())
529 }
530
531 pub fn is_public_channel(&self, channel_id: ChannelId) -> bool {
532 self.channel_index
533 .by_id()
534 .get(&channel_id)
535 .map_or(false, |channel| {
536 channel.visibility == ChannelVisibility::Public
537 })
538 }
539
540 pub fn channel_capability(&self, channel_id: ChannelId) -> Capability {
541 match self.channel_role(channel_id) {
542 ChannelRole::Admin | ChannelRole::Member => Capability::ReadWrite,
543 _ => Capability::ReadOnly,
544 }
545 }
546
547 pub fn channel_role(&self, channel_id: ChannelId) -> proto::ChannelRole {
548 maybe!({
549 let mut channel = self.channel_for_id(channel_id)?;
550 if !channel.is_root_channel() {
551 channel = self.channel_for_id(channel.root_id())?;
552 }
553 let root_channel_state = self.channel_states.get(&channel.id);
554 root_channel_state?.role
555 })
556 .unwrap_or(proto::ChannelRole::Guest)
557 }
558
559 pub fn channel_participants(&self, channel_id: ChannelId) -> &[Arc<User>] {
560 self.channel_participants
561 .get(&channel_id)
562 .map_or(&[], |v| v.as_slice())
563 }
564
565 pub fn create_channel(
566 &self,
567 name: &str,
568 parent_id: Option<ChannelId>,
569 cx: &mut Context<Self>,
570 ) -> Task<Result<ChannelId>> {
571 let client = self.client.clone();
572 let name = name.trim_start_matches('#').to_owned();
573 cx.spawn(async move |this, cx| {
574 let response = client
575 .request(proto::CreateChannel {
576 name,
577 parent_id: parent_id.map(|cid| cid.0),
578 })
579 .await?;
580
581 let channel = response
582 .channel
583 .ok_or_else(|| anyhow!("missing channel in response"))?;
584 let channel_id = ChannelId(channel.id);
585
586 this.update(cx, |this, cx| {
587 let task = this.update_channels(
588 proto::UpdateChannels {
589 channels: vec![channel],
590 ..Default::default()
591 },
592 cx,
593 );
594 assert!(task.is_none());
595
596 // This event is emitted because the collab panel wants to clear the pending edit state
597 // before this frame is rendered. But we can't guarantee that the collab panel's future
598 // will resolve before this flush_effects finishes. Synchronously emitting this event
599 // ensures that the collab panel will observe this creation before the frame completes
600 cx.emit(ChannelEvent::ChannelCreated(channel_id));
601 })?;
602
603 Ok(channel_id)
604 })
605 }
606
607 pub fn move_channel(
608 &mut self,
609 channel_id: ChannelId,
610 to: ChannelId,
611 cx: &mut Context<Self>,
612 ) -> Task<Result<()>> {
613 let client = self.client.clone();
614 cx.spawn(async move |_, _| {
615 let _ = client
616 .request(proto::MoveChannel {
617 channel_id: channel_id.0,
618 to: to.0,
619 })
620 .await?;
621
622 Ok(())
623 })
624 }
625
626 pub fn set_channel_visibility(
627 &mut self,
628 channel_id: ChannelId,
629 visibility: ChannelVisibility,
630 cx: &mut Context<Self>,
631 ) -> Task<Result<()>> {
632 let client = self.client.clone();
633 cx.spawn(async move |_, _| {
634 let _ = client
635 .request(proto::SetChannelVisibility {
636 channel_id: channel_id.0,
637 visibility: visibility.into(),
638 })
639 .await?;
640
641 Ok(())
642 })
643 }
644
645 pub fn invite_member(
646 &mut self,
647 channel_id: ChannelId,
648 user_id: UserId,
649 role: proto::ChannelRole,
650 cx: &mut Context<Self>,
651 ) -> Task<Result<()>> {
652 if !self.outgoing_invites.insert((channel_id, user_id)) {
653 return Task::ready(Err(anyhow!("invite request already in progress")));
654 }
655
656 cx.notify();
657 let client = self.client.clone();
658 cx.spawn(async move |this, cx| {
659 let result = client
660 .request(proto::InviteChannelMember {
661 channel_id: channel_id.0,
662 user_id,
663 role: role.into(),
664 })
665 .await;
666
667 this.update(cx, |this, cx| {
668 this.outgoing_invites.remove(&(channel_id, user_id));
669 cx.notify();
670 })?;
671
672 result?;
673
674 Ok(())
675 })
676 }
677
678 pub fn remove_member(
679 &mut self,
680 channel_id: ChannelId,
681 user_id: u64,
682 cx: &mut Context<Self>,
683 ) -> Task<Result<()>> {
684 if !self.outgoing_invites.insert((channel_id, user_id)) {
685 return Task::ready(Err(anyhow!("invite request already in progress")));
686 }
687
688 cx.notify();
689 let client = self.client.clone();
690 cx.spawn(async move |this, cx| {
691 let result = client
692 .request(proto::RemoveChannelMember {
693 channel_id: channel_id.0,
694 user_id,
695 })
696 .await;
697
698 this.update(cx, |this, cx| {
699 this.outgoing_invites.remove(&(channel_id, user_id));
700 cx.notify();
701 })?;
702 result?;
703 Ok(())
704 })
705 }
706
707 pub fn set_member_role(
708 &mut self,
709 channel_id: ChannelId,
710 user_id: UserId,
711 role: proto::ChannelRole,
712 cx: &mut Context<Self>,
713 ) -> Task<Result<()>> {
714 if !self.outgoing_invites.insert((channel_id, user_id)) {
715 return Task::ready(Err(anyhow!("member request already in progress")));
716 }
717
718 cx.notify();
719 let client = self.client.clone();
720 cx.spawn(async move |this, cx| {
721 let result = client
722 .request(proto::SetChannelMemberRole {
723 channel_id: channel_id.0,
724 user_id,
725 role: role.into(),
726 })
727 .await;
728
729 this.update(cx, |this, cx| {
730 this.outgoing_invites.remove(&(channel_id, user_id));
731 cx.notify();
732 })?;
733
734 result?;
735 Ok(())
736 })
737 }
738
739 pub fn rename(
740 &mut self,
741 channel_id: ChannelId,
742 new_name: &str,
743 cx: &mut Context<Self>,
744 ) -> Task<Result<()>> {
745 let client = self.client.clone();
746 let name = new_name.to_string();
747 cx.spawn(async move |this, cx| {
748 let channel = client
749 .request(proto::RenameChannel {
750 channel_id: channel_id.0,
751 name,
752 })
753 .await?
754 .channel
755 .ok_or_else(|| anyhow!("missing channel in response"))?;
756 this.update(cx, |this, cx| {
757 let task = this.update_channels(
758 proto::UpdateChannels {
759 channels: vec![channel],
760 ..Default::default()
761 },
762 cx,
763 );
764 assert!(task.is_none());
765
766 // This event is emitted because the collab panel wants to clear the pending edit state
767 // before this frame is rendered. But we can't guarantee that the collab panel's future
768 // will resolve before this flush_effects finishes. Synchronously emitting this event
769 // ensures that the collab panel will observe this creation before the frame complete
770 cx.emit(ChannelEvent::ChannelRenamed(channel_id))
771 })?;
772 Ok(())
773 })
774 }
775
776 pub fn respond_to_channel_invite(
777 &mut self,
778 channel_id: ChannelId,
779 accept: bool,
780 cx: &mut Context<Self>,
781 ) -> Task<Result<()>> {
782 let client = self.client.clone();
783 cx.background_spawn(async move {
784 client
785 .request(proto::RespondToChannelInvite {
786 channel_id: channel_id.0,
787 accept,
788 })
789 .await?;
790 Ok(())
791 })
792 }
793 pub fn fuzzy_search_members(
794 &self,
795 channel_id: ChannelId,
796 query: String,
797 limit: u16,
798 cx: &mut Context<Self>,
799 ) -> Task<Result<Vec<ChannelMembership>>> {
800 let client = self.client.clone();
801 let user_store = self.user_store.downgrade();
802 cx.spawn(async move |_, cx| {
803 let response = client
804 .request(proto::GetChannelMembers {
805 channel_id: channel_id.0,
806 query,
807 limit: limit as u64,
808 })
809 .await?;
810 user_store.update(cx, |user_store, _| {
811 user_store.insert(response.users);
812 response
813 .members
814 .into_iter()
815 .filter_map(|member| {
816 Some(ChannelMembership {
817 user: user_store.get_cached_user(member.user_id)?,
818 role: member.role(),
819 kind: member.kind(),
820 })
821 })
822 .collect()
823 })
824 })
825 }
826
827 pub fn remove_channel(
828 &self,
829 channel_id: ChannelId,
830 ) -> impl Future<Output = Result<()>> + use<> {
831 let client = self.client.clone();
832 async move {
833 client
834 .request(proto::DeleteChannel {
835 channel_id: channel_id.0,
836 })
837 .await?;
838 Ok(())
839 }
840 }
841
    /// Currently always returns `false`; the channel argument is ignored.
    pub fn has_pending_channel_invite_response(&self, _: &Arc<Channel>) -> bool {
        false
    }
845
    /// Returns `true` while a membership request for `(channel_id, user_id)` is
    /// in flight, i.e. the pair is present in `outgoing_invites`.
    pub fn has_pending_channel_invite(&self, channel_id: ChannelId, user_id: UserId) -> bool {
        self.outgoing_invites.contains(&(channel_id, user_id))
    }
849
850 async fn handle_update_channels(
851 this: Entity<Self>,
852 message: TypedEnvelope<proto::UpdateChannels>,
853 mut cx: AsyncApp,
854 ) -> Result<()> {
855 this.update(&mut cx, |this, _| {
856 this.update_channels_tx
857 .unbounded_send(message.payload)
858 .unwrap();
859 })?;
860 Ok(())
861 }
862
863 async fn handle_update_user_channels(
864 this: Entity<Self>,
865 message: TypedEnvelope<proto::UpdateUserChannels>,
866 mut cx: AsyncApp,
867 ) -> Result<()> {
868 this.update(&mut cx, |this, cx| {
869 for buffer_version in message.payload.observed_channel_buffer_version {
870 let version = language::proto::deserialize_version(&buffer_version.version);
871 this.acknowledge_notes_version(
872 ChannelId(buffer_version.channel_id),
873 buffer_version.epoch,
874 &version,
875 cx,
876 );
877 }
878 for message_id in message.payload.observed_channel_message_id {
879 this.acknowledge_message_id(
880 ChannelId(message_id.channel_id),
881 message_id.message_id,
882 cx,
883 );
884 }
885 for membership in message.payload.channel_memberships {
886 if let Some(role) = ChannelRole::from_i32(membership.role) {
887 this.channel_states
888 .entry(ChannelId(membership.channel_id))
889 .or_default()
890 .set_role(role)
891 }
892 }
893 })
894 }
895
896 fn handle_connect(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
897 self.channel_index.clear();
898 self.channel_invitations.clear();
899 self.channel_participants.clear();
900 self.channel_index.clear();
901 self.outgoing_invites.clear();
902 self.disconnect_channel_buffers_task.take();
903
904 for chat in self.opened_chats.values() {
905 if let OpenEntityHandle::Open(chat) = chat {
906 if let Some(chat) = chat.upgrade() {
907 chat.update(cx, |chat, cx| {
908 chat.rejoin(cx);
909 });
910 }
911 }
912 }
913
914 let mut buffer_versions = Vec::new();
915 for buffer in self.opened_buffers.values() {
916 if let OpenEntityHandle::Open(buffer) = buffer {
917 if let Some(buffer) = buffer.upgrade() {
918 let channel_buffer = buffer.read(cx);
919 let buffer = channel_buffer.buffer().read(cx);
920 buffer_versions.push(proto::ChannelBufferVersion {
921 channel_id: channel_buffer.channel_id.0,
922 epoch: channel_buffer.epoch(),
923 version: language::proto::serialize_version(&buffer.version()),
924 });
925 }
926 }
927 }
928
929 if buffer_versions.is_empty() {
930 return Task::ready(Ok(()));
931 }
932
933 let response = self.client.request(proto::RejoinChannelBuffers {
934 buffers: buffer_versions,
935 });
936
937 cx.spawn(async move |this, cx| {
938 let mut response = response.await?;
939
940 this.update(cx, |this, cx| {
941 this.opened_buffers.retain(|_, buffer| match buffer {
942 OpenEntityHandle::Open(channel_buffer) => {
943 let Some(channel_buffer) = channel_buffer.upgrade() else {
944 return false;
945 };
946
947 channel_buffer.update(cx, |channel_buffer, cx| {
948 let channel_id = channel_buffer.channel_id;
949 if let Some(remote_buffer) = response
950 .buffers
951 .iter_mut()
952 .find(|buffer| buffer.channel_id == channel_id.0)
953 {
954 let channel_id = channel_buffer.channel_id;
955 let remote_version =
956 language::proto::deserialize_version(&remote_buffer.version);
957
958 channel_buffer.replace_collaborators(
959 mem::take(&mut remote_buffer.collaborators),
960 cx,
961 );
962
963 let operations = channel_buffer
964 .buffer()
965 .update(cx, |buffer, cx| {
966 let outgoing_operations =
967 buffer.serialize_ops(Some(remote_version), cx);
968 let incoming_operations =
969 mem::take(&mut remote_buffer.operations)
970 .into_iter()
971 .map(language::proto::deserialize_operation)
972 .collect::<Result<Vec<_>>>()?;
973 buffer.apply_ops(incoming_operations, cx);
974 anyhow::Ok(outgoing_operations)
975 })
976 .log_err();
977
978 if let Some(operations) = operations {
979 let client = this.client.clone();
980 cx.background_spawn(async move {
981 let operations = operations.await;
982 for chunk in language::proto::split_operations(operations) {
983 client
984 .send(proto::UpdateChannelBuffer {
985 channel_id: channel_id.0,
986 operations: chunk,
987 })
988 .ok();
989 }
990 })
991 .detach();
992 return true;
993 }
994 }
995
996 channel_buffer.disconnect(cx);
997 false
998 })
999 }
1000 OpenEntityHandle::Loading(_) => true,
1001 });
1002 })
1003 .ok();
1004 anyhow::Ok(())
1005 })
1006 }
1007
1008 fn handle_disconnect(&mut self, wait_for_reconnect: bool, cx: &mut Context<Self>) {
1009 cx.notify();
1010 self.did_subscribe = false;
1011 self.disconnect_channel_buffers_task.get_or_insert_with(|| {
1012 cx.spawn(async move |this, cx| {
1013 if wait_for_reconnect {
1014 cx.background_executor().timer(RECONNECT_TIMEOUT).await;
1015 }
1016
1017 if let Some(this) = this.upgrade() {
1018 this.update(cx, |this, cx| {
1019 for (_, buffer) in this.opened_buffers.drain() {
1020 if let OpenEntityHandle::Open(buffer) = buffer {
1021 if let Some(buffer) = buffer.upgrade() {
1022 buffer.update(cx, |buffer, cx| buffer.disconnect(cx));
1023 }
1024 }
1025 }
1026 })
1027 .ok();
1028 }
1029 })
1030 });
1031 }
1032
    /// Applies an `UpdateChannels` payload to the store.
    ///
    /// Invitations, channel insertions/deletions and latest message / notes
    /// versions are applied synchronously, followed by `cx.notify()`. If the
    /// payload carries channel participants, a task is returned that loads the
    /// referenced users and then stores the participants; otherwise `None` is
    /// returned.
    pub(crate) fn update_channels(
        &mut self,
        payload: proto::UpdateChannels,
        cx: &mut Context<ChannelStore>,
    ) -> Option<Task<Result<()>>> {
        if !payload.remove_channel_invitations.is_empty() {
            self.channel_invitations
                .retain(|channel| !payload.remove_channel_invitations.contains(&channel.id.0));
        }
        // `channel_invitations` is kept sorted by id: an existing invitation
        // only has its name refreshed, a new one is inserted at its position.
        for channel in payload.channel_invitations {
            match self
                .channel_invitations
                .binary_search_by_key(&channel.id, |c| c.id.0)
            {
                Ok(ix) => {
                    Arc::make_mut(&mut self.channel_invitations[ix]).name = channel.name.into()
                }
                Err(ix) => self.channel_invitations.insert(
                    ix,
                    Arc::new(Channel {
                        id: ChannelId(channel.id),
                        visibility: channel.visibility(),
                        name: channel.name.into(),
                        parent_path: channel.parent_path.into_iter().map(ChannelId).collect(),
                    }),
                ),
            }
        }

        let channels_changed = !payload.channels.is_empty()
            || !payload.delete_channels.is_empty()
            || !payload.latest_channel_message_ids.is_empty()
            || !payload.latest_channel_buffer_versions.is_empty();

        if channels_changed {
            if !payload.delete_channels.is_empty() {
                let delete_channels: Vec<ChannelId> =
                    payload.delete_channels.into_iter().map(ChannelId).collect();
                self.channel_index.delete_channels(&delete_channels);
                self.channel_participants
                    .retain(|channel_id, _| !delete_channels.contains(channel_id));

                for channel_id in &delete_channels {
                    let channel_id = *channel_id;
                    // A channel that is deleted and re-added by the same payload
                    // keeps its open buffer.
                    if payload
                        .channels
                        .iter()
                        .any(|channel| channel.id == channel_id.0)
                    {
                        continue;
                    }
                    if let Some(OpenEntityHandle::Open(buffer)) =
                        self.opened_buffers.remove(&channel_id)
                    {
                        if let Some(buffer) = buffer.upgrade() {
                            buffer.update(cx, ChannelBuffer::disconnect);
                        }
                    }
                }
            }

            let mut index = self.channel_index.bulk_insert();
            for channel in payload.channels {
                let id = ChannelId(channel.id);
                let channel_changed = index.insert(channel);

                // Let an open buffer of this channel know that it changed.
                if channel_changed {
                    if let Some(OpenEntityHandle::Open(buffer)) = self.opened_buffers.get(&id) {
                        if let Some(buffer) = buffer.upgrade() {
                            buffer.update(cx, ChannelBuffer::channel_changed);
                        }
                    }
                }
            }

            for latest_buffer_version in payload.latest_channel_buffer_versions {
                let version = language::proto::deserialize_version(&latest_buffer_version.version);
                self.channel_states
                    .entry(ChannelId(latest_buffer_version.channel_id))
                    .or_default()
                    .update_latest_notes_version(latest_buffer_version.epoch, &version)
            }

            for latest_channel_message in payload.latest_channel_message_ids {
                self.channel_states
                    .entry(ChannelId(latest_channel_message.channel_id))
                    .or_default()
                    .update_latest_message_id(latest_channel_message.message_id);
            }
        }

        cx.notify();
        if payload.channel_participants.is_empty() {
            return None;
        }

        // Collect the distinct participant user ids, kept sorted.
        let mut all_user_ids = Vec::new();
        let channel_participants = payload.channel_participants;
        for entry in &channel_participants {
            for user_id in entry.participant_user_ids.iter() {
                if let Err(ix) = all_user_ids.binary_search(user_id) {
                    all_user_ids.insert(ix, *user_id);
                }
            }
        }

        let users = self
            .user_store
            .update(cx, |user_store, cx| user_store.get_users(all_user_ids, cx));
        Some(cx.spawn(async move |this, cx| {
            let users = users.await?;

            this.update(cx, |this, cx| {
                for entry in &channel_participants {
                    let mut participants: Vec<_> = entry
                        .participant_user_ids
                        .iter()
                        .filter_map(|user_id| {
                            // NOTE(review): the binary search assumes `users`
                            // comes back sorted by id — confirm against
                            // `UserStore::get_users`. Ids not found are skipped.
                            users
                                .binary_search_by_key(&user_id, |user| &user.id)
                                .ok()
                                .map(|ix| users[ix].clone())
                        })
                        .collect();

                    participants.sort_by_key(|u| u.id);

                    this.channel_participants
                        .insert(ChannelId(entry.channel_id), participants);
                }

                cx.notify();
            })
        }))
    }
1168}
1169
1170impl ChannelState {
1171 fn set_role(&mut self, role: ChannelRole) {
1172 self.role = Some(role);
1173 }
1174
1175 fn has_channel_buffer_changed(&self) -> bool {
1176 self.latest_notes_version.epoch > self.observed_notes_version.epoch
1177 || (self.latest_notes_version.epoch == self.observed_notes_version.epoch
1178 && self
1179 .latest_notes_version
1180 .version
1181 .changed_since(&self.observed_notes_version.version))
1182 }
1183
1184 fn has_new_messages(&self) -> bool {
1185 let latest_message_id = self.latest_chat_message;
1186 let observed_message_id = self.observed_chat_message;
1187
1188 latest_message_id.is_some_and(|latest_message_id| {
1189 latest_message_id > observed_message_id.unwrap_or_default()
1190 })
1191 }
1192
1193 fn last_acknowledged_message_id(&self) -> Option<u64> {
1194 self.observed_chat_message
1195 }
1196
1197 fn acknowledge_message_id(&mut self, message_id: u64) {
1198 let observed = self.observed_chat_message.get_or_insert(message_id);
1199 *observed = (*observed).max(message_id);
1200 }
1201
1202 fn update_latest_message_id(&mut self, message_id: u64) {
1203 self.latest_chat_message =
1204 Some(message_id.max(self.latest_chat_message.unwrap_or_default()));
1205 }
1206
1207 fn acknowledge_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1208 if self.observed_notes_version.epoch == epoch {
1209 self.observed_notes_version.version.join(version);
1210 } else {
1211 self.observed_notes_version = NotesVersion {
1212 epoch,
1213 version: version.clone(),
1214 };
1215 }
1216 }
1217
1218 fn update_latest_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1219 if self.latest_notes_version.epoch == epoch {
1220 self.latest_notes_version.version.join(version);
1221 } else {
1222 self.latest_notes_version = NotesVersion {
1223 epoch,
1224 version: version.clone(),
1225 };
1226 }
1227 }
1228}