1mod channel_index;
2
3use crate::{ChannelMessage, channel_buffer::ChannelBuffer, channel_chat::ChannelChat};
4use anyhow::{Context as _, Result, anyhow};
5use channel_index::ChannelIndex;
6use client::{ChannelId, Client, ClientSettings, Subscription, User, UserId, UserStore};
7use collections::{HashMap, HashSet};
8use futures::{Future, FutureExt, StreamExt, channel::mpsc, future::Shared};
9use gpui::{
10 App, AppContext as _, AsyncApp, Context, Entity, EventEmitter, Global, SharedString, Task,
11 WeakEntity,
12};
13use language::Capability;
14use postage::{sink::Sink, watch};
15use rpc::{
16 TypedEnvelope,
17 proto::{self, ChannelRole, ChannelVisibility},
18};
19use settings::Settings;
20use std::{mem, sync::Arc, time::Duration};
21use util::{ResultExt, maybe};
22
/// Grace period that `ChannelStore::handle_disconnect` waits (when asked to
/// wait for a reconnect) before it disconnects the open channel buffers.
pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
24
25pub fn init(client: &Arc<Client>, user_store: Entity<UserStore>, cx: &mut App) {
26 let channel_store = cx.new(|cx| ChannelStore::new(client.clone(), user_store.clone(), cx));
27 cx.set_global(GlobalChannelStore(channel_store));
28}
29
/// A version of a channel's notes buffer: an epoch paired with a version
/// vector. Stored per channel in [`ChannelState`] both for the latest known
/// version and for the version the user has observed.
#[derive(Debug, Clone, Default, PartialEq)]
struct NotesVersion {
    /// Epoch of the notes buffer this version belongs to.
    epoch: u64,
    /// Version vector of the notes buffer within `epoch`.
    version: clock::Global,
}
35
/// Client-side store of the channels known to the current user, together with
/// the notes buffers and chats that are currently open for them.
pub struct ChannelStore {
    /// Index of all known channels, by id and in display order.
    pub channel_index: ChannelIndex,
    /// Channels the user has been invited to. `update_channels` binary-searches
    /// this list by channel id, so it is presumably kept sorted by id.
    channel_invitations: Vec<Arc<Channel>>,
    /// Users currently participating in each channel.
    channel_participants: HashMap<ChannelId, Vec<Arc<User>>>,
    /// Per-channel role and latest/observed notes and chat positions.
    channel_states: HashMap<ChannelId, ChannelState>,
    /// `(channel, user)` pairs with a membership request (invite, removal or
    /// role change) currently in flight.
    outgoing_invites: HashSet<(ChannelId, UserId)>,
    /// Queue feeding incoming `UpdateChannels` payloads to `_update_channels`.
    update_channels_tx: mpsc::UnboundedSender<proto::UpdateChannels>,
    /// Notes buffers that are open or still loading, keyed by channel.
    opened_buffers: HashMap<ChannelId, OpenEntityHandle<ChannelBuffer>>,
    /// Chats that are open or still loading, keyed by channel.
    opened_chats: HashMap<ChannelId, OpenEntityHandle<ChannelChat>>,
    /// RPC client used for every request made by the store.
    client: Arc<Client>,
    /// Whether `SubscribeToChannels` has been sent on the current connection;
    /// cleared again in `handle_disconnect`.
    did_subscribe: bool,
    /// Watch flag set to `true` once `update_channels` has applied a payload
    /// containing channel changes; awaited by `wait_for_channels`.
    channels_loaded: (watch::Sender<bool>, watch::Receiver<bool>),
    /// Store used to resolve user ids into `User` records.
    user_store: Entity<UserStore>,
    /// Handles returned by `add_message_handler`; held for the store's lifetime.
    _rpc_subscriptions: [Subscription; 2],
    /// Task that reacts to client connection status changes.
    _watch_connection_status: Task<Option<()>>,
    /// Pending task that disconnects open channel buffers after a disconnect;
    /// taken (dropped) in `handle_connect`.
    disconnect_channel_buffers_task: Option<Task<()>>,
    /// Task that drains `update_channels_tx` and applies each payload in order.
    _update_channels: Task<()>,
}
54
/// A single channel as known to the client.
#[derive(Clone, Debug)]
pub struct Channel {
    /// Identifier of the channel.
    pub id: ChannelId,
    /// Display name of the channel.
    pub name: SharedString,
    /// Visibility of the channel as reported by the server.
    pub visibility: proto::ChannelVisibility,
    /// Ids of the channel's ancestors; empty for a root channel, otherwise the
    /// first entry is the root channel's id (see `root_id`).
    pub parent_path: Vec<ChannelId>,
    /// Ordering value supplied by the server.
    pub channel_order: i32,
}
63
/// Per-channel bookkeeping held by [`ChannelStore`]: the user's role plus the
/// latest and observed positions for the channel's chat and notes.
#[derive(Default, Debug)]
pub struct ChannelState {
    /// Id of the most recent chat message known for the channel.
    latest_chat_message: Option<u64>,
    /// Most recent known version of the channel's notes.
    latest_notes_version: NotesVersion,
    /// Version of the notes the user has observed.
    observed_notes_version: NotesVersion,
    /// Id of the last chat message the user has observed.
    observed_chat_message: Option<u64>,
    /// The user's role in the channel, if known.
    role: Option<ChannelRole>,
}
72
73impl Channel {
74 pub fn link(&self, cx: &App) -> String {
75 format!(
76 "{}/channel/{}-{}",
77 ClientSettings::get_global(cx).server_url,
78 Self::slug(&self.name),
79 self.id
80 )
81 }
82
83 pub fn notes_link(&self, heading: Option<String>, cx: &App) -> String {
84 self.link(cx)
85 + "/notes"
86 + &heading
87 .map(|h| format!("#{}", Self::slug(&h)))
88 .unwrap_or_default()
89 }
90
91 pub fn is_root_channel(&self) -> bool {
92 self.parent_path.is_empty()
93 }
94
95 pub fn root_id(&self) -> ChannelId {
96 self.parent_path.first().copied().unwrap_or(self.id)
97 }
98
99 pub fn slug(str: &str) -> String {
100 let slug: String = str
101 .chars()
102 .map(|c| if c.is_alphanumeric() { c } else { '-' })
103 .collect();
104
105 slug.trim_matches(|c| c == '-').to_string()
106 }
107}
108
/// A user's relationship to a channel, as built from a `GetChannelMembers`
/// response in `ChannelStore::fuzzy_search_members`.
#[derive(Debug)]
pub struct ChannelMembership {
    /// The user this membership describes.
    pub user: Arc<User>,
    /// Whether the user is a member or only an invitee.
    pub kind: proto::channel_member::Kind,
    /// The user's role in the channel.
    pub role: proto::ChannelRole,
}
115impl ChannelMembership {
116 pub fn sort_key(&self) -> MembershipSortKey<'_> {
117 MembershipSortKey {
118 role_order: match self.role {
119 proto::ChannelRole::Admin => 0,
120 proto::ChannelRole::Member => 1,
121 proto::ChannelRole::Banned => 2,
122 proto::ChannelRole::Talker => 3,
123 proto::ChannelRole::Guest => 4,
124 },
125 kind_order: match self.kind {
126 proto::channel_member::Kind::Member => 0,
127 proto::channel_member::Kind::Invitee => 1,
128 },
129 username_order: &self.user.github_login,
130 }
131 }
132}
133
/// Sort key produced by `ChannelMembership::sort_key`. The derived ordering
/// compares fields in declaration order: role, then kind, then username.
#[derive(PartialOrd, Ord, PartialEq, Eq)]
pub struct MembershipSortKey<'a> {
    /// Rank of the member's role; lower sorts first.
    role_order: u8,
    /// Rank of the membership kind; lower sorts first.
    kind_order: u8,
    /// The user's GitHub login, used as the final tie-breaker.
    username_order: &'a str,
}
140
/// Events emitted by [`ChannelStore`].
pub enum ChannelEvent {
    /// Emitted by `create_channel` after the new channel has been inserted.
    ChannelCreated(ChannelId),
    /// Emitted by `rename` after the renamed channel has been applied.
    ChannelRenamed(ChannelId),
}

impl EventEmitter<ChannelEvent> for ChannelStore {}
147
/// State of a per-channel resource (notes buffer or chat) tracked by the
/// store's `opened_buffers` / `opened_chats` maps.
enum OpenEntityHandle<E> {
    /// The resource finished loading; only a weak handle is kept, so the
    /// entity may already have been dropped by its users.
    Open(WeakEntity<E>),
    /// The resource is still loading; the shared task lets concurrent callers
    /// await the same load.
    Loading(Shared<Task<Result<Entity<E>, Arc<anyhow::Error>>>>),
}
152
/// Newtype wrapper that lets the [`ChannelStore`] entity be stored as a gpui
/// global (set in `init`, read in `ChannelStore::global`).
struct GlobalChannelStore(Entity<ChannelStore>);

impl Global for GlobalChannelStore {}
156
157impl ChannelStore {
158 pub fn global(cx: &App) -> Entity<Self> {
159 cx.global::<GlobalChannelStore>().0.clone()
160 }
161
    /// Creates a channel store bound to `client` and `user_store`.
    ///
    /// Registers RPC handlers for `UpdateChannels` and `UpdateUserChannels`,
    /// and spawns two long-lived tasks owned by the store: one that reacts to
    /// connection status changes and one that applies queued channel updates
    /// one at a time.
    pub fn new(client: Arc<Client>, user_store: Entity<UserStore>, cx: &mut Context<Self>) -> Self {
        let rpc_subscriptions = [
            client.add_message_handler(cx.weak_entity(), Self::handle_update_channels),
            client.add_message_handler(cx.weak_entity(), Self::handle_update_user_channels),
        ];

        let mut connection_status = client.status();
        let (update_channels_tx, mut update_channels_rx) = mpsc::unbounded();
        let watch_connection_status = cx.spawn(async move |this, cx| {
            while let Some(status) = connection_status.next().await {
                // Stop watching once the store itself has been dropped.
                let this = this.upgrade()?;
                match status {
                    client::Status::Connected { .. } => {
                        // NOTE(review): the trailing `?` ends this watcher task when
                        // `handle_connect` fails, so later status changes would no
                        // longer be handled — confirm this is intended.
                        this.update(cx, |this, cx| this.handle_connect(cx))
                            .ok()?
                            .await
                            .log_err()?;
                    }
                    // Signed out or upgrade required: disconnect without waiting
                    // for a reconnect.
                    client::Status::SignedOut | client::Status::UpgradeRequired => {
                        this.update(cx, |this, cx| this.handle_disconnect(false, cx))
                            .ok();
                    }
                    // Any other status: give the connection `RECONNECT_TIMEOUT`
                    // to come back before disconnecting buffers.
                    _ => {
                        this.update(cx, |this, cx| this.handle_disconnect(true, cx))
                            .ok();
                    }
                }
            }
            Some(())
        });

        Self {
            channel_invitations: Vec::default(),
            channel_index: ChannelIndex::default(),
            channel_participants: Default::default(),
            outgoing_invites: Default::default(),
            opened_buffers: Default::default(),
            opened_chats: Default::default(),
            update_channels_tx,
            client,
            user_store,
            _rpc_subscriptions: rpc_subscriptions,
            _watch_connection_status: watch_connection_status,
            disconnect_channel_buffers_task: None,
            // Applies queued `UpdateChannels` payloads sequentially: each payload's
            // follow-up task (if any) is awaited before the next payload is taken.
            _update_channels: cx.spawn(async move |this, cx| {
                maybe!(async move {
                    while let Some(update_channels) = update_channels_rx.next().await {
                        if let Some(this) = this.upgrade() {
                            let update_task = this
                                .update(cx, |this, cx| this.update_channels(update_channels, cx))?;
                            if let Some(update_task) = update_task {
                                update_task.await.log_err();
                            }
                        }
                    }
                    anyhow::Ok(())
                })
                .await
                .log_err();
            }),
            channel_states: Default::default(),
            did_subscribe: false,
            channels_loaded: watch::channel_with(false),
        }
    }
227
228 pub fn initialize(&mut self) {
229 if !self.did_subscribe
230 && self
231 .client
232 .send(proto::SubscribeToChannels {})
233 .log_err()
234 .is_some()
235 {
236 self.did_subscribe = true;
237 }
238 }
239
240 pub fn wait_for_channels(
241 &mut self,
242 timeout: Duration,
243 cx: &mut Context<Self>,
244 ) -> Task<Result<()>> {
245 let mut channels_loaded_rx = self.channels_loaded.1.clone();
246 if *channels_loaded_rx.borrow() {
247 return Task::ready(Ok(()));
248 }
249
250 let mut status_receiver = self.client.status();
251 if status_receiver.borrow().is_connected() {
252 self.initialize();
253 }
254
255 let mut timer = cx.background_executor().timer(timeout).fuse();
256 cx.spawn(async move |this, cx| {
257 loop {
258 futures::select_biased! {
259 channels_loaded = channels_loaded_rx.next().fuse() => {
260 if let Some(true) = channels_loaded {
261 return Ok(());
262 }
263 }
264 status = status_receiver.next().fuse() => {
265 if let Some(status) = status {
266 if status.is_connected() {
267 this.update(cx, |this, _cx| {
268 this.initialize();
269 }).ok();
270 }
271 }
272 continue;
273 }
274 _ = timer => {
275 return Err(anyhow!("{:?} elapsed without receiving channels", timeout));
276 }
277 }
278 }
279 })
280 }
281
282 pub fn client(&self) -> Arc<Client> {
283 self.client.clone()
284 }
285
286 /// Returns the number of unique channels in the store
287 pub fn channel_count(&self) -> usize {
288 self.channel_index.by_id().len()
289 }
290
291 /// Returns the index of a channel ID in the list of unique channels
292 pub fn index_of_channel(&self, channel_id: ChannelId) -> Option<usize> {
293 self.channel_index
294 .by_id()
295 .keys()
296 .position(|id| *id == channel_id)
297 }
298
299 /// Returns an iterator over all unique channels
300 pub fn channels(&self) -> impl '_ + Iterator<Item = &Arc<Channel>> {
301 self.channel_index.by_id().values()
302 }
303
304 /// Iterate over all entries in the channel DAG
305 pub fn ordered_channels(&self) -> impl '_ + Iterator<Item = (usize, &Arc<Channel>)> {
306 self.channel_index
307 .ordered_channels()
308 .iter()
309 .filter_map(move |id| {
310 let channel = self.channel_index.by_id().get(id)?;
311 Some((channel.parent_path.len(), channel))
312 })
313 }
314
315 pub fn channel_at_index(&self, ix: usize) -> Option<&Arc<Channel>> {
316 let channel_id = self.channel_index.ordered_channels().get(ix)?;
317 self.channel_index.by_id().get(channel_id)
318 }
319
320 pub fn channel_at(&self, ix: usize) -> Option<&Arc<Channel>> {
321 self.channel_index.by_id().values().nth(ix)
322 }
323
324 pub fn has_channel_invitation(&self, channel_id: ChannelId) -> bool {
325 self.channel_invitations
326 .iter()
327 .any(|channel| channel.id == channel_id)
328 }
329
330 pub fn channel_invitations(&self) -> &[Arc<Channel>] {
331 &self.channel_invitations
332 }
333
334 pub fn channel_for_id(&self, channel_id: ChannelId) -> Option<&Arc<Channel>> {
335 self.channel_index.by_id().get(&channel_id)
336 }
337
338 pub fn has_open_channel_buffer(&self, channel_id: ChannelId, _cx: &App) -> bool {
339 if let Some(buffer) = self.opened_buffers.get(&channel_id) {
340 if let OpenEntityHandle::Open(buffer) = buffer {
341 return buffer.upgrade().is_some();
342 }
343 }
344 false
345 }
346
347 pub fn open_channel_buffer(
348 &mut self,
349 channel_id: ChannelId,
350 cx: &mut Context<Self>,
351 ) -> Task<Result<Entity<ChannelBuffer>>> {
352 let client = self.client.clone();
353 let user_store = self.user_store.clone();
354 let channel_store = cx.entity();
355 self.open_channel_resource(
356 channel_id,
357 "notes",
358 |this| &mut this.opened_buffers,
359 async move |channel, cx| {
360 ChannelBuffer::new(channel, client, user_store, channel_store, cx).await
361 },
362 cx,
363 )
364 }
365
366 pub fn fetch_channel_messages(
367 &self,
368 message_ids: Vec<u64>,
369 cx: &mut Context<Self>,
370 ) -> Task<Result<Vec<ChannelMessage>>> {
371 let request = if message_ids.is_empty() {
372 None
373 } else {
374 Some(
375 self.client
376 .request(proto::GetChannelMessagesById { message_ids }),
377 )
378 };
379 cx.spawn(async move |this, cx| {
380 if let Some(request) = request {
381 let response = request.await?;
382 let this = this.upgrade().context("channel store dropped")?;
383 let user_store = this.read_with(cx, |this, _| this.user_store.clone())?;
384 ChannelMessage::from_proto_vec(response.messages, &user_store, cx).await
385 } else {
386 Ok(Vec::new())
387 }
388 })
389 }
390
391 pub fn has_channel_buffer_changed(&self, channel_id: ChannelId) -> bool {
392 self.channel_states
393 .get(&channel_id)
394 .is_some_and(|state| state.has_channel_buffer_changed())
395 }
396
397 pub fn has_new_messages(&self, channel_id: ChannelId) -> bool {
398 self.channel_states
399 .get(&channel_id)
400 .is_some_and(|state| state.has_new_messages())
401 }
402
403 pub fn set_acknowledged_message_id(&mut self, channel_id: ChannelId, message_id: Option<u64>) {
404 if let Some(state) = self.channel_states.get_mut(&channel_id) {
405 state.latest_chat_message = message_id;
406 }
407 }
408
409 pub fn last_acknowledge_message_id(&self, channel_id: ChannelId) -> Option<u64> {
410 self.channel_states.get(&channel_id).and_then(|state| {
411 if let Some(last_message_id) = state.latest_chat_message {
412 if state
413 .last_acknowledged_message_id()
414 .is_some_and(|id| id < last_message_id)
415 {
416 return state.last_acknowledged_message_id();
417 }
418 }
419
420 None
421 })
422 }
423
424 pub fn acknowledge_message_id(
425 &mut self,
426 channel_id: ChannelId,
427 message_id: u64,
428 cx: &mut Context<Self>,
429 ) {
430 self.channel_states
431 .entry(channel_id)
432 .or_default()
433 .acknowledge_message_id(message_id);
434 cx.notify();
435 }
436
437 pub fn update_latest_message_id(
438 &mut self,
439 channel_id: ChannelId,
440 message_id: u64,
441 cx: &mut Context<Self>,
442 ) {
443 self.channel_states
444 .entry(channel_id)
445 .or_default()
446 .update_latest_message_id(message_id);
447 cx.notify();
448 }
449
450 pub fn acknowledge_notes_version(
451 &mut self,
452 channel_id: ChannelId,
453 epoch: u64,
454 version: &clock::Global,
455 cx: &mut Context<Self>,
456 ) {
457 self.channel_states
458 .entry(channel_id)
459 .or_default()
460 .acknowledge_notes_version(epoch, version);
461 cx.notify()
462 }
463
464 pub fn update_latest_notes_version(
465 &mut self,
466 channel_id: ChannelId,
467 epoch: u64,
468 version: &clock::Global,
469 cx: &mut Context<Self>,
470 ) {
471 self.channel_states
472 .entry(channel_id)
473 .or_default()
474 .update_latest_notes_version(epoch, version);
475 cx.notify()
476 }
477
478 pub fn open_channel_chat(
479 &mut self,
480 channel_id: ChannelId,
481 cx: &mut Context<Self>,
482 ) -> Task<Result<Entity<ChannelChat>>> {
483 let client = self.client.clone();
484 let user_store = self.user_store.clone();
485 let this = cx.entity();
486 self.open_channel_resource(
487 channel_id,
488 "chat",
489 |this| &mut this.opened_chats,
490 async move |channel, cx| ChannelChat::new(channel, this, user_store, client, cx).await,
491 cx,
492 )
493 }
494
    /// Asynchronously open a given resource associated with a channel.
    ///
    /// Make sure that the resource is only opened once, even if this method
    /// is called multiple times with the same channel id while the first task
    /// is still running.
    ///
    /// * `resource_name` is only used in the error context
    ///   (`"failed to open channel {resource_name}"`).
    /// * `get_map` selects which map of the store tracks this kind of resource.
    /// * `load` performs the actual load once the channel has been resolved.
    ///
    /// The load first waits up to 10 seconds for the channel list and fails if
    /// `channel_id` is still unknown afterwards.
    fn open_channel_resource<T, F>(
        &mut self,
        channel_id: ChannelId,
        resource_name: &'static str,
        get_map: fn(&mut Self) -> &mut HashMap<ChannelId, OpenEntityHandle<T>>,
        load: F,
        cx: &mut Context<Self>,
    ) -> Task<Result<Entity<T>>>
    where
        F: AsyncFnOnce(Arc<Channel>, &mut AsyncApp) -> Result<Entity<T>> + 'static,
        T: 'static,
    {
        let task = loop {
            match get_map(self).get(&channel_id) {
                Some(OpenEntityHandle::Open(entity)) => {
                    if let Some(entity) = entity.upgrade() {
                        // Already open and alive: hand it back immediately.
                        break Task::ready(Ok(entity)).shared();
                    } else {
                        // The entity was dropped; forget the stale handle and
                        // re-run the match, which now takes the `None` path.
                        get_map(self).remove(&channel_id);
                        continue;
                    }
                }
                // A load is already in flight: join it instead of starting another.
                Some(OpenEntityHandle::Loading(task)) => break task.clone(),
                None => {}
            }

            let channels_ready = self.wait_for_channels(Duration::from_secs(10), cx);
            let task = cx
                .spawn(async move |this, cx| {
                    channels_ready.await?;
                    let channel = this.read_with(cx, |this, _| {
                        this.channel_for_id(channel_id)
                            .cloned()
                            .ok_or_else(|| Arc::new(anyhow!("no channel for id: {channel_id}")))
                    })??;

                    load(channel, cx).await.map_err(Arc::new)
                })
                .shared();

            // Publish the in-flight load so concurrent callers can share it.
            get_map(self).insert(channel_id, OpenEntityHandle::Loading(task.clone()));
            let task = cx.spawn({
                async move |this, cx| {
                    let result = task.await;
                    // Replace the `Loading` entry with the outcome: a weak handle
                    // on success, or nothing on failure so a later call can retry.
                    this.update(cx, |this, _| match &result {
                        Ok(model) => {
                            get_map(this)
                                .insert(channel_id, OpenEntityHandle::Open(model.downgrade()));
                        }
                        Err(_) => {
                            get_map(this).remove(&channel_id);
                        }
                    })?;
                    result
                }
            });
            break task.shared();
        };
        // Convert the shared `Arc<anyhow::Error>` back into an owned error and
        // attach which kind of resource failed to open.
        cx.background_spawn(async move {
            task.await.map_err(|error| {
                anyhow!("{error}").context(format!("failed to open channel {resource_name}"))
            })
        })
    }
564
565 pub fn is_channel_admin(&self, channel_id: ChannelId) -> bool {
566 self.channel_role(channel_id) == proto::ChannelRole::Admin
567 }
568
569 pub fn is_root_channel(&self, channel_id: ChannelId) -> bool {
570 self.channel_index
571 .by_id()
572 .get(&channel_id)
573 .map_or(false, |channel| channel.is_root_channel())
574 }
575
576 pub fn is_public_channel(&self, channel_id: ChannelId) -> bool {
577 self.channel_index
578 .by_id()
579 .get(&channel_id)
580 .map_or(false, |channel| {
581 channel.visibility == ChannelVisibility::Public
582 })
583 }
584
585 pub fn channel_capability(&self, channel_id: ChannelId) -> Capability {
586 match self.channel_role(channel_id) {
587 ChannelRole::Admin | ChannelRole::Member => Capability::ReadWrite,
588 _ => Capability::ReadOnly,
589 }
590 }
591
592 pub fn channel_role(&self, channel_id: ChannelId) -> proto::ChannelRole {
593 maybe!({
594 let mut channel = self.channel_for_id(channel_id)?;
595 if !channel.is_root_channel() {
596 channel = self.channel_for_id(channel.root_id())?;
597 }
598 let root_channel_state = self.channel_states.get(&channel.id);
599 root_channel_state?.role
600 })
601 .unwrap_or(proto::ChannelRole::Guest)
602 }
603
604 pub fn channel_participants(&self, channel_id: ChannelId) -> &[Arc<User>] {
605 self.channel_participants
606 .get(&channel_id)
607 .map_or(&[], |v| v.as_slice())
608 }
609
610 pub fn create_channel(
611 &self,
612 name: &str,
613 parent_id: Option<ChannelId>,
614 cx: &mut Context<Self>,
615 ) -> Task<Result<ChannelId>> {
616 let client = self.client.clone();
617 let name = name.trim_start_matches('#').to_owned();
618 cx.spawn(async move |this, cx| {
619 let response = client
620 .request(proto::CreateChannel {
621 name,
622 parent_id: parent_id.map(|cid| cid.0),
623 })
624 .await?;
625
626 let channel = response.channel.context("missing channel in response")?;
627 let channel_id = ChannelId(channel.id);
628
629 this.update(cx, |this, cx| {
630 let task = this.update_channels(
631 proto::UpdateChannels {
632 channels: vec![channel],
633 ..Default::default()
634 },
635 cx,
636 );
637 assert!(task.is_none());
638
639 // This event is emitted because the collab panel wants to clear the pending edit state
640 // before this frame is rendered. But we can't guarantee that the collab panel's future
641 // will resolve before this flush_effects finishes. Synchronously emitting this event
642 // ensures that the collab panel will observe this creation before the frame completes
643 cx.emit(ChannelEvent::ChannelCreated(channel_id));
644 })?;
645
646 Ok(channel_id)
647 })
648 }
649
650 pub fn move_channel(
651 &mut self,
652 channel_id: ChannelId,
653 to: ChannelId,
654 cx: &mut Context<Self>,
655 ) -> Task<Result<()>> {
656 let client = self.client.clone();
657 cx.spawn(async move |_, _| {
658 let _ = client
659 .request(proto::MoveChannel {
660 channel_id: channel_id.0,
661 to: to.0,
662 })
663 .await?;
664 Ok(())
665 })
666 }
667
668 pub fn reorder_channel(
669 &mut self,
670 channel_id: ChannelId,
671 direction: proto::reorder_channel::Direction,
672 cx: &mut Context<Self>,
673 ) -> Task<Result<()>> {
674 let client = self.client.clone();
675 cx.spawn(async move |_, _| {
676 client
677 .request(proto::ReorderChannel {
678 channel_id: channel_id.0,
679 direction: direction.into(),
680 })
681 .await?;
682 Ok(())
683 })
684 }
685
686 pub fn set_channel_visibility(
687 &mut self,
688 channel_id: ChannelId,
689 visibility: ChannelVisibility,
690 cx: &mut Context<Self>,
691 ) -> Task<Result<()>> {
692 let client = self.client.clone();
693 cx.spawn(async move |_, _| {
694 let _ = client
695 .request(proto::SetChannelVisibility {
696 channel_id: channel_id.0,
697 visibility: visibility.into(),
698 })
699 .await?;
700
701 Ok(())
702 })
703 }
704
705 pub fn invite_member(
706 &mut self,
707 channel_id: ChannelId,
708 user_id: UserId,
709 role: proto::ChannelRole,
710 cx: &mut Context<Self>,
711 ) -> Task<Result<()>> {
712 if !self.outgoing_invites.insert((channel_id, user_id)) {
713 return Task::ready(Err(anyhow!("invite request already in progress")));
714 }
715
716 cx.notify();
717 let client = self.client.clone();
718 cx.spawn(async move |this, cx| {
719 let result = client
720 .request(proto::InviteChannelMember {
721 channel_id: channel_id.0,
722 user_id,
723 role: role.into(),
724 })
725 .await;
726
727 this.update(cx, |this, cx| {
728 this.outgoing_invites.remove(&(channel_id, user_id));
729 cx.notify();
730 })?;
731
732 result?;
733
734 Ok(())
735 })
736 }
737
738 pub fn remove_member(
739 &mut self,
740 channel_id: ChannelId,
741 user_id: u64,
742 cx: &mut Context<Self>,
743 ) -> Task<Result<()>> {
744 if !self.outgoing_invites.insert((channel_id, user_id)) {
745 return Task::ready(Err(anyhow!("invite request already in progress")));
746 }
747
748 cx.notify();
749 let client = self.client.clone();
750 cx.spawn(async move |this, cx| {
751 let result = client
752 .request(proto::RemoveChannelMember {
753 channel_id: channel_id.0,
754 user_id,
755 })
756 .await;
757
758 this.update(cx, |this, cx| {
759 this.outgoing_invites.remove(&(channel_id, user_id));
760 cx.notify();
761 })?;
762 result?;
763 Ok(())
764 })
765 }
766
767 pub fn set_member_role(
768 &mut self,
769 channel_id: ChannelId,
770 user_id: UserId,
771 role: proto::ChannelRole,
772 cx: &mut Context<Self>,
773 ) -> Task<Result<()>> {
774 if !self.outgoing_invites.insert((channel_id, user_id)) {
775 return Task::ready(Err(anyhow!("member request already in progress")));
776 }
777
778 cx.notify();
779 let client = self.client.clone();
780 cx.spawn(async move |this, cx| {
781 let result = client
782 .request(proto::SetChannelMemberRole {
783 channel_id: channel_id.0,
784 user_id,
785 role: role.into(),
786 })
787 .await;
788
789 this.update(cx, |this, cx| {
790 this.outgoing_invites.remove(&(channel_id, user_id));
791 cx.notify();
792 })?;
793
794 result?;
795 Ok(())
796 })
797 }
798
799 pub fn rename(
800 &mut self,
801 channel_id: ChannelId,
802 new_name: &str,
803 cx: &mut Context<Self>,
804 ) -> Task<Result<()>> {
805 let client = self.client.clone();
806 let name = new_name.to_string();
807 cx.spawn(async move |this, cx| {
808 let channel = client
809 .request(proto::RenameChannel {
810 channel_id: channel_id.0,
811 name,
812 })
813 .await?
814 .channel
815 .context("missing channel in response")?;
816 this.update(cx, |this, cx| {
817 let task = this.update_channels(
818 proto::UpdateChannels {
819 channels: vec![channel],
820 ..Default::default()
821 },
822 cx,
823 );
824 assert!(task.is_none());
825
826 // This event is emitted because the collab panel wants to clear the pending edit state
827 // before this frame is rendered. But we can't guarantee that the collab panel's future
828 // will resolve before this flush_effects finishes. Synchronously emitting this event
829 // ensures that the collab panel will observe this creation before the frame complete
830 cx.emit(ChannelEvent::ChannelRenamed(channel_id))
831 })?;
832 Ok(())
833 })
834 }
835
836 pub fn respond_to_channel_invite(
837 &mut self,
838 channel_id: ChannelId,
839 accept: bool,
840 cx: &mut Context<Self>,
841 ) -> Task<Result<()>> {
842 let client = self.client.clone();
843 cx.background_spawn(async move {
844 client
845 .request(proto::RespondToChannelInvite {
846 channel_id: channel_id.0,
847 accept,
848 })
849 .await?;
850 Ok(())
851 })
852 }
853 pub fn fuzzy_search_members(
854 &self,
855 channel_id: ChannelId,
856 query: String,
857 limit: u16,
858 cx: &mut Context<Self>,
859 ) -> Task<Result<Vec<ChannelMembership>>> {
860 let client = self.client.clone();
861 let user_store = self.user_store.downgrade();
862 cx.spawn(async move |_, cx| {
863 let response = client
864 .request(proto::GetChannelMembers {
865 channel_id: channel_id.0,
866 query,
867 limit: limit as u64,
868 })
869 .await?;
870 user_store.update(cx, |user_store, _| {
871 user_store.insert(response.users);
872 response
873 .members
874 .into_iter()
875 .filter_map(|member| {
876 Some(ChannelMembership {
877 user: user_store.get_cached_user(member.user_id)?,
878 role: member.role(),
879 kind: member.kind(),
880 })
881 })
882 .collect()
883 })
884 })
885 }
886
887 pub fn remove_channel(
888 &self,
889 channel_id: ChannelId,
890 ) -> impl Future<Output = Result<()>> + use<> {
891 let client = self.client.clone();
892 async move {
893 client
894 .request(proto::DeleteChannel {
895 channel_id: channel_id.0,
896 })
897 .await?;
898 Ok(())
899 }
900 }
901
902 pub fn has_pending_channel_invite_response(&self, _: &Arc<Channel>) -> bool {
903 false
904 }
905
906 pub fn has_pending_channel_invite(&self, channel_id: ChannelId, user_id: UserId) -> bool {
907 self.outgoing_invites.contains(&(channel_id, user_id))
908 }
909
910 async fn handle_update_channels(
911 this: Entity<Self>,
912 message: TypedEnvelope<proto::UpdateChannels>,
913 mut cx: AsyncApp,
914 ) -> Result<()> {
915 this.read_with(&mut cx, |this, _| {
916 this.update_channels_tx
917 .unbounded_send(message.payload)
918 .unwrap();
919 })?;
920 Ok(())
921 }
922
923 async fn handle_update_user_channels(
924 this: Entity<Self>,
925 message: TypedEnvelope<proto::UpdateUserChannels>,
926 mut cx: AsyncApp,
927 ) -> Result<()> {
928 this.update(&mut cx, |this, cx| {
929 for buffer_version in message.payload.observed_channel_buffer_version {
930 let version = language::proto::deserialize_version(&buffer_version.version);
931 this.acknowledge_notes_version(
932 ChannelId(buffer_version.channel_id),
933 buffer_version.epoch,
934 &version,
935 cx,
936 );
937 }
938 for message_id in message.payload.observed_channel_message_id {
939 this.acknowledge_message_id(
940 ChannelId(message_id.channel_id),
941 message_id.message_id,
942 cx,
943 );
944 }
945 for membership in message.payload.channel_memberships {
946 if let Some(role) = ChannelRole::from_i32(membership.role) {
947 this.channel_states
948 .entry(ChannelId(membership.channel_id))
949 .or_default()
950 .set_role(role)
951 }
952 }
953 })
954 }
955
956 fn handle_connect(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
957 self.channel_index.clear();
958 self.channel_invitations.clear();
959 self.channel_participants.clear();
960 self.channel_index.clear();
961 self.outgoing_invites.clear();
962 self.disconnect_channel_buffers_task.take();
963
964 for chat in self.opened_chats.values() {
965 if let OpenEntityHandle::Open(chat) = chat {
966 if let Some(chat) = chat.upgrade() {
967 chat.update(cx, |chat, cx| {
968 chat.rejoin(cx);
969 });
970 }
971 }
972 }
973
974 let mut buffer_versions = Vec::new();
975 for buffer in self.opened_buffers.values() {
976 if let OpenEntityHandle::Open(buffer) = buffer {
977 if let Some(buffer) = buffer.upgrade() {
978 let channel_buffer = buffer.read(cx);
979 let buffer = channel_buffer.buffer().read(cx);
980 buffer_versions.push(proto::ChannelBufferVersion {
981 channel_id: channel_buffer.channel_id.0,
982 epoch: channel_buffer.epoch(),
983 version: language::proto::serialize_version(&buffer.version()),
984 });
985 }
986 }
987 }
988
989 if buffer_versions.is_empty() {
990 return Task::ready(Ok(()));
991 }
992
993 let response = self.client.request(proto::RejoinChannelBuffers {
994 buffers: buffer_versions,
995 });
996
997 cx.spawn(async move |this, cx| {
998 let mut response = response.await?;
999
1000 this.update(cx, |this, cx| {
1001 this.opened_buffers.retain(|_, buffer| match buffer {
1002 OpenEntityHandle::Open(channel_buffer) => {
1003 let Some(channel_buffer) = channel_buffer.upgrade() else {
1004 return false;
1005 };
1006
1007 channel_buffer.update(cx, |channel_buffer, cx| {
1008 let channel_id = channel_buffer.channel_id;
1009 if let Some(remote_buffer) = response
1010 .buffers
1011 .iter_mut()
1012 .find(|buffer| buffer.channel_id == channel_id.0)
1013 {
1014 let channel_id = channel_buffer.channel_id;
1015 let remote_version =
1016 language::proto::deserialize_version(&remote_buffer.version);
1017
1018 channel_buffer.replace_collaborators(
1019 mem::take(&mut remote_buffer.collaborators),
1020 cx,
1021 );
1022
1023 let operations = channel_buffer
1024 .buffer()
1025 .update(cx, |buffer, cx| {
1026 let outgoing_operations =
1027 buffer.serialize_ops(Some(remote_version), cx);
1028 let incoming_operations =
1029 mem::take(&mut remote_buffer.operations)
1030 .into_iter()
1031 .map(language::proto::deserialize_operation)
1032 .collect::<Result<Vec<_>>>()?;
1033 buffer.apply_ops(incoming_operations, cx);
1034 anyhow::Ok(outgoing_operations)
1035 })
1036 .log_err();
1037
1038 if let Some(operations) = operations {
1039 channel_buffer.connected(cx);
1040 let client = this.client.clone();
1041 cx.background_spawn(async move {
1042 let operations = operations.await;
1043 for chunk in language::proto::split_operations(operations) {
1044 client
1045 .send(proto::UpdateChannelBuffer {
1046 channel_id: channel_id.0,
1047 operations: chunk,
1048 })
1049 .ok();
1050 }
1051 })
1052 .detach();
1053 return true;
1054 }
1055 }
1056
1057 channel_buffer.disconnect(cx);
1058 false
1059 })
1060 }
1061 OpenEntityHandle::Loading(_) => true,
1062 });
1063 })
1064 .ok();
1065 anyhow::Ok(())
1066 })
1067 }
1068
1069 fn handle_disconnect(&mut self, wait_for_reconnect: bool, cx: &mut Context<Self>) {
1070 cx.notify();
1071 self.did_subscribe = false;
1072 self.disconnect_channel_buffers_task.get_or_insert_with(|| {
1073 cx.spawn(async move |this, cx| {
1074 if wait_for_reconnect {
1075 cx.background_executor().timer(RECONNECT_TIMEOUT).await;
1076 }
1077
1078 if let Some(this) = this.upgrade() {
1079 this.update(cx, |this, cx| {
1080 for (_, buffer) in &this.opened_buffers {
1081 if let OpenEntityHandle::Open(buffer) = &buffer {
1082 if let Some(buffer) = buffer.upgrade() {
1083 buffer.update(cx, |buffer, cx| buffer.disconnect(cx));
1084 }
1085 }
1086 }
1087 })
1088 .ok();
1089 }
1090 })
1091 });
1092 }
1093
1094 #[cfg(any(test, feature = "test-support"))]
1095 pub fn reset(&mut self) {
1096 self.channel_invitations.clear();
1097 self.channel_index.clear();
1098 self.channel_participants.clear();
1099 self.outgoing_invites.clear();
1100 self.opened_buffers.clear();
1101 self.opened_chats.clear();
1102 self.disconnect_channel_buffers_task = None;
1103 self.channel_states.clear();
1104 }
1105
1106 pub(crate) fn update_channels(
1107 &mut self,
1108 payload: proto::UpdateChannels,
1109 cx: &mut Context<ChannelStore>,
1110 ) -> Option<Task<Result<()>>> {
1111 if !payload.remove_channel_invitations.is_empty() {
1112 self.channel_invitations
1113 .retain(|channel| !payload.remove_channel_invitations.contains(&channel.id.0));
1114 }
1115 for channel in payload.channel_invitations {
1116 match self
1117 .channel_invitations
1118 .binary_search_by_key(&channel.id, |c| c.id.0)
1119 {
1120 Ok(ix) => {
1121 Arc::make_mut(&mut self.channel_invitations[ix]).name = channel.name.into()
1122 }
1123 Err(ix) => self.channel_invitations.insert(
1124 ix,
1125 Arc::new(Channel {
1126 id: ChannelId(channel.id),
1127 visibility: channel.visibility(),
1128 name: channel.name.into(),
1129 parent_path: channel.parent_path.into_iter().map(ChannelId).collect(),
1130 channel_order: channel.channel_order,
1131 }),
1132 ),
1133 }
1134 }
1135
1136 let channels_changed = !payload.channels.is_empty()
1137 || !payload.delete_channels.is_empty()
1138 || !payload.latest_channel_message_ids.is_empty()
1139 || !payload.latest_channel_buffer_versions.is_empty();
1140
1141 if channels_changed {
1142 if !payload.delete_channels.is_empty() {
1143 let delete_channels: Vec<ChannelId> =
1144 payload.delete_channels.into_iter().map(ChannelId).collect();
1145 self.channel_index.delete_channels(&delete_channels);
1146 self.channel_participants
1147 .retain(|channel_id, _| !delete_channels.contains(channel_id));
1148
1149 for channel_id in &delete_channels {
1150 let channel_id = *channel_id;
1151 if payload
1152 .channels
1153 .iter()
1154 .any(|channel| channel.id == channel_id.0)
1155 {
1156 continue;
1157 }
1158 if let Some(OpenEntityHandle::Open(buffer)) =
1159 self.opened_buffers.remove(&channel_id)
1160 {
1161 if let Some(buffer) = buffer.upgrade() {
1162 buffer.update(cx, ChannelBuffer::disconnect);
1163 }
1164 }
1165 }
1166 }
1167
1168 let mut index = self.channel_index.bulk_insert();
1169 for channel in payload.channels {
1170 let id = ChannelId(channel.id);
1171 let channel_changed = index.insert(channel);
1172
1173 if channel_changed {
1174 if let Some(OpenEntityHandle::Open(buffer)) = self.opened_buffers.get(&id) {
1175 if let Some(buffer) = buffer.upgrade() {
1176 buffer.update(cx, ChannelBuffer::channel_changed);
1177 }
1178 }
1179 }
1180 }
1181
1182 for latest_buffer_version in payload.latest_channel_buffer_versions {
1183 let version = language::proto::deserialize_version(&latest_buffer_version.version);
1184 self.channel_states
1185 .entry(ChannelId(latest_buffer_version.channel_id))
1186 .or_default()
1187 .update_latest_notes_version(latest_buffer_version.epoch, &version)
1188 }
1189
1190 for latest_channel_message in payload.latest_channel_message_ids {
1191 self.channel_states
1192 .entry(ChannelId(latest_channel_message.channel_id))
1193 .or_default()
1194 .update_latest_message_id(latest_channel_message.message_id);
1195 }
1196
1197 self.channels_loaded.0.try_send(true).log_err();
1198 }
1199
1200 cx.notify();
1201 if payload.channel_participants.is_empty() {
1202 return None;
1203 }
1204
1205 let mut all_user_ids = Vec::new();
1206 let channel_participants = payload.channel_participants;
1207 for entry in &channel_participants {
1208 for user_id in entry.participant_user_ids.iter() {
1209 if let Err(ix) = all_user_ids.binary_search(user_id) {
1210 all_user_ids.insert(ix, *user_id);
1211 }
1212 }
1213 }
1214
1215 let users = self
1216 .user_store
1217 .update(cx, |user_store, cx| user_store.get_users(all_user_ids, cx));
1218 Some(cx.spawn(async move |this, cx| {
1219 let users = users.await?;
1220
1221 this.update(cx, |this, cx| {
1222 for entry in &channel_participants {
1223 let mut participants: Vec<_> = entry
1224 .participant_user_ids
1225 .iter()
1226 .filter_map(|user_id| {
1227 users
1228 .binary_search_by_key(&user_id, |user| &user.id)
1229 .ok()
1230 .map(|ix| users[ix].clone())
1231 })
1232 .collect();
1233
1234 participants.sort_by_key(|u| u.id);
1235
1236 this.channel_participants
1237 .insert(ChannelId(entry.channel_id), participants);
1238 }
1239
1240 cx.notify();
1241 })
1242 }))
1243 }
1244}
1245
1246impl ChannelState {
1247 fn set_role(&mut self, role: ChannelRole) {
1248 self.role = Some(role);
1249 }
1250
1251 fn has_channel_buffer_changed(&self) -> bool {
1252 self.latest_notes_version.epoch > self.observed_notes_version.epoch
1253 || (self.latest_notes_version.epoch == self.observed_notes_version.epoch
1254 && self
1255 .latest_notes_version
1256 .version
1257 .changed_since(&self.observed_notes_version.version))
1258 }
1259
1260 fn has_new_messages(&self) -> bool {
1261 let latest_message_id = self.latest_chat_message;
1262 let observed_message_id = self.observed_chat_message;
1263
1264 latest_message_id.is_some_and(|latest_message_id| {
1265 latest_message_id > observed_message_id.unwrap_or_default()
1266 })
1267 }
1268
1269 fn last_acknowledged_message_id(&self) -> Option<u64> {
1270 self.observed_chat_message
1271 }
1272
1273 fn acknowledge_message_id(&mut self, message_id: u64) {
1274 let observed = self.observed_chat_message.get_or_insert(message_id);
1275 *observed = (*observed).max(message_id);
1276 }
1277
1278 fn update_latest_message_id(&mut self, message_id: u64) {
1279 self.latest_chat_message =
1280 Some(message_id.max(self.latest_chat_message.unwrap_or_default()));
1281 }
1282
1283 fn acknowledge_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1284 if self.observed_notes_version.epoch == epoch {
1285 self.observed_notes_version.version.join(version);
1286 } else {
1287 self.observed_notes_version = NotesVersion {
1288 epoch,
1289 version: version.clone(),
1290 };
1291 }
1292 }
1293
1294 fn update_latest_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1295 if self.latest_notes_version.epoch == epoch {
1296 self.latest_notes_version.version.join(version);
1297 } else {
1298 self.latest_notes_version = NotesVersion {
1299 epoch,
1300 version: version.clone(),
1301 };
1302 }
1303 }
1304}