1mod channel_index;
2
3use crate::{ChannelMessage, channel_buffer::ChannelBuffer, channel_chat::ChannelChat};
4use anyhow::{Context as _, Result, anyhow};
5use channel_index::ChannelIndex;
6use client::{ChannelId, Client, ClientSettings, Subscription, User, UserId, UserStore};
7use collections::{HashMap, HashSet, hash_map};
8use futures::{Future, FutureExt, StreamExt, channel::mpsc, future::Shared};
9use gpui::{
10 App, AppContext as _, AsyncApp, Context, Entity, EventEmitter, Global, SharedString, Task,
11 WeakEntity,
12};
13use language::Capability;
14use rpc::{
15 TypedEnvelope,
16 proto::{self, ChannelRole, ChannelVisibility},
17};
18use settings::Settings;
19use std::{mem, sync::Arc, time::Duration};
20use util::{ResultExt, maybe};
21
22pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
23
24pub fn init(client: &Arc<Client>, user_store: Entity<UserStore>, cx: &mut App) {
25 let channel_store = cx.new(|cx| ChannelStore::new(client.clone(), user_store.clone(), cx));
26 cx.set_global(GlobalChannelStore(channel_store));
27}
28
29#[derive(Debug, Clone, Default, PartialEq)]
30struct NotesVersion {
31 epoch: u64,
32 version: clock::Global,
33}
34
35pub struct ChannelStore {
36 pub channel_index: ChannelIndex,
37 channel_invitations: Vec<Arc<Channel>>,
38 channel_participants: HashMap<ChannelId, Vec<Arc<User>>>,
39 channel_states: HashMap<ChannelId, ChannelState>,
40 outgoing_invites: HashSet<(ChannelId, UserId)>,
41 update_channels_tx: mpsc::UnboundedSender<proto::UpdateChannels>,
42 opened_buffers: HashMap<ChannelId, OpenEntityHandle<ChannelBuffer>>,
43 opened_chats: HashMap<ChannelId, OpenEntityHandle<ChannelChat>>,
44 client: Arc<Client>,
45 did_subscribe: bool,
46 user_store: Entity<UserStore>,
47 _rpc_subscriptions: [Subscription; 2],
48 _watch_connection_status: Task<Option<()>>,
49 disconnect_channel_buffers_task: Option<Task<()>>,
50 _update_channels: Task<()>,
51}
52
53#[derive(Clone, Debug)]
54pub struct Channel {
55 pub id: ChannelId,
56 pub name: SharedString,
57 pub visibility: proto::ChannelVisibility,
58 pub parent_path: Vec<ChannelId>,
59}
60
61#[derive(Default, Debug)]
62pub struct ChannelState {
63 latest_chat_message: Option<u64>,
64 latest_notes_version: NotesVersion,
65 observed_notes_version: NotesVersion,
66 observed_chat_message: Option<u64>,
67 role: Option<ChannelRole>,
68}
69
70impl Channel {
71 pub fn link(&self, cx: &App) -> String {
72 format!(
73 "{}/channel/{}-{}",
74 ClientSettings::get_global(cx).server_url,
75 Self::slug(&self.name),
76 self.id
77 )
78 }
79
80 pub fn notes_link(&self, heading: Option<String>, cx: &App) -> String {
81 self.link(cx)
82 + "/notes"
83 + &heading
84 .map(|h| format!("#{}", Self::slug(&h)))
85 .unwrap_or_default()
86 }
87
88 pub fn is_root_channel(&self) -> bool {
89 self.parent_path.is_empty()
90 }
91
92 pub fn root_id(&self) -> ChannelId {
93 self.parent_path.first().copied().unwrap_or(self.id)
94 }
95
96 pub fn slug(str: &str) -> String {
97 let slug: String = str
98 .chars()
99 .map(|c| if c.is_alphanumeric() { c } else { '-' })
100 .collect();
101
102 slug.trim_matches(|c| c == '-').to_string()
103 }
104}
105
106#[derive(Debug)]
107pub struct ChannelMembership {
108 pub user: Arc<User>,
109 pub kind: proto::channel_member::Kind,
110 pub role: proto::ChannelRole,
111}
112impl ChannelMembership {
113 pub fn sort_key(&self) -> MembershipSortKey {
114 MembershipSortKey {
115 role_order: match self.role {
116 proto::ChannelRole::Admin => 0,
117 proto::ChannelRole::Member => 1,
118 proto::ChannelRole::Banned => 2,
119 proto::ChannelRole::Talker => 3,
120 proto::ChannelRole::Guest => 4,
121 },
122 kind_order: match self.kind {
123 proto::channel_member::Kind::Member => 0,
124 proto::channel_member::Kind::Invitee => 1,
125 },
126 username_order: self.user.github_login.as_str(),
127 }
128 }
129}
130
131#[derive(PartialOrd, Ord, PartialEq, Eq)]
132pub struct MembershipSortKey<'a> {
133 role_order: u8,
134 kind_order: u8,
135 username_order: &'a str,
136}
137
138pub enum ChannelEvent {
139 ChannelCreated(ChannelId),
140 ChannelRenamed(ChannelId),
141}
142
143impl EventEmitter<ChannelEvent> for ChannelStore {}
144
145enum OpenEntityHandle<E> {
146 Open(WeakEntity<E>),
147 Loading(Shared<Task<Result<Entity<E>, Arc<anyhow::Error>>>>),
148}
149
150struct GlobalChannelStore(Entity<ChannelStore>);
151
152impl Global for GlobalChannelStore {}
153
154impl ChannelStore {
155 pub fn global(cx: &App) -> Entity<Self> {
156 cx.global::<GlobalChannelStore>().0.clone()
157 }
158
159 pub fn new(client: Arc<Client>, user_store: Entity<UserStore>, cx: &mut Context<Self>) -> Self {
160 let rpc_subscriptions = [
161 client.add_message_handler(cx.weak_entity(), Self::handle_update_channels),
162 client.add_message_handler(cx.weak_entity(), Self::handle_update_user_channels),
163 ];
164
165 let mut connection_status = client.status();
166 let (update_channels_tx, mut update_channels_rx) = mpsc::unbounded();
167 let watch_connection_status = cx.spawn(async move |this, cx| {
168 while let Some(status) = connection_status.next().await {
169 let this = this.upgrade()?;
170 match status {
171 client::Status::Connected { .. } => {
172 this.update(cx, |this, cx| this.handle_connect(cx))
173 .ok()?
174 .await
175 .log_err()?;
176 }
177 client::Status::SignedOut | client::Status::UpgradeRequired => {
178 this.update(cx, |this, cx| this.handle_disconnect(false, cx))
179 .ok();
180 }
181 _ => {
182 this.update(cx, |this, cx| this.handle_disconnect(true, cx))
183 .ok();
184 }
185 }
186 }
187 Some(())
188 });
189
190 Self {
191 channel_invitations: Vec::default(),
192 channel_index: ChannelIndex::default(),
193 channel_participants: Default::default(),
194 outgoing_invites: Default::default(),
195 opened_buffers: Default::default(),
196 opened_chats: Default::default(),
197 update_channels_tx,
198 client,
199 user_store,
200 _rpc_subscriptions: rpc_subscriptions,
201 _watch_connection_status: watch_connection_status,
202 disconnect_channel_buffers_task: None,
203 _update_channels: cx.spawn(async move |this, cx| {
204 maybe!(async move {
205 while let Some(update_channels) = update_channels_rx.next().await {
206 if let Some(this) = this.upgrade() {
207 let update_task = this
208 .update(cx, |this, cx| this.update_channels(update_channels, cx))?;
209 if let Some(update_task) = update_task {
210 update_task.await.log_err();
211 }
212 }
213 }
214 anyhow::Ok(())
215 })
216 .await
217 .log_err();
218 }),
219 channel_states: Default::default(),
220 did_subscribe: false,
221 }
222 }
223
224 pub fn initialize(&mut self) {
225 if !self.did_subscribe
226 && self
227 .client
228 .send(proto::SubscribeToChannels {})
229 .log_err()
230 .is_some()
231 {
232 self.did_subscribe = true;
233 }
234 }
235
236 pub fn client(&self) -> Arc<Client> {
237 self.client.clone()
238 }
239
240 /// Returns the number of unique channels in the store
241 pub fn channel_count(&self) -> usize {
242 self.channel_index.by_id().len()
243 }
244
245 /// Returns the index of a channel ID in the list of unique channels
246 pub fn index_of_channel(&self, channel_id: ChannelId) -> Option<usize> {
247 self.channel_index
248 .by_id()
249 .keys()
250 .position(|id| *id == channel_id)
251 }
252
253 /// Returns an iterator over all unique channels
254 pub fn channels(&self) -> impl '_ + Iterator<Item = &Arc<Channel>> {
255 self.channel_index.by_id().values()
256 }
257
258 /// Iterate over all entries in the channel DAG
259 pub fn ordered_channels(&self) -> impl '_ + Iterator<Item = (usize, &Arc<Channel>)> {
260 self.channel_index
261 .ordered_channels()
262 .iter()
263 .filter_map(move |id| {
264 let channel = self.channel_index.by_id().get(id)?;
265 Some((channel.parent_path.len(), channel))
266 })
267 }
268
269 pub fn channel_at_index(&self, ix: usize) -> Option<&Arc<Channel>> {
270 let channel_id = self.channel_index.ordered_channels().get(ix)?;
271 self.channel_index.by_id().get(channel_id)
272 }
273
274 pub fn channel_at(&self, ix: usize) -> Option<&Arc<Channel>> {
275 self.channel_index.by_id().values().nth(ix)
276 }
277
278 pub fn has_channel_invitation(&self, channel_id: ChannelId) -> bool {
279 self.channel_invitations
280 .iter()
281 .any(|channel| channel.id == channel_id)
282 }
283
284 pub fn channel_invitations(&self) -> &[Arc<Channel>] {
285 &self.channel_invitations
286 }
287
288 pub fn channel_for_id(&self, channel_id: ChannelId) -> Option<&Arc<Channel>> {
289 self.channel_index.by_id().get(&channel_id)
290 }
291
292 pub fn has_open_channel_buffer(&self, channel_id: ChannelId, _cx: &App) -> bool {
293 if let Some(buffer) = self.opened_buffers.get(&channel_id) {
294 if let OpenEntityHandle::Open(buffer) = buffer {
295 return buffer.upgrade().is_some();
296 }
297 }
298 false
299 }
300
301 pub fn open_channel_buffer(
302 &mut self,
303 channel_id: ChannelId,
304 cx: &mut Context<Self>,
305 ) -> Task<Result<Entity<ChannelBuffer>>> {
306 let client = self.client.clone();
307 let user_store = self.user_store.clone();
308 let channel_store = cx.entity();
309 self.open_channel_resource(
310 channel_id,
311 |this| &mut this.opened_buffers,
312 async move |channel, cx| {
313 ChannelBuffer::new(channel, client, user_store, channel_store, cx).await
314 },
315 cx,
316 )
317 }
318
319 pub fn fetch_channel_messages(
320 &self,
321 message_ids: Vec<u64>,
322 cx: &mut Context<Self>,
323 ) -> Task<Result<Vec<ChannelMessage>>> {
324 let request = if message_ids.is_empty() {
325 None
326 } else {
327 Some(
328 self.client
329 .request(proto::GetChannelMessagesById { message_ids }),
330 )
331 };
332 cx.spawn(async move |this, cx| {
333 if let Some(request) = request {
334 let response = request.await?;
335 let this = this.upgrade().context("channel store dropped")?;
336 let user_store = this.read_with(cx, |this, _| this.user_store.clone())?;
337 ChannelMessage::from_proto_vec(response.messages, &user_store, cx).await
338 } else {
339 Ok(Vec::new())
340 }
341 })
342 }
343
344 pub fn has_channel_buffer_changed(&self, channel_id: ChannelId) -> bool {
345 self.channel_states
346 .get(&channel_id)
347 .is_some_and(|state| state.has_channel_buffer_changed())
348 }
349
350 pub fn has_new_messages(&self, channel_id: ChannelId) -> bool {
351 self.channel_states
352 .get(&channel_id)
353 .is_some_and(|state| state.has_new_messages())
354 }
355
356 pub fn set_acknowledged_message_id(&mut self, channel_id: ChannelId, message_id: Option<u64>) {
357 if let Some(state) = self.channel_states.get_mut(&channel_id) {
358 state.latest_chat_message = message_id;
359 }
360 }
361
362 pub fn last_acknowledge_message_id(&self, channel_id: ChannelId) -> Option<u64> {
363 self.channel_states.get(&channel_id).and_then(|state| {
364 if let Some(last_message_id) = state.latest_chat_message {
365 if state
366 .last_acknowledged_message_id()
367 .is_some_and(|id| id < last_message_id)
368 {
369 return state.last_acknowledged_message_id();
370 }
371 }
372
373 None
374 })
375 }
376
377 pub fn acknowledge_message_id(
378 &mut self,
379 channel_id: ChannelId,
380 message_id: u64,
381 cx: &mut Context<Self>,
382 ) {
383 self.channel_states
384 .entry(channel_id)
385 .or_default()
386 .acknowledge_message_id(message_id);
387 cx.notify();
388 }
389
390 pub fn update_latest_message_id(
391 &mut self,
392 channel_id: ChannelId,
393 message_id: u64,
394 cx: &mut Context<Self>,
395 ) {
396 self.channel_states
397 .entry(channel_id)
398 .or_default()
399 .update_latest_message_id(message_id);
400 cx.notify();
401 }
402
403 pub fn acknowledge_notes_version(
404 &mut self,
405 channel_id: ChannelId,
406 epoch: u64,
407 version: &clock::Global,
408 cx: &mut Context<Self>,
409 ) {
410 self.channel_states
411 .entry(channel_id)
412 .or_default()
413 .acknowledge_notes_version(epoch, version);
414 cx.notify()
415 }
416
417 pub fn update_latest_notes_version(
418 &mut self,
419 channel_id: ChannelId,
420 epoch: u64,
421 version: &clock::Global,
422 cx: &mut Context<Self>,
423 ) {
424 self.channel_states
425 .entry(channel_id)
426 .or_default()
427 .update_latest_notes_version(epoch, version);
428 cx.notify()
429 }
430
431 pub fn open_channel_chat(
432 &mut self,
433 channel_id: ChannelId,
434 cx: &mut Context<Self>,
435 ) -> Task<Result<Entity<ChannelChat>>> {
436 let client = self.client.clone();
437 let user_store = self.user_store.clone();
438 let this = cx.entity();
439 self.open_channel_resource(
440 channel_id,
441 |this| &mut this.opened_chats,
442 async move |channel, cx| ChannelChat::new(channel, this, user_store, client, cx).await,
443 cx,
444 )
445 }
446
447 /// Asynchronously open a given resource associated with a channel.
448 ///
449 /// Make sure that the resource is only opened once, even if this method
450 /// is called multiple times with the same channel id while the first task
451 /// is still running.
452 fn open_channel_resource<T, F>(
453 &mut self,
454 channel_id: ChannelId,
455 get_map: fn(&mut Self) -> &mut HashMap<ChannelId, OpenEntityHandle<T>>,
456 load: F,
457 cx: &mut Context<Self>,
458 ) -> Task<Result<Entity<T>>>
459 where
460 F: AsyncFnOnce(Arc<Channel>, &mut AsyncApp) -> Result<Entity<T>> + 'static,
461 T: 'static,
462 {
463 let task = loop {
464 match get_map(self).entry(channel_id) {
465 hash_map::Entry::Occupied(e) => match e.get() {
466 OpenEntityHandle::Open(entity) => {
467 if let Some(entity) = entity.upgrade() {
468 break Task::ready(Ok(entity)).shared();
469 } else {
470 get_map(self).remove(&channel_id);
471 continue;
472 }
473 }
474 OpenEntityHandle::Loading(task) => {
475 break task.clone();
476 }
477 },
478 hash_map::Entry::Vacant(e) => {
479 let task = cx
480 .spawn(async move |this, cx| {
481 let channel = this.read_with(cx, |this, _| {
482 this.channel_for_id(channel_id).cloned().ok_or_else(|| {
483 Arc::new(anyhow!("no channel for id: {channel_id}"))
484 })
485 })??;
486
487 load(channel, cx).await.map_err(Arc::new)
488 })
489 .shared();
490
491 e.insert(OpenEntityHandle::Loading(task.clone()));
492 cx.spawn({
493 let task = task.clone();
494 async move |this, cx| {
495 let result = task.await;
496 this.update(cx, |this, _| match result {
497 Ok(model) => {
498 get_map(this).insert(
499 channel_id,
500 OpenEntityHandle::Open(model.downgrade()),
501 );
502 }
503 Err(_) => {
504 get_map(this).remove(&channel_id);
505 }
506 })
507 .ok();
508 }
509 })
510 .detach();
511 break task;
512 }
513 }
514 };
515 cx.background_spawn(async move { task.await.map_err(|error| anyhow!("{error}")) })
516 }
517
518 pub fn is_channel_admin(&self, channel_id: ChannelId) -> bool {
519 self.channel_role(channel_id) == proto::ChannelRole::Admin
520 }
521
522 pub fn is_root_channel(&self, channel_id: ChannelId) -> bool {
523 self.channel_index
524 .by_id()
525 .get(&channel_id)
526 .map_or(false, |channel| channel.is_root_channel())
527 }
528
529 pub fn is_public_channel(&self, channel_id: ChannelId) -> bool {
530 self.channel_index
531 .by_id()
532 .get(&channel_id)
533 .map_or(false, |channel| {
534 channel.visibility == ChannelVisibility::Public
535 })
536 }
537
538 pub fn channel_capability(&self, channel_id: ChannelId) -> Capability {
539 match self.channel_role(channel_id) {
540 ChannelRole::Admin | ChannelRole::Member => Capability::ReadWrite,
541 _ => Capability::ReadOnly,
542 }
543 }
544
545 pub fn channel_role(&self, channel_id: ChannelId) -> proto::ChannelRole {
546 maybe!({
547 let mut channel = self.channel_for_id(channel_id)?;
548 if !channel.is_root_channel() {
549 channel = self.channel_for_id(channel.root_id())?;
550 }
551 let root_channel_state = self.channel_states.get(&channel.id);
552 root_channel_state?.role
553 })
554 .unwrap_or(proto::ChannelRole::Guest)
555 }
556
557 pub fn channel_participants(&self, channel_id: ChannelId) -> &[Arc<User>] {
558 self.channel_participants
559 .get(&channel_id)
560 .map_or(&[], |v| v.as_slice())
561 }
562
563 pub fn create_channel(
564 &self,
565 name: &str,
566 parent_id: Option<ChannelId>,
567 cx: &mut Context<Self>,
568 ) -> Task<Result<ChannelId>> {
569 let client = self.client.clone();
570 let name = name.trim_start_matches('#').to_owned();
571 cx.spawn(async move |this, cx| {
572 let response = client
573 .request(proto::CreateChannel {
574 name,
575 parent_id: parent_id.map(|cid| cid.0),
576 })
577 .await?;
578
579 let channel = response.channel.context("missing channel in response")?;
580 let channel_id = ChannelId(channel.id);
581
582 this.update(cx, |this, cx| {
583 let task = this.update_channels(
584 proto::UpdateChannels {
585 channels: vec![channel],
586 ..Default::default()
587 },
588 cx,
589 );
590 assert!(task.is_none());
591
592 // This event is emitted because the collab panel wants to clear the pending edit state
593 // before this frame is rendered. But we can't guarantee that the collab panel's future
594 // will resolve before this flush_effects finishes. Synchronously emitting this event
595 // ensures that the collab panel will observe this creation before the frame completes
596 cx.emit(ChannelEvent::ChannelCreated(channel_id));
597 })?;
598
599 Ok(channel_id)
600 })
601 }
602
603 pub fn move_channel(
604 &mut self,
605 channel_id: ChannelId,
606 to: ChannelId,
607 cx: &mut Context<Self>,
608 ) -> Task<Result<()>> {
609 let client = self.client.clone();
610 cx.spawn(async move |_, _| {
611 let _ = client
612 .request(proto::MoveChannel {
613 channel_id: channel_id.0,
614 to: to.0,
615 })
616 .await?;
617
618 Ok(())
619 })
620 }
621
622 pub fn set_channel_visibility(
623 &mut self,
624 channel_id: ChannelId,
625 visibility: ChannelVisibility,
626 cx: &mut Context<Self>,
627 ) -> Task<Result<()>> {
628 let client = self.client.clone();
629 cx.spawn(async move |_, _| {
630 let _ = client
631 .request(proto::SetChannelVisibility {
632 channel_id: channel_id.0,
633 visibility: visibility.into(),
634 })
635 .await?;
636
637 Ok(())
638 })
639 }
640
641 pub fn invite_member(
642 &mut self,
643 channel_id: ChannelId,
644 user_id: UserId,
645 role: proto::ChannelRole,
646 cx: &mut Context<Self>,
647 ) -> Task<Result<()>> {
648 if !self.outgoing_invites.insert((channel_id, user_id)) {
649 return Task::ready(Err(anyhow!("invite request already in progress")));
650 }
651
652 cx.notify();
653 let client = self.client.clone();
654 cx.spawn(async move |this, cx| {
655 let result = client
656 .request(proto::InviteChannelMember {
657 channel_id: channel_id.0,
658 user_id,
659 role: role.into(),
660 })
661 .await;
662
663 this.update(cx, |this, cx| {
664 this.outgoing_invites.remove(&(channel_id, user_id));
665 cx.notify();
666 })?;
667
668 result?;
669
670 Ok(())
671 })
672 }
673
674 pub fn remove_member(
675 &mut self,
676 channel_id: ChannelId,
677 user_id: u64,
678 cx: &mut Context<Self>,
679 ) -> Task<Result<()>> {
680 if !self.outgoing_invites.insert((channel_id, user_id)) {
681 return Task::ready(Err(anyhow!("invite request already in progress")));
682 }
683
684 cx.notify();
685 let client = self.client.clone();
686 cx.spawn(async move |this, cx| {
687 let result = client
688 .request(proto::RemoveChannelMember {
689 channel_id: channel_id.0,
690 user_id,
691 })
692 .await;
693
694 this.update(cx, |this, cx| {
695 this.outgoing_invites.remove(&(channel_id, user_id));
696 cx.notify();
697 })?;
698 result?;
699 Ok(())
700 })
701 }
702
703 pub fn set_member_role(
704 &mut self,
705 channel_id: ChannelId,
706 user_id: UserId,
707 role: proto::ChannelRole,
708 cx: &mut Context<Self>,
709 ) -> Task<Result<()>> {
710 if !self.outgoing_invites.insert((channel_id, user_id)) {
711 return Task::ready(Err(anyhow!("member request already in progress")));
712 }
713
714 cx.notify();
715 let client = self.client.clone();
716 cx.spawn(async move |this, cx| {
717 let result = client
718 .request(proto::SetChannelMemberRole {
719 channel_id: channel_id.0,
720 user_id,
721 role: role.into(),
722 })
723 .await;
724
725 this.update(cx, |this, cx| {
726 this.outgoing_invites.remove(&(channel_id, user_id));
727 cx.notify();
728 })?;
729
730 result?;
731 Ok(())
732 })
733 }
734
735 pub fn rename(
736 &mut self,
737 channel_id: ChannelId,
738 new_name: &str,
739 cx: &mut Context<Self>,
740 ) -> Task<Result<()>> {
741 let client = self.client.clone();
742 let name = new_name.to_string();
743 cx.spawn(async move |this, cx| {
744 let channel = client
745 .request(proto::RenameChannel {
746 channel_id: channel_id.0,
747 name,
748 })
749 .await?
750 .channel
751 .context("missing channel in response")?;
752 this.update(cx, |this, cx| {
753 let task = this.update_channels(
754 proto::UpdateChannels {
755 channels: vec![channel],
756 ..Default::default()
757 },
758 cx,
759 );
760 assert!(task.is_none());
761
762 // This event is emitted because the collab panel wants to clear the pending edit state
763 // before this frame is rendered. But we can't guarantee that the collab panel's future
764 // will resolve before this flush_effects finishes. Synchronously emitting this event
765 // ensures that the collab panel will observe this creation before the frame complete
766 cx.emit(ChannelEvent::ChannelRenamed(channel_id))
767 })?;
768 Ok(())
769 })
770 }
771
772 pub fn respond_to_channel_invite(
773 &mut self,
774 channel_id: ChannelId,
775 accept: bool,
776 cx: &mut Context<Self>,
777 ) -> Task<Result<()>> {
778 let client = self.client.clone();
779 cx.background_spawn(async move {
780 client
781 .request(proto::RespondToChannelInvite {
782 channel_id: channel_id.0,
783 accept,
784 })
785 .await?;
786 Ok(())
787 })
788 }
789 pub fn fuzzy_search_members(
790 &self,
791 channel_id: ChannelId,
792 query: String,
793 limit: u16,
794 cx: &mut Context<Self>,
795 ) -> Task<Result<Vec<ChannelMembership>>> {
796 let client = self.client.clone();
797 let user_store = self.user_store.downgrade();
798 cx.spawn(async move |_, cx| {
799 let response = client
800 .request(proto::GetChannelMembers {
801 channel_id: channel_id.0,
802 query,
803 limit: limit as u64,
804 })
805 .await?;
806 user_store.update(cx, |user_store, _| {
807 user_store.insert(response.users);
808 response
809 .members
810 .into_iter()
811 .filter_map(|member| {
812 Some(ChannelMembership {
813 user: user_store.get_cached_user(member.user_id)?,
814 role: member.role(),
815 kind: member.kind(),
816 })
817 })
818 .collect()
819 })
820 })
821 }
822
823 pub fn remove_channel(
824 &self,
825 channel_id: ChannelId,
826 ) -> impl Future<Output = Result<()>> + use<> {
827 let client = self.client.clone();
828 async move {
829 client
830 .request(proto::DeleteChannel {
831 channel_id: channel_id.0,
832 })
833 .await?;
834 Ok(())
835 }
836 }
837
838 pub fn has_pending_channel_invite_response(&self, _: &Arc<Channel>) -> bool {
839 false
840 }
841
842 pub fn has_pending_channel_invite(&self, channel_id: ChannelId, user_id: UserId) -> bool {
843 self.outgoing_invites.contains(&(channel_id, user_id))
844 }
845
846 async fn handle_update_channels(
847 this: Entity<Self>,
848 message: TypedEnvelope<proto::UpdateChannels>,
849 mut cx: AsyncApp,
850 ) -> Result<()> {
851 this.read_with(&mut cx, |this, _| {
852 this.update_channels_tx
853 .unbounded_send(message.payload)
854 .unwrap();
855 })?;
856 Ok(())
857 }
858
859 async fn handle_update_user_channels(
860 this: Entity<Self>,
861 message: TypedEnvelope<proto::UpdateUserChannels>,
862 mut cx: AsyncApp,
863 ) -> Result<()> {
864 this.update(&mut cx, |this, cx| {
865 for buffer_version in message.payload.observed_channel_buffer_version {
866 let version = language::proto::deserialize_version(&buffer_version.version);
867 this.acknowledge_notes_version(
868 ChannelId(buffer_version.channel_id),
869 buffer_version.epoch,
870 &version,
871 cx,
872 );
873 }
874 for message_id in message.payload.observed_channel_message_id {
875 this.acknowledge_message_id(
876 ChannelId(message_id.channel_id),
877 message_id.message_id,
878 cx,
879 );
880 }
881 for membership in message.payload.channel_memberships {
882 if let Some(role) = ChannelRole::from_i32(membership.role) {
883 this.channel_states
884 .entry(ChannelId(membership.channel_id))
885 .or_default()
886 .set_role(role)
887 }
888 }
889 })
890 }
891
892 fn handle_connect(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
893 self.channel_index.clear();
894 self.channel_invitations.clear();
895 self.channel_participants.clear();
896 self.channel_index.clear();
897 self.outgoing_invites.clear();
898 self.disconnect_channel_buffers_task.take();
899
900 for chat in self.opened_chats.values() {
901 if let OpenEntityHandle::Open(chat) = chat {
902 if let Some(chat) = chat.upgrade() {
903 chat.update(cx, |chat, cx| {
904 chat.rejoin(cx);
905 });
906 }
907 }
908 }
909
910 let mut buffer_versions = Vec::new();
911 for buffer in self.opened_buffers.values() {
912 if let OpenEntityHandle::Open(buffer) = buffer {
913 if let Some(buffer) = buffer.upgrade() {
914 let channel_buffer = buffer.read(cx);
915 let buffer = channel_buffer.buffer().read(cx);
916 buffer_versions.push(proto::ChannelBufferVersion {
917 channel_id: channel_buffer.channel_id.0,
918 epoch: channel_buffer.epoch(),
919 version: language::proto::serialize_version(&buffer.version()),
920 });
921 }
922 }
923 }
924
925 if buffer_versions.is_empty() {
926 return Task::ready(Ok(()));
927 }
928
929 let response = self.client.request(proto::RejoinChannelBuffers {
930 buffers: buffer_versions,
931 });
932
933 cx.spawn(async move |this, cx| {
934 let mut response = response.await?;
935
936 this.update(cx, |this, cx| {
937 this.opened_buffers.retain(|_, buffer| match buffer {
938 OpenEntityHandle::Open(channel_buffer) => {
939 let Some(channel_buffer) = channel_buffer.upgrade() else {
940 return false;
941 };
942
943 channel_buffer.update(cx, |channel_buffer, cx| {
944 let channel_id = channel_buffer.channel_id;
945 if let Some(remote_buffer) = response
946 .buffers
947 .iter_mut()
948 .find(|buffer| buffer.channel_id == channel_id.0)
949 {
950 let channel_id = channel_buffer.channel_id;
951 let remote_version =
952 language::proto::deserialize_version(&remote_buffer.version);
953
954 channel_buffer.replace_collaborators(
955 mem::take(&mut remote_buffer.collaborators),
956 cx,
957 );
958
959 let operations = channel_buffer
960 .buffer()
961 .update(cx, |buffer, cx| {
962 let outgoing_operations =
963 buffer.serialize_ops(Some(remote_version), cx);
964 let incoming_operations =
965 mem::take(&mut remote_buffer.operations)
966 .into_iter()
967 .map(language::proto::deserialize_operation)
968 .collect::<Result<Vec<_>>>()?;
969 buffer.apply_ops(incoming_operations, cx);
970 anyhow::Ok(outgoing_operations)
971 })
972 .log_err();
973
974 if let Some(operations) = operations {
975 let client = this.client.clone();
976 cx.background_spawn(async move {
977 let operations = operations.await;
978 for chunk in language::proto::split_operations(operations) {
979 client
980 .send(proto::UpdateChannelBuffer {
981 channel_id: channel_id.0,
982 operations: chunk,
983 })
984 .ok();
985 }
986 })
987 .detach();
988 return true;
989 }
990 }
991
992 channel_buffer.disconnect(cx);
993 false
994 })
995 }
996 OpenEntityHandle::Loading(_) => true,
997 });
998 })
999 .ok();
1000 anyhow::Ok(())
1001 })
1002 }
1003
1004 fn handle_disconnect(&mut self, wait_for_reconnect: bool, cx: &mut Context<Self>) {
1005 cx.notify();
1006 self.did_subscribe = false;
1007 self.disconnect_channel_buffers_task.get_or_insert_with(|| {
1008 cx.spawn(async move |this, cx| {
1009 if wait_for_reconnect {
1010 cx.background_executor().timer(RECONNECT_TIMEOUT).await;
1011 }
1012
1013 if let Some(this) = this.upgrade() {
1014 this.update(cx, |this, cx| {
1015 for (_, buffer) in this.opened_buffers.drain() {
1016 if let OpenEntityHandle::Open(buffer) = buffer {
1017 if let Some(buffer) = buffer.upgrade() {
1018 buffer.update(cx, |buffer, cx| buffer.disconnect(cx));
1019 }
1020 }
1021 }
1022 })
1023 .ok();
1024 }
1025 })
1026 });
1027 }
1028
1029 pub(crate) fn update_channels(
1030 &mut self,
1031 payload: proto::UpdateChannels,
1032 cx: &mut Context<ChannelStore>,
1033 ) -> Option<Task<Result<()>>> {
1034 if !payload.remove_channel_invitations.is_empty() {
1035 self.channel_invitations
1036 .retain(|channel| !payload.remove_channel_invitations.contains(&channel.id.0));
1037 }
1038 for channel in payload.channel_invitations {
1039 match self
1040 .channel_invitations
1041 .binary_search_by_key(&channel.id, |c| c.id.0)
1042 {
1043 Ok(ix) => {
1044 Arc::make_mut(&mut self.channel_invitations[ix]).name = channel.name.into()
1045 }
1046 Err(ix) => self.channel_invitations.insert(
1047 ix,
1048 Arc::new(Channel {
1049 id: ChannelId(channel.id),
1050 visibility: channel.visibility(),
1051 name: channel.name.into(),
1052 parent_path: channel.parent_path.into_iter().map(ChannelId).collect(),
1053 }),
1054 ),
1055 }
1056 }
1057
1058 let channels_changed = !payload.channels.is_empty()
1059 || !payload.delete_channels.is_empty()
1060 || !payload.latest_channel_message_ids.is_empty()
1061 || !payload.latest_channel_buffer_versions.is_empty();
1062
1063 if channels_changed {
1064 if !payload.delete_channels.is_empty() {
1065 let delete_channels: Vec<ChannelId> =
1066 payload.delete_channels.into_iter().map(ChannelId).collect();
1067 self.channel_index.delete_channels(&delete_channels);
1068 self.channel_participants
1069 .retain(|channel_id, _| !delete_channels.contains(channel_id));
1070
1071 for channel_id in &delete_channels {
1072 let channel_id = *channel_id;
1073 if payload
1074 .channels
1075 .iter()
1076 .any(|channel| channel.id == channel_id.0)
1077 {
1078 continue;
1079 }
1080 if let Some(OpenEntityHandle::Open(buffer)) =
1081 self.opened_buffers.remove(&channel_id)
1082 {
1083 if let Some(buffer) = buffer.upgrade() {
1084 buffer.update(cx, ChannelBuffer::disconnect);
1085 }
1086 }
1087 }
1088 }
1089
1090 let mut index = self.channel_index.bulk_insert();
1091 for channel in payload.channels {
1092 let id = ChannelId(channel.id);
1093 let channel_changed = index.insert(channel);
1094
1095 if channel_changed {
1096 if let Some(OpenEntityHandle::Open(buffer)) = self.opened_buffers.get(&id) {
1097 if let Some(buffer) = buffer.upgrade() {
1098 buffer.update(cx, ChannelBuffer::channel_changed);
1099 }
1100 }
1101 }
1102 }
1103
1104 for latest_buffer_version in payload.latest_channel_buffer_versions {
1105 let version = language::proto::deserialize_version(&latest_buffer_version.version);
1106 self.channel_states
1107 .entry(ChannelId(latest_buffer_version.channel_id))
1108 .or_default()
1109 .update_latest_notes_version(latest_buffer_version.epoch, &version)
1110 }
1111
1112 for latest_channel_message in payload.latest_channel_message_ids {
1113 self.channel_states
1114 .entry(ChannelId(latest_channel_message.channel_id))
1115 .or_default()
1116 .update_latest_message_id(latest_channel_message.message_id);
1117 }
1118 }
1119
1120 cx.notify();
1121 if payload.channel_participants.is_empty() {
1122 return None;
1123 }
1124
1125 let mut all_user_ids = Vec::new();
1126 let channel_participants = payload.channel_participants;
1127 for entry in &channel_participants {
1128 for user_id in entry.participant_user_ids.iter() {
1129 if let Err(ix) = all_user_ids.binary_search(user_id) {
1130 all_user_ids.insert(ix, *user_id);
1131 }
1132 }
1133 }
1134
1135 let users = self
1136 .user_store
1137 .update(cx, |user_store, cx| user_store.get_users(all_user_ids, cx));
1138 Some(cx.spawn(async move |this, cx| {
1139 let users = users.await?;
1140
1141 this.update(cx, |this, cx| {
1142 for entry in &channel_participants {
1143 let mut participants: Vec<_> = entry
1144 .participant_user_ids
1145 .iter()
1146 .filter_map(|user_id| {
1147 users
1148 .binary_search_by_key(&user_id, |user| &user.id)
1149 .ok()
1150 .map(|ix| users[ix].clone())
1151 })
1152 .collect();
1153
1154 participants.sort_by_key(|u| u.id);
1155
1156 this.channel_participants
1157 .insert(ChannelId(entry.channel_id), participants);
1158 }
1159
1160 cx.notify();
1161 })
1162 }))
1163 }
1164}
1165
1166impl ChannelState {
1167 fn set_role(&mut self, role: ChannelRole) {
1168 self.role = Some(role);
1169 }
1170
1171 fn has_channel_buffer_changed(&self) -> bool {
1172 self.latest_notes_version.epoch > self.observed_notes_version.epoch
1173 || (self.latest_notes_version.epoch == self.observed_notes_version.epoch
1174 && self
1175 .latest_notes_version
1176 .version
1177 .changed_since(&self.observed_notes_version.version))
1178 }
1179
1180 fn has_new_messages(&self) -> bool {
1181 let latest_message_id = self.latest_chat_message;
1182 let observed_message_id = self.observed_chat_message;
1183
1184 latest_message_id.is_some_and(|latest_message_id| {
1185 latest_message_id > observed_message_id.unwrap_or_default()
1186 })
1187 }
1188
1189 fn last_acknowledged_message_id(&self) -> Option<u64> {
1190 self.observed_chat_message
1191 }
1192
1193 fn acknowledge_message_id(&mut self, message_id: u64) {
1194 let observed = self.observed_chat_message.get_or_insert(message_id);
1195 *observed = (*observed).max(message_id);
1196 }
1197
1198 fn update_latest_message_id(&mut self, message_id: u64) {
1199 self.latest_chat_message =
1200 Some(message_id.max(self.latest_chat_message.unwrap_or_default()));
1201 }
1202
1203 fn acknowledge_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1204 if self.observed_notes_version.epoch == epoch {
1205 self.observed_notes_version.version.join(version);
1206 } else {
1207 self.observed_notes_version = NotesVersion {
1208 epoch,
1209 version: version.clone(),
1210 };
1211 }
1212 }
1213
1214 fn update_latest_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1215 if self.latest_notes_version.epoch == epoch {
1216 self.latest_notes_version.version.join(version);
1217 } else {
1218 self.latest_notes_version = NotesVersion {
1219 epoch,
1220 version: version.clone(),
1221 };
1222 }
1223 }
1224}