1mod channel_index;
2
3use crate::{ChannelMessage, channel_buffer::ChannelBuffer, channel_chat::ChannelChat};
4use anyhow::{Context as _, Result, anyhow};
5use channel_index::ChannelIndex;
6use client::{ChannelId, Client, ClientSettings, Subscription, User, UserId, UserStore};
7use collections::{HashMap, HashSet};
8use futures::{Future, FutureExt, StreamExt, channel::mpsc, future::Shared};
9use gpui::{
10 App, AppContext as _, AsyncApp, Context, Entity, EventEmitter, Global, SharedString, Task,
11 WeakEntity,
12};
13use language::Capability;
14use postage::{sink::Sink, watch};
15use rpc::{
16 TypedEnvelope,
17 proto::{self, ChannelRole, ChannelVisibility},
18};
19use settings::Settings;
20use std::{mem, sync::Arc, time::Duration};
21use util::{ResultExt, maybe};
22
23pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
24
25pub fn init(client: &Arc<Client>, user_store: Entity<UserStore>, cx: &mut App) {
26 let channel_store = cx.new(|cx| ChannelStore::new(client.clone(), user_store.clone(), cx));
27 cx.set_global(GlobalChannelStore(channel_store));
28}
29
30#[derive(Debug, Clone, Default, PartialEq)]
31struct NotesVersion {
32 epoch: u64,
33 version: clock::Global,
34}
35
36pub struct ChannelStore {
37 pub channel_index: ChannelIndex,
38 channel_invitations: Vec<Arc<Channel>>,
39 channel_participants: HashMap<ChannelId, Vec<Arc<User>>>,
40 channel_states: HashMap<ChannelId, ChannelState>,
41 outgoing_invites: HashSet<(ChannelId, UserId)>,
42 update_channels_tx: mpsc::UnboundedSender<proto::UpdateChannels>,
43 opened_buffers: HashMap<ChannelId, OpenEntityHandle<ChannelBuffer>>,
44 opened_chats: HashMap<ChannelId, OpenEntityHandle<ChannelChat>>,
45 client: Arc<Client>,
46 did_subscribe: bool,
47 channels_loaded: (watch::Sender<bool>, watch::Receiver<bool>),
48 user_store: Entity<UserStore>,
49 _rpc_subscriptions: [Subscription; 2],
50 _watch_connection_status: Task<Option<()>>,
51 disconnect_channel_buffers_task: Option<Task<()>>,
52 _update_channels: Task<()>,
53}
54
55#[derive(Clone, Debug)]
56pub struct Channel {
57 pub id: ChannelId,
58 pub name: SharedString,
59 pub visibility: proto::ChannelVisibility,
60 pub parent_path: Vec<ChannelId>,
61 pub channel_order: i32,
62}
63
64#[derive(Default, Debug)]
65pub struct ChannelState {
66 latest_chat_message: Option<u64>,
67 latest_notes_version: NotesVersion,
68 observed_notes_version: NotesVersion,
69 observed_chat_message: Option<u64>,
70 role: Option<ChannelRole>,
71}
72
73impl Channel {
74 pub fn link(&self, cx: &App) -> String {
75 format!(
76 "{}/channel/{}-{}",
77 ClientSettings::get_global(cx).server_url,
78 Self::slug(&self.name),
79 self.id
80 )
81 }
82
83 pub fn notes_link(&self, heading: Option<String>, cx: &App) -> String {
84 self.link(cx)
85 + "/notes"
86 + &heading
87 .map(|h| format!("#{}", Self::slug(&h)))
88 .unwrap_or_default()
89 }
90
91 pub fn is_root_channel(&self) -> bool {
92 self.parent_path.is_empty()
93 }
94
95 pub fn root_id(&self) -> ChannelId {
96 self.parent_path.first().copied().unwrap_or(self.id)
97 }
98
99 pub fn slug(str: &str) -> String {
100 let slug: String = str
101 .chars()
102 .map(|c| if c.is_alphanumeric() { c } else { '-' })
103 .collect();
104
105 slug.trim_matches(|c| c == '-').to_string()
106 }
107}
108
109#[derive(Debug)]
110pub struct ChannelMembership {
111 pub user: Arc<User>,
112 pub kind: proto::channel_member::Kind,
113 pub role: proto::ChannelRole,
114}
115impl ChannelMembership {
116 pub fn sort_key(&self) -> MembershipSortKey<'_> {
117 MembershipSortKey {
118 role_order: match self.role {
119 proto::ChannelRole::Admin => 0,
120 proto::ChannelRole::Member => 1,
121 proto::ChannelRole::Banned => 2,
122 proto::ChannelRole::Talker => 3,
123 proto::ChannelRole::Guest => 4,
124 },
125 kind_order: match self.kind {
126 proto::channel_member::Kind::Member => 0,
127 proto::channel_member::Kind::Invitee => 1,
128 },
129 username_order: &self.user.github_login,
130 }
131 }
132}
133
134#[derive(PartialOrd, Ord, PartialEq, Eq)]
135pub struct MembershipSortKey<'a> {
136 role_order: u8,
137 kind_order: u8,
138 username_order: &'a str,
139}
140
141pub enum ChannelEvent {
142 ChannelCreated(ChannelId),
143 ChannelRenamed(ChannelId),
144}
145
146impl EventEmitter<ChannelEvent> for ChannelStore {}
147
148enum OpenEntityHandle<E> {
149 Open(WeakEntity<E>),
150 Loading(Shared<Task<Result<Entity<E>, Arc<anyhow::Error>>>>),
151}
152
153struct GlobalChannelStore(Entity<ChannelStore>);
154
155impl Global for GlobalChannelStore {}
156
157impl ChannelStore {
158 pub fn global(cx: &App) -> Entity<Self> {
159 cx.global::<GlobalChannelStore>().0.clone()
160 }
161
162 pub fn new(client: Arc<Client>, user_store: Entity<UserStore>, cx: &mut Context<Self>) -> Self {
163 let rpc_subscriptions = [
164 client.add_message_handler(cx.weak_entity(), Self::handle_update_channels),
165 client.add_message_handler(cx.weak_entity(), Self::handle_update_user_channels),
166 ];
167
168 let mut connection_status = client.status();
169 let (update_channels_tx, mut update_channels_rx) = mpsc::unbounded();
170 let watch_connection_status = cx.spawn(async move |this, cx| {
171 while let Some(status) = connection_status.next().await {
172 let this = this.upgrade()?;
173 match status {
174 client::Status::Connected { .. } => {
175 this.update(cx, |this, cx| this.handle_connect(cx))
176 .ok()?
177 .await
178 .log_err()?;
179 }
180 client::Status::SignedOut | client::Status::UpgradeRequired => {
181 this.update(cx, |this, cx| this.handle_disconnect(false, cx))
182 .ok();
183 }
184 _ => {
185 this.update(cx, |this, cx| this.handle_disconnect(true, cx))
186 .ok();
187 }
188 }
189 }
190 Some(())
191 });
192
193 Self {
194 channel_invitations: Vec::default(),
195 channel_index: ChannelIndex::default(),
196 channel_participants: Default::default(),
197 outgoing_invites: Default::default(),
198 opened_buffers: Default::default(),
199 opened_chats: Default::default(),
200 update_channels_tx,
201 client,
202 user_store,
203 _rpc_subscriptions: rpc_subscriptions,
204 _watch_connection_status: watch_connection_status,
205 disconnect_channel_buffers_task: None,
206 _update_channels: cx.spawn(async move |this, cx| {
207 maybe!(async move {
208 while let Some(update_channels) = update_channels_rx.next().await {
209 if let Some(this) = this.upgrade() {
210 let update_task = this
211 .update(cx, |this, cx| this.update_channels(update_channels, cx))?;
212 if let Some(update_task) = update_task {
213 update_task.await.log_err();
214 }
215 }
216 }
217 anyhow::Ok(())
218 })
219 .await
220 .log_err();
221 }),
222 channel_states: Default::default(),
223 did_subscribe: false,
224 channels_loaded: watch::channel_with(false),
225 }
226 }
227
228 pub fn initialize(&mut self) {
229 if !self.did_subscribe
230 && self
231 .client
232 .send(proto::SubscribeToChannels {})
233 .log_err()
234 .is_some()
235 {
236 self.did_subscribe = true;
237 }
238 }
239
240 pub fn wait_for_channels(
241 &mut self,
242 timeout: Duration,
243 cx: &mut Context<Self>,
244 ) -> Task<Result<()>> {
245 let mut channels_loaded_rx = self.channels_loaded.1.clone();
246 if *channels_loaded_rx.borrow() {
247 return Task::ready(Ok(()));
248 }
249
250 let mut status_receiver = self.client.status();
251 if status_receiver.borrow().is_connected() {
252 self.initialize();
253 }
254
255 let mut timer = cx.background_executor().timer(timeout).fuse();
256 cx.spawn(async move |this, cx| {
257 loop {
258 futures::select_biased! {
259 channels_loaded = channels_loaded_rx.next().fuse() => {
260 if let Some(true) = channels_loaded {
261 return Ok(());
262 }
263 }
264 status = status_receiver.next().fuse() => {
265 if let Some(status) = status
266 && status.is_connected() {
267 this.update(cx, |this, _cx| {
268 this.initialize();
269 }).ok();
270 }
271 continue;
272 }
273 _ = timer => {
274 return Err(anyhow!("{:?} elapsed without receiving channels", timeout));
275 }
276 }
277 }
278 })
279 }
280
281 pub fn client(&self) -> Arc<Client> {
282 self.client.clone()
283 }
284
285 /// Returns the number of unique channels in the store
286 pub fn channel_count(&self) -> usize {
287 self.channel_index.by_id().len()
288 }
289
290 /// Returns the index of a channel ID in the list of unique channels
291 pub fn index_of_channel(&self, channel_id: ChannelId) -> Option<usize> {
292 self.channel_index
293 .by_id()
294 .keys()
295 .position(|id| *id == channel_id)
296 }
297
298 /// Returns an iterator over all unique channels
299 pub fn channels(&self) -> impl '_ + Iterator<Item = &Arc<Channel>> {
300 self.channel_index.by_id().values()
301 }
302
303 /// Iterate over all entries in the channel DAG
304 pub fn ordered_channels(&self) -> impl '_ + Iterator<Item = (usize, &Arc<Channel>)> {
305 self.channel_index
306 .ordered_channels()
307 .iter()
308 .filter_map(move |id| {
309 let channel = self.channel_index.by_id().get(id)?;
310 Some((channel.parent_path.len(), channel))
311 })
312 }
313
314 pub fn channel_at_index(&self, ix: usize) -> Option<&Arc<Channel>> {
315 let channel_id = self.channel_index.ordered_channels().get(ix)?;
316 self.channel_index.by_id().get(channel_id)
317 }
318
319 pub fn channel_at(&self, ix: usize) -> Option<&Arc<Channel>> {
320 self.channel_index.by_id().values().nth(ix)
321 }
322
323 pub fn has_channel_invitation(&self, channel_id: ChannelId) -> bool {
324 self.channel_invitations
325 .iter()
326 .any(|channel| channel.id == channel_id)
327 }
328
329 pub fn channel_invitations(&self) -> &[Arc<Channel>] {
330 &self.channel_invitations
331 }
332
333 pub fn channel_for_id(&self, channel_id: ChannelId) -> Option<&Arc<Channel>> {
334 self.channel_index.by_id().get(&channel_id)
335 }
336
337 pub fn has_open_channel_buffer(&self, channel_id: ChannelId, _cx: &App) -> bool {
338 if let Some(buffer) = self.opened_buffers.get(&channel_id)
339 && let OpenEntityHandle::Open(buffer) = buffer
340 {
341 return buffer.upgrade().is_some();
342 }
343 false
344 }
345
346 pub fn open_channel_buffer(
347 &mut self,
348 channel_id: ChannelId,
349 cx: &mut Context<Self>,
350 ) -> Task<Result<Entity<ChannelBuffer>>> {
351 let client = self.client.clone();
352 let user_store = self.user_store.clone();
353 let channel_store = cx.entity();
354 self.open_channel_resource(
355 channel_id,
356 "notes",
357 |this| &mut this.opened_buffers,
358 async move |channel, cx| {
359 ChannelBuffer::new(channel, client, user_store, channel_store, cx).await
360 },
361 cx,
362 )
363 }
364
365 pub fn fetch_channel_messages(
366 &self,
367 message_ids: Vec<u64>,
368 cx: &mut Context<Self>,
369 ) -> Task<Result<Vec<ChannelMessage>>> {
370 let request = if message_ids.is_empty() {
371 None
372 } else {
373 Some(
374 self.client
375 .request(proto::GetChannelMessagesById { message_ids }),
376 )
377 };
378 cx.spawn(async move |this, cx| {
379 if let Some(request) = request {
380 let response = request.await?;
381 let this = this.upgrade().context("channel store dropped")?;
382 let user_store = this.read_with(cx, |this, _| this.user_store.clone())?;
383 ChannelMessage::from_proto_vec(response.messages, &user_store, cx).await
384 } else {
385 Ok(Vec::new())
386 }
387 })
388 }
389
390 pub fn has_channel_buffer_changed(&self, channel_id: ChannelId) -> bool {
391 self.channel_states
392 .get(&channel_id)
393 .is_some_and(|state| state.has_channel_buffer_changed())
394 }
395
396 pub fn has_new_messages(&self, channel_id: ChannelId) -> bool {
397 self.channel_states
398 .get(&channel_id)
399 .is_some_and(|state| state.has_new_messages())
400 }
401
402 pub fn set_acknowledged_message_id(&mut self, channel_id: ChannelId, message_id: Option<u64>) {
403 if let Some(state) = self.channel_states.get_mut(&channel_id) {
404 state.latest_chat_message = message_id;
405 }
406 }
407
408 pub fn last_acknowledge_message_id(&self, channel_id: ChannelId) -> Option<u64> {
409 self.channel_states.get(&channel_id).and_then(|state| {
410 if let Some(last_message_id) = state.latest_chat_message
411 && state
412 .last_acknowledged_message_id()
413 .is_some_and(|id| id < last_message_id)
414 {
415 return state.last_acknowledged_message_id();
416 }
417
418 None
419 })
420 }
421
422 pub fn acknowledge_message_id(
423 &mut self,
424 channel_id: ChannelId,
425 message_id: u64,
426 cx: &mut Context<Self>,
427 ) {
428 self.channel_states
429 .entry(channel_id)
430 .or_default()
431 .acknowledge_message_id(message_id);
432 cx.notify();
433 }
434
435 pub fn update_latest_message_id(
436 &mut self,
437 channel_id: ChannelId,
438 message_id: u64,
439 cx: &mut Context<Self>,
440 ) {
441 self.channel_states
442 .entry(channel_id)
443 .or_default()
444 .update_latest_message_id(message_id);
445 cx.notify();
446 }
447
448 pub fn acknowledge_notes_version(
449 &mut self,
450 channel_id: ChannelId,
451 epoch: u64,
452 version: &clock::Global,
453 cx: &mut Context<Self>,
454 ) {
455 self.channel_states
456 .entry(channel_id)
457 .or_default()
458 .acknowledge_notes_version(epoch, version);
459 cx.notify()
460 }
461
462 pub fn update_latest_notes_version(
463 &mut self,
464 channel_id: ChannelId,
465 epoch: u64,
466 version: &clock::Global,
467 cx: &mut Context<Self>,
468 ) {
469 self.channel_states
470 .entry(channel_id)
471 .or_default()
472 .update_latest_notes_version(epoch, version);
473 cx.notify()
474 }
475
476 pub fn open_channel_chat(
477 &mut self,
478 channel_id: ChannelId,
479 cx: &mut Context<Self>,
480 ) -> Task<Result<Entity<ChannelChat>>> {
481 let client = self.client.clone();
482 let user_store = self.user_store.clone();
483 let this = cx.entity();
484 self.open_channel_resource(
485 channel_id,
486 "chat",
487 |this| &mut this.opened_chats,
488 async move |channel, cx| ChannelChat::new(channel, this, user_store, client, cx).await,
489 cx,
490 )
491 }
492
493 /// Asynchronously open a given resource associated with a channel.
494 ///
495 /// Make sure that the resource is only opened once, even if this method
496 /// is called multiple times with the same channel id while the first task
497 /// is still running.
498 fn open_channel_resource<T, F>(
499 &mut self,
500 channel_id: ChannelId,
501 resource_name: &'static str,
502 get_map: fn(&mut Self) -> &mut HashMap<ChannelId, OpenEntityHandle<T>>,
503 load: F,
504 cx: &mut Context<Self>,
505 ) -> Task<Result<Entity<T>>>
506 where
507 F: AsyncFnOnce(Arc<Channel>, &mut AsyncApp) -> Result<Entity<T>> + 'static,
508 T: 'static,
509 {
510 let task = loop {
511 match get_map(self).get(&channel_id) {
512 Some(OpenEntityHandle::Open(entity)) => {
513 if let Some(entity) = entity.upgrade() {
514 break Task::ready(Ok(entity)).shared();
515 } else {
516 get_map(self).remove(&channel_id);
517 continue;
518 }
519 }
520 Some(OpenEntityHandle::Loading(task)) => break task.clone(),
521 None => {}
522 }
523
524 let channels_ready = self.wait_for_channels(Duration::from_secs(10), cx);
525 let task = cx
526 .spawn(async move |this, cx| {
527 channels_ready.await?;
528 let channel = this.read_with(cx, |this, _| {
529 this.channel_for_id(channel_id)
530 .cloned()
531 .ok_or_else(|| Arc::new(anyhow!("no channel for id: {channel_id}")))
532 })??;
533
534 load(channel, cx).await.map_err(Arc::new)
535 })
536 .shared();
537
538 get_map(self).insert(channel_id, OpenEntityHandle::Loading(task.clone()));
539 let task = cx.spawn({
540 async move |this, cx| {
541 let result = task.await;
542 this.update(cx, |this, _| match &result {
543 Ok(model) => {
544 get_map(this)
545 .insert(channel_id, OpenEntityHandle::Open(model.downgrade()));
546 }
547 Err(_) => {
548 get_map(this).remove(&channel_id);
549 }
550 })?;
551 result
552 }
553 });
554 break task.shared();
555 };
556 cx.background_spawn(async move {
557 task.await.map_err(|error| {
558 anyhow!("{error}").context(format!("failed to open channel {resource_name}"))
559 })
560 })
561 }
562
563 pub fn is_channel_admin(&self, channel_id: ChannelId) -> bool {
564 self.channel_role(channel_id) == proto::ChannelRole::Admin
565 }
566
567 pub fn is_root_channel(&self, channel_id: ChannelId) -> bool {
568 self.channel_index
569 .by_id()
570 .get(&channel_id)
571 .map_or(false, |channel| channel.is_root_channel())
572 }
573
574 pub fn is_public_channel(&self, channel_id: ChannelId) -> bool {
575 self.channel_index
576 .by_id()
577 .get(&channel_id)
578 .map_or(false, |channel| {
579 channel.visibility == ChannelVisibility::Public
580 })
581 }
582
583 pub fn channel_capability(&self, channel_id: ChannelId) -> Capability {
584 match self.channel_role(channel_id) {
585 ChannelRole::Admin | ChannelRole::Member => Capability::ReadWrite,
586 _ => Capability::ReadOnly,
587 }
588 }
589
590 pub fn channel_role(&self, channel_id: ChannelId) -> proto::ChannelRole {
591 maybe!({
592 let mut channel = self.channel_for_id(channel_id)?;
593 if !channel.is_root_channel() {
594 channel = self.channel_for_id(channel.root_id())?;
595 }
596 let root_channel_state = self.channel_states.get(&channel.id);
597 root_channel_state?.role
598 })
599 .unwrap_or(proto::ChannelRole::Guest)
600 }
601
602 pub fn channel_participants(&self, channel_id: ChannelId) -> &[Arc<User>] {
603 self.channel_participants
604 .get(&channel_id)
605 .map_or(&[], |v| v.as_slice())
606 }
607
608 pub fn create_channel(
609 &self,
610 name: &str,
611 parent_id: Option<ChannelId>,
612 cx: &mut Context<Self>,
613 ) -> Task<Result<ChannelId>> {
614 let client = self.client.clone();
615 let name = name.trim_start_matches('#').to_owned();
616 cx.spawn(async move |this, cx| {
617 let response = client
618 .request(proto::CreateChannel {
619 name,
620 parent_id: parent_id.map(|cid| cid.0),
621 })
622 .await?;
623
624 let channel = response.channel.context("missing channel in response")?;
625 let channel_id = ChannelId(channel.id);
626
627 this.update(cx, |this, cx| {
628 let task = this.update_channels(
629 proto::UpdateChannels {
630 channels: vec![channel],
631 ..Default::default()
632 },
633 cx,
634 );
635 assert!(task.is_none());
636
637 // This event is emitted because the collab panel wants to clear the pending edit state
638 // before this frame is rendered. But we can't guarantee that the collab panel's future
639 // will resolve before this flush_effects finishes. Synchronously emitting this event
640 // ensures that the collab panel will observe this creation before the frame completes
641 cx.emit(ChannelEvent::ChannelCreated(channel_id));
642 })?;
643
644 Ok(channel_id)
645 })
646 }
647
648 pub fn move_channel(
649 &mut self,
650 channel_id: ChannelId,
651 to: ChannelId,
652 cx: &mut Context<Self>,
653 ) -> Task<Result<()>> {
654 let client = self.client.clone();
655 cx.spawn(async move |_, _| {
656 let _ = client
657 .request(proto::MoveChannel {
658 channel_id: channel_id.0,
659 to: to.0,
660 })
661 .await?;
662 Ok(())
663 })
664 }
665
666 pub fn reorder_channel(
667 &mut self,
668 channel_id: ChannelId,
669 direction: proto::reorder_channel::Direction,
670 cx: &mut Context<Self>,
671 ) -> Task<Result<()>> {
672 let client = self.client.clone();
673 cx.spawn(async move |_, _| {
674 client
675 .request(proto::ReorderChannel {
676 channel_id: channel_id.0,
677 direction: direction.into(),
678 })
679 .await?;
680 Ok(())
681 })
682 }
683
684 pub fn set_channel_visibility(
685 &mut self,
686 channel_id: ChannelId,
687 visibility: ChannelVisibility,
688 cx: &mut Context<Self>,
689 ) -> Task<Result<()>> {
690 let client = self.client.clone();
691 cx.spawn(async move |_, _| {
692 let _ = client
693 .request(proto::SetChannelVisibility {
694 channel_id: channel_id.0,
695 visibility: visibility.into(),
696 })
697 .await?;
698
699 Ok(())
700 })
701 }
702
703 pub fn invite_member(
704 &mut self,
705 channel_id: ChannelId,
706 user_id: UserId,
707 role: proto::ChannelRole,
708 cx: &mut Context<Self>,
709 ) -> Task<Result<()>> {
710 if !self.outgoing_invites.insert((channel_id, user_id)) {
711 return Task::ready(Err(anyhow!("invite request already in progress")));
712 }
713
714 cx.notify();
715 let client = self.client.clone();
716 cx.spawn(async move |this, cx| {
717 let result = client
718 .request(proto::InviteChannelMember {
719 channel_id: channel_id.0,
720 user_id,
721 role: role.into(),
722 })
723 .await;
724
725 this.update(cx, |this, cx| {
726 this.outgoing_invites.remove(&(channel_id, user_id));
727 cx.notify();
728 })?;
729
730 result?;
731
732 Ok(())
733 })
734 }
735
736 pub fn remove_member(
737 &mut self,
738 channel_id: ChannelId,
739 user_id: u64,
740 cx: &mut Context<Self>,
741 ) -> Task<Result<()>> {
742 if !self.outgoing_invites.insert((channel_id, user_id)) {
743 return Task::ready(Err(anyhow!("invite request already in progress")));
744 }
745
746 cx.notify();
747 let client = self.client.clone();
748 cx.spawn(async move |this, cx| {
749 let result = client
750 .request(proto::RemoveChannelMember {
751 channel_id: channel_id.0,
752 user_id,
753 })
754 .await;
755
756 this.update(cx, |this, cx| {
757 this.outgoing_invites.remove(&(channel_id, user_id));
758 cx.notify();
759 })?;
760 result?;
761 Ok(())
762 })
763 }
764
765 pub fn set_member_role(
766 &mut self,
767 channel_id: ChannelId,
768 user_id: UserId,
769 role: proto::ChannelRole,
770 cx: &mut Context<Self>,
771 ) -> Task<Result<()>> {
772 if !self.outgoing_invites.insert((channel_id, user_id)) {
773 return Task::ready(Err(anyhow!("member request already in progress")));
774 }
775
776 cx.notify();
777 let client = self.client.clone();
778 cx.spawn(async move |this, cx| {
779 let result = client
780 .request(proto::SetChannelMemberRole {
781 channel_id: channel_id.0,
782 user_id,
783 role: role.into(),
784 })
785 .await;
786
787 this.update(cx, |this, cx| {
788 this.outgoing_invites.remove(&(channel_id, user_id));
789 cx.notify();
790 })?;
791
792 result?;
793 Ok(())
794 })
795 }
796
797 pub fn rename(
798 &mut self,
799 channel_id: ChannelId,
800 new_name: &str,
801 cx: &mut Context<Self>,
802 ) -> Task<Result<()>> {
803 let client = self.client.clone();
804 let name = new_name.to_string();
805 cx.spawn(async move |this, cx| {
806 let channel = client
807 .request(proto::RenameChannel {
808 channel_id: channel_id.0,
809 name,
810 })
811 .await?
812 .channel
813 .context("missing channel in response")?;
814 this.update(cx, |this, cx| {
815 let task = this.update_channels(
816 proto::UpdateChannels {
817 channels: vec![channel],
818 ..Default::default()
819 },
820 cx,
821 );
822 assert!(task.is_none());
823
824 // This event is emitted because the collab panel wants to clear the pending edit state
825 // before this frame is rendered. But we can't guarantee that the collab panel's future
826 // will resolve before this flush_effects finishes. Synchronously emitting this event
827 // ensures that the collab panel will observe this creation before the frame complete
828 cx.emit(ChannelEvent::ChannelRenamed(channel_id))
829 })?;
830 Ok(())
831 })
832 }
833
834 pub fn respond_to_channel_invite(
835 &mut self,
836 channel_id: ChannelId,
837 accept: bool,
838 cx: &mut Context<Self>,
839 ) -> Task<Result<()>> {
840 let client = self.client.clone();
841 cx.background_spawn(async move {
842 client
843 .request(proto::RespondToChannelInvite {
844 channel_id: channel_id.0,
845 accept,
846 })
847 .await?;
848 Ok(())
849 })
850 }
851 pub fn fuzzy_search_members(
852 &self,
853 channel_id: ChannelId,
854 query: String,
855 limit: u16,
856 cx: &mut Context<Self>,
857 ) -> Task<Result<Vec<ChannelMembership>>> {
858 let client = self.client.clone();
859 let user_store = self.user_store.downgrade();
860 cx.spawn(async move |_, cx| {
861 let response = client
862 .request(proto::GetChannelMembers {
863 channel_id: channel_id.0,
864 query,
865 limit: limit as u64,
866 })
867 .await?;
868 user_store.update(cx, |user_store, _| {
869 user_store.insert(response.users);
870 response
871 .members
872 .into_iter()
873 .filter_map(|member| {
874 Some(ChannelMembership {
875 user: user_store.get_cached_user(member.user_id)?,
876 role: member.role(),
877 kind: member.kind(),
878 })
879 })
880 .collect()
881 })
882 })
883 }
884
885 pub fn remove_channel(
886 &self,
887 channel_id: ChannelId,
888 ) -> impl Future<Output = Result<()>> + use<> {
889 let client = self.client.clone();
890 async move {
891 client
892 .request(proto::DeleteChannel {
893 channel_id: channel_id.0,
894 })
895 .await?;
896 Ok(())
897 }
898 }
899
900 pub fn has_pending_channel_invite_response(&self, _: &Arc<Channel>) -> bool {
901 false
902 }
903
904 pub fn has_pending_channel_invite(&self, channel_id: ChannelId, user_id: UserId) -> bool {
905 self.outgoing_invites.contains(&(channel_id, user_id))
906 }
907
908 async fn handle_update_channels(
909 this: Entity<Self>,
910 message: TypedEnvelope<proto::UpdateChannels>,
911 mut cx: AsyncApp,
912 ) -> Result<()> {
913 this.read_with(&mut cx, |this, _| {
914 this.update_channels_tx
915 .unbounded_send(message.payload)
916 .unwrap();
917 })?;
918 Ok(())
919 }
920
921 async fn handle_update_user_channels(
922 this: Entity<Self>,
923 message: TypedEnvelope<proto::UpdateUserChannels>,
924 mut cx: AsyncApp,
925 ) -> Result<()> {
926 this.update(&mut cx, |this, cx| {
927 for buffer_version in message.payload.observed_channel_buffer_version {
928 let version = language::proto::deserialize_version(&buffer_version.version);
929 this.acknowledge_notes_version(
930 ChannelId(buffer_version.channel_id),
931 buffer_version.epoch,
932 &version,
933 cx,
934 );
935 }
936 for message_id in message.payload.observed_channel_message_id {
937 this.acknowledge_message_id(
938 ChannelId(message_id.channel_id),
939 message_id.message_id,
940 cx,
941 );
942 }
943 for membership in message.payload.channel_memberships {
944 if let Some(role) = ChannelRole::from_i32(membership.role) {
945 this.channel_states
946 .entry(ChannelId(membership.channel_id))
947 .or_default()
948 .set_role(role)
949 }
950 }
951 })
952 }
953
954 fn handle_connect(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
955 self.channel_index.clear();
956 self.channel_invitations.clear();
957 self.channel_participants.clear();
958 self.channel_index.clear();
959 self.outgoing_invites.clear();
960 self.disconnect_channel_buffers_task.take();
961
962 for chat in self.opened_chats.values() {
963 if let OpenEntityHandle::Open(chat) = chat
964 && let Some(chat) = chat.upgrade()
965 {
966 chat.update(cx, |chat, cx| {
967 chat.rejoin(cx);
968 });
969 }
970 }
971
972 let mut buffer_versions = Vec::new();
973 for buffer in self.opened_buffers.values() {
974 if let OpenEntityHandle::Open(buffer) = buffer
975 && let Some(buffer) = buffer.upgrade()
976 {
977 let channel_buffer = buffer.read(cx);
978 let buffer = channel_buffer.buffer().read(cx);
979 buffer_versions.push(proto::ChannelBufferVersion {
980 channel_id: channel_buffer.channel_id.0,
981 epoch: channel_buffer.epoch(),
982 version: language::proto::serialize_version(&buffer.version()),
983 });
984 }
985 }
986
987 if buffer_versions.is_empty() {
988 return Task::ready(Ok(()));
989 }
990
991 let response = self.client.request(proto::RejoinChannelBuffers {
992 buffers: buffer_versions,
993 });
994
995 cx.spawn(async move |this, cx| {
996 let mut response = response.await?;
997
998 this.update(cx, |this, cx| {
999 this.opened_buffers.retain(|_, buffer| match buffer {
1000 OpenEntityHandle::Open(channel_buffer) => {
1001 let Some(channel_buffer) = channel_buffer.upgrade() else {
1002 return false;
1003 };
1004
1005 channel_buffer.update(cx, |channel_buffer, cx| {
1006 let channel_id = channel_buffer.channel_id;
1007 if let Some(remote_buffer) = response
1008 .buffers
1009 .iter_mut()
1010 .find(|buffer| buffer.channel_id == channel_id.0)
1011 {
1012 let channel_id = channel_buffer.channel_id;
1013 let remote_version =
1014 language::proto::deserialize_version(&remote_buffer.version);
1015
1016 channel_buffer.replace_collaborators(
1017 mem::take(&mut remote_buffer.collaborators),
1018 cx,
1019 );
1020
1021 let operations = channel_buffer
1022 .buffer()
1023 .update(cx, |buffer, cx| {
1024 let outgoing_operations =
1025 buffer.serialize_ops(Some(remote_version), cx);
1026 let incoming_operations =
1027 mem::take(&mut remote_buffer.operations)
1028 .into_iter()
1029 .map(language::proto::deserialize_operation)
1030 .collect::<Result<Vec<_>>>()?;
1031 buffer.apply_ops(incoming_operations, cx);
1032 anyhow::Ok(outgoing_operations)
1033 })
1034 .log_err();
1035
1036 if let Some(operations) = operations {
1037 channel_buffer.connected(cx);
1038 let client = this.client.clone();
1039 cx.background_spawn(async move {
1040 let operations = operations.await;
1041 for chunk in language::proto::split_operations(operations) {
1042 client
1043 .send(proto::UpdateChannelBuffer {
1044 channel_id: channel_id.0,
1045 operations: chunk,
1046 })
1047 .ok();
1048 }
1049 })
1050 .detach();
1051 return true;
1052 }
1053 }
1054
1055 channel_buffer.disconnect(cx);
1056 false
1057 })
1058 }
1059 OpenEntityHandle::Loading(_) => true,
1060 });
1061 })
1062 .ok();
1063 anyhow::Ok(())
1064 })
1065 }
1066
1067 fn handle_disconnect(&mut self, wait_for_reconnect: bool, cx: &mut Context<Self>) {
1068 cx.notify();
1069 self.did_subscribe = false;
1070 self.disconnect_channel_buffers_task.get_or_insert_with(|| {
1071 cx.spawn(async move |this, cx| {
1072 if wait_for_reconnect {
1073 cx.background_executor().timer(RECONNECT_TIMEOUT).await;
1074 }
1075
1076 if let Some(this) = this.upgrade() {
1077 this.update(cx, |this, cx| {
1078 for (_, buffer) in &this.opened_buffers {
1079 if let OpenEntityHandle::Open(buffer) = &buffer
1080 && let Some(buffer) = buffer.upgrade()
1081 {
1082 buffer.update(cx, |buffer, cx| buffer.disconnect(cx));
1083 }
1084 }
1085 })
1086 .ok();
1087 }
1088 })
1089 });
1090 }
1091
1092 #[cfg(any(test, feature = "test-support"))]
1093 pub fn reset(&mut self) {
1094 self.channel_invitations.clear();
1095 self.channel_index.clear();
1096 self.channel_participants.clear();
1097 self.outgoing_invites.clear();
1098 self.opened_buffers.clear();
1099 self.opened_chats.clear();
1100 self.disconnect_channel_buffers_task = None;
1101 self.channel_states.clear();
1102 }
1103
1104 pub(crate) fn update_channels(
1105 &mut self,
1106 payload: proto::UpdateChannels,
1107 cx: &mut Context<ChannelStore>,
1108 ) -> Option<Task<Result<()>>> {
1109 if !payload.remove_channel_invitations.is_empty() {
1110 self.channel_invitations
1111 .retain(|channel| !payload.remove_channel_invitations.contains(&channel.id.0));
1112 }
1113 for channel in payload.channel_invitations {
1114 match self
1115 .channel_invitations
1116 .binary_search_by_key(&channel.id, |c| c.id.0)
1117 {
1118 Ok(ix) => {
1119 Arc::make_mut(&mut self.channel_invitations[ix]).name = channel.name.into()
1120 }
1121 Err(ix) => self.channel_invitations.insert(
1122 ix,
1123 Arc::new(Channel {
1124 id: ChannelId(channel.id),
1125 visibility: channel.visibility(),
1126 name: channel.name.into(),
1127 parent_path: channel.parent_path.into_iter().map(ChannelId).collect(),
1128 channel_order: channel.channel_order,
1129 }),
1130 ),
1131 }
1132 }
1133
1134 let channels_changed = !payload.channels.is_empty()
1135 || !payload.delete_channels.is_empty()
1136 || !payload.latest_channel_message_ids.is_empty()
1137 || !payload.latest_channel_buffer_versions.is_empty();
1138
1139 if channels_changed {
1140 if !payload.delete_channels.is_empty() {
1141 let delete_channels: Vec<ChannelId> =
1142 payload.delete_channels.into_iter().map(ChannelId).collect();
1143 self.channel_index.delete_channels(&delete_channels);
1144 self.channel_participants
1145 .retain(|channel_id, _| !delete_channels.contains(channel_id));
1146
1147 for channel_id in &delete_channels {
1148 let channel_id = *channel_id;
1149 if payload
1150 .channels
1151 .iter()
1152 .any(|channel| channel.id == channel_id.0)
1153 {
1154 continue;
1155 }
1156 if let Some(OpenEntityHandle::Open(buffer)) =
1157 self.opened_buffers.remove(&channel_id)
1158 && let Some(buffer) = buffer.upgrade()
1159 {
1160 buffer.update(cx, ChannelBuffer::disconnect);
1161 }
1162 }
1163 }
1164
1165 let mut index = self.channel_index.bulk_insert();
1166 for channel in payload.channels {
1167 let id = ChannelId(channel.id);
1168 let channel_changed = index.insert(channel);
1169
1170 if channel_changed
1171 && let Some(OpenEntityHandle::Open(buffer)) = self.opened_buffers.get(&id)
1172 && let Some(buffer) = buffer.upgrade()
1173 {
1174 buffer.update(cx, ChannelBuffer::channel_changed);
1175 }
1176 }
1177
1178 for latest_buffer_version in payload.latest_channel_buffer_versions {
1179 let version = language::proto::deserialize_version(&latest_buffer_version.version);
1180 self.channel_states
1181 .entry(ChannelId(latest_buffer_version.channel_id))
1182 .or_default()
1183 .update_latest_notes_version(latest_buffer_version.epoch, &version)
1184 }
1185
1186 for latest_channel_message in payload.latest_channel_message_ids {
1187 self.channel_states
1188 .entry(ChannelId(latest_channel_message.channel_id))
1189 .or_default()
1190 .update_latest_message_id(latest_channel_message.message_id);
1191 }
1192
1193 self.channels_loaded.0.try_send(true).log_err();
1194 }
1195
1196 cx.notify();
1197 if payload.channel_participants.is_empty() {
1198 return None;
1199 }
1200
1201 let mut all_user_ids = Vec::new();
1202 let channel_participants = payload.channel_participants;
1203 for entry in &channel_participants {
1204 for user_id in entry.participant_user_ids.iter() {
1205 if let Err(ix) = all_user_ids.binary_search(user_id) {
1206 all_user_ids.insert(ix, *user_id);
1207 }
1208 }
1209 }
1210
1211 let users = self
1212 .user_store
1213 .update(cx, |user_store, cx| user_store.get_users(all_user_ids, cx));
1214 Some(cx.spawn(async move |this, cx| {
1215 let users = users.await?;
1216
1217 this.update(cx, |this, cx| {
1218 for entry in &channel_participants {
1219 let mut participants: Vec<_> = entry
1220 .participant_user_ids
1221 .iter()
1222 .filter_map(|user_id| {
1223 users
1224 .binary_search_by_key(&user_id, |user| &user.id)
1225 .ok()
1226 .map(|ix| users[ix].clone())
1227 })
1228 .collect();
1229
1230 participants.sort_by_key(|u| u.id);
1231
1232 this.channel_participants
1233 .insert(ChannelId(entry.channel_id), participants);
1234 }
1235
1236 cx.notify();
1237 })
1238 }))
1239 }
1240}
1241
1242impl ChannelState {
1243 fn set_role(&mut self, role: ChannelRole) {
1244 self.role = Some(role);
1245 }
1246
1247 fn has_channel_buffer_changed(&self) -> bool {
1248 self.latest_notes_version.epoch > self.observed_notes_version.epoch
1249 || (self.latest_notes_version.epoch == self.observed_notes_version.epoch
1250 && self
1251 .latest_notes_version
1252 .version
1253 .changed_since(&self.observed_notes_version.version))
1254 }
1255
1256 fn has_new_messages(&self) -> bool {
1257 let latest_message_id = self.latest_chat_message;
1258 let observed_message_id = self.observed_chat_message;
1259
1260 latest_message_id.is_some_and(|latest_message_id| {
1261 latest_message_id > observed_message_id.unwrap_or_default()
1262 })
1263 }
1264
1265 fn last_acknowledged_message_id(&self) -> Option<u64> {
1266 self.observed_chat_message
1267 }
1268
1269 fn acknowledge_message_id(&mut self, message_id: u64) {
1270 let observed = self.observed_chat_message.get_or_insert(message_id);
1271 *observed = (*observed).max(message_id);
1272 }
1273
1274 fn update_latest_message_id(&mut self, message_id: u64) {
1275 self.latest_chat_message =
1276 Some(message_id.max(self.latest_chat_message.unwrap_or_default()));
1277 }
1278
1279 fn acknowledge_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1280 if self.observed_notes_version.epoch == epoch {
1281 self.observed_notes_version.version.join(version);
1282 } else {
1283 self.observed_notes_version = NotesVersion {
1284 epoch,
1285 version: version.clone(),
1286 };
1287 }
1288 }
1289
1290 fn update_latest_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1291 if self.latest_notes_version.epoch == epoch {
1292 self.latest_notes_version.version.join(version);
1293 } else {
1294 self.latest_notes_version = NotesVersion {
1295 epoch,
1296 version: version.clone(),
1297 };
1298 }
1299 }
1300}