1mod channel_index;
2
3use crate::{channel_buffer::ChannelBuffer, channel_chat::ChannelChat, ChannelMessage};
4use anyhow::{anyhow, Result};
5use channel_index::ChannelIndex;
6use client::{Client, Subscription, User, UserId, UserStore};
7use collections::{hash_map, HashMap, HashSet};
8use futures::{channel::mpsc, future::Shared, Future, FutureExt, StreamExt};
9use gpui::{
10 AppContext, AsyncAppContext, Context, EventEmitter, Global, Model, ModelContext, SharedString,
11 Task, WeakModel,
12};
13use language::Capability;
14use release_channel::RELEASE_CHANNEL;
15use rpc::{
16 proto::{self, ChannelRole, ChannelVisibility},
17 TypedEnvelope,
18};
19use std::{mem, sync::Arc, time::Duration};
20use util::{async_maybe, maybe, ResultExt};
21
22pub fn init(client: &Arc<Client>, user_store: Model<UserStore>, cx: &mut AppContext) {
23 let channel_store =
24 cx.new_model(|cx| ChannelStore::new(client.clone(), user_store.clone(), cx));
25 cx.set_global(GlobalChannelStore(channel_store));
26}
27
/// How long the store waits after losing its connection before it
/// disconnects the channel buffers that are still open.
pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);

/// Identifier of a channel.
pub type ChannelId = u64;
31
32#[derive(Debug, Clone, Default)]
33struct NotesVersion {
34 epoch: u64,
35 version: clock::Global,
36}
37
38pub struct ChannelStore {
39 pub channel_index: ChannelIndex,
40 channel_invitations: Vec<Arc<Channel>>,
41 channel_participants: HashMap<ChannelId, Vec<Arc<User>>>,
42 channel_states: HashMap<ChannelId, ChannelState>,
43
44 outgoing_invites: HashSet<(ChannelId, UserId)>,
45 update_channels_tx: mpsc::UnboundedSender<proto::UpdateChannels>,
46 opened_buffers: HashMap<ChannelId, OpenedModelHandle<ChannelBuffer>>,
47 opened_chats: HashMap<ChannelId, OpenedModelHandle<ChannelChat>>,
48 client: Arc<Client>,
49 user_store: Model<UserStore>,
50 _rpc_subscriptions: [Subscription; 2],
51 _watch_connection_status: Task<Option<()>>,
52 disconnect_channel_buffers_task: Option<Task<()>>,
53 _update_channels: Task<()>,
54}
55
56#[derive(Clone, Debug)]
57pub struct Channel {
58 pub id: ChannelId,
59 pub name: SharedString,
60 pub visibility: proto::ChannelVisibility,
61 pub parent_path: Vec<u64>,
62}
63
64#[derive(Default)]
65pub struct ChannelState {
66 latest_chat_message: Option<u64>,
67 latest_notes_versions: Option<NotesVersion>,
68 observed_chat_message: Option<u64>,
69 observed_notes_versions: Option<NotesVersion>,
70 role: Option<ChannelRole>,
71}
72
73impl Channel {
74 pub fn link(&self) -> String {
75 RELEASE_CHANNEL.link_prefix().to_owned()
76 + "channel/"
77 + &self.slug()
78 + "-"
79 + &self.id.to_string()
80 }
81
82 pub fn is_root_channel(&self) -> bool {
83 self.parent_path.is_empty()
84 }
85
86 pub fn root_id(&self) -> ChannelId {
87 self.parent_path
88 .first()
89 .map(|id| *id as ChannelId)
90 .unwrap_or(self.id)
91 }
92
93 pub fn slug(&self) -> String {
94 let slug: String = self
95 .name
96 .chars()
97 .map(|c| if c.is_alphanumeric() { c } else { '-' })
98 .collect();
99
100 slug.trim_matches(|c| c == '-').to_string()
101 }
102}
103
104pub struct ChannelMembership {
105 pub user: Arc<User>,
106 pub kind: proto::channel_member::Kind,
107 pub role: proto::ChannelRole,
108}
109impl ChannelMembership {
110 pub fn sort_key(&self) -> MembershipSortKey {
111 MembershipSortKey {
112 role_order: match self.role {
113 proto::ChannelRole::Admin => 0,
114 proto::ChannelRole::Member => 1,
115 proto::ChannelRole::Banned => 2,
116 proto::ChannelRole::Guest => 3,
117 },
118 kind_order: match self.kind {
119 proto::channel_member::Kind::Member => 0,
120 proto::channel_member::Kind::Invitee => 1,
121 },
122 username_order: self.user.github_login.as_str(),
123 }
124 }
125}
126
/// Sort key for channel memberships.
///
/// The derived ordering compares the fields in declaration order: role first,
/// then kind, then user name.
#[derive(PartialEq, Eq, PartialOrd, Ord)]
pub struct MembershipSortKey<'a> {
    /// Rank of the member's role; lower values sort first.
    role_order: u8,
    /// Rank of the membership kind; lower values sort first.
    kind_order: u8,
    /// User name used as the final tie-breaker.
    username_order: &'a str,
}
133
134pub enum ChannelEvent {
135 ChannelCreated(ChannelId),
136 ChannelRenamed(ChannelId),
137}
138
139impl EventEmitter<ChannelEvent> for ChannelStore {}
140
141enum OpenedModelHandle<E> {
142 Open(WeakModel<E>),
143 Loading(Shared<Task<Result<Model<E>, Arc<anyhow::Error>>>>),
144}
145
146struct GlobalChannelStore(Model<ChannelStore>);
147
148impl Global for GlobalChannelStore {}
149
150impl ChannelStore {
151 pub fn global(cx: &AppContext) -> Model<Self> {
152 cx.global::<GlobalChannelStore>().0.clone()
153 }
154
155 pub fn new(
156 client: Arc<Client>,
157 user_store: Model<UserStore>,
158 cx: &mut ModelContext<Self>,
159 ) -> Self {
160 let rpc_subscriptions = [
161 client.add_message_handler(cx.weak_model(), Self::handle_update_channels),
162 client.add_message_handler(cx.weak_model(), Self::handle_update_user_channels),
163 ];
164
165 let mut connection_status = client.status();
166 let (update_channels_tx, mut update_channels_rx) = mpsc::unbounded();
167 let watch_connection_status = cx.spawn(|this, mut cx| async move {
168 while let Some(status) = connection_status.next().await {
169 let this = this.upgrade()?;
170 match status {
171 client::Status::Connected { .. } => {
172 this.update(&mut cx, |this, cx| this.handle_connect(cx))
173 .ok()?
174 .await
175 .log_err()?;
176 }
177 client::Status::SignedOut | client::Status::UpgradeRequired => {
178 this.update(&mut cx, |this, cx| this.handle_disconnect(false, cx))
179 .ok();
180 }
181 _ => {
182 this.update(&mut cx, |this, cx| this.handle_disconnect(true, cx))
183 .ok();
184 }
185 }
186 }
187 Some(())
188 });
189
190 Self {
191 channel_invitations: Vec::default(),
192 channel_index: ChannelIndex::default(),
193 channel_participants: Default::default(),
194 outgoing_invites: Default::default(),
195 opened_buffers: Default::default(),
196 opened_chats: Default::default(),
197 update_channels_tx,
198 client,
199 user_store,
200 _rpc_subscriptions: rpc_subscriptions,
201 _watch_connection_status: watch_connection_status,
202 disconnect_channel_buffers_task: None,
203 _update_channels: cx.spawn(|this, mut cx| async move {
204 async_maybe!({
205 while let Some(update_channels) = update_channels_rx.next().await {
206 if let Some(this) = this.upgrade() {
207 let update_task = this.update(&mut cx, |this, cx| {
208 this.update_channels(update_channels, cx)
209 })?;
210 if let Some(update_task) = update_task {
211 update_task.await.log_err();
212 }
213 }
214 }
215 anyhow::Ok(())
216 })
217 .await
218 .log_err();
219 }),
220 channel_states: Default::default(),
221 }
222 }
223
224 pub fn client(&self) -> Arc<Client> {
225 self.client.clone()
226 }
227
228 /// Returns the number of unique channels in the store
229 pub fn channel_count(&self) -> usize {
230 self.channel_index.by_id().len()
231 }
232
233 /// Returns the index of a channel ID in the list of unique channels
234 pub fn index_of_channel(&self, channel_id: ChannelId) -> Option<usize> {
235 self.channel_index
236 .by_id()
237 .keys()
238 .position(|id| *id == channel_id)
239 }
240
241 /// Returns an iterator over all unique channels
242 pub fn channels(&self) -> impl '_ + Iterator<Item = &Arc<Channel>> {
243 self.channel_index.by_id().values()
244 }
245
246 /// Iterate over all entries in the channel DAG
247 pub fn ordered_channels(&self) -> impl '_ + Iterator<Item = (usize, &Arc<Channel>)> {
248 self.channel_index
249 .ordered_channels()
250 .iter()
251 .filter_map(move |id| {
252 let channel = self.channel_index.by_id().get(id)?;
253 Some((channel.parent_path.len(), channel))
254 })
255 }
256
257 pub fn channel_at_index(&self, ix: usize) -> Option<&Arc<Channel>> {
258 let channel_id = self.channel_index.ordered_channels().get(ix)?;
259 self.channel_index.by_id().get(channel_id)
260 }
261
262 pub fn channel_at(&self, ix: usize) -> Option<&Arc<Channel>> {
263 self.channel_index.by_id().values().nth(ix)
264 }
265
266 pub fn has_channel_invitation(&self, channel_id: ChannelId) -> bool {
267 self.channel_invitations
268 .iter()
269 .any(|channel| channel.id == channel_id)
270 }
271
272 pub fn channel_invitations(&self) -> &[Arc<Channel>] {
273 &self.channel_invitations
274 }
275
276 pub fn channel_for_id(&self, channel_id: ChannelId) -> Option<&Arc<Channel>> {
277 self.channel_index.by_id().get(&channel_id)
278 }
279
280 pub fn has_open_channel_buffer(&self, channel_id: ChannelId, _cx: &AppContext) -> bool {
281 if let Some(buffer) = self.opened_buffers.get(&channel_id) {
282 if let OpenedModelHandle::Open(buffer) = buffer {
283 return buffer.upgrade().is_some();
284 }
285 }
286 false
287 }
288
289 pub fn open_channel_buffer(
290 &mut self,
291 channel_id: ChannelId,
292 cx: &mut ModelContext<Self>,
293 ) -> Task<Result<Model<ChannelBuffer>>> {
294 let client = self.client.clone();
295 let user_store = self.user_store.clone();
296 let channel_store = cx.handle();
297 self.open_channel_resource(
298 channel_id,
299 |this| &mut this.opened_buffers,
300 |channel, cx| ChannelBuffer::new(channel, client, user_store, channel_store, cx),
301 cx,
302 )
303 }
304
305 pub fn fetch_channel_messages(
306 &self,
307 message_ids: Vec<u64>,
308 cx: &mut ModelContext<Self>,
309 ) -> Task<Result<Vec<ChannelMessage>>> {
310 let request = if message_ids.is_empty() {
311 None
312 } else {
313 Some(
314 self.client
315 .request(proto::GetChannelMessagesById { message_ids }),
316 )
317 };
318 cx.spawn(|this, mut cx| async move {
319 if let Some(request) = request {
320 let response = request.await?;
321 let this = this
322 .upgrade()
323 .ok_or_else(|| anyhow!("channel store dropped"))?;
324 let user_store = this.update(&mut cx, |this, _| this.user_store.clone())?;
325 ChannelMessage::from_proto_vec(response.messages, &user_store, &mut cx).await
326 } else {
327 Ok(Vec::new())
328 }
329 })
330 }
331
332 pub fn has_channel_buffer_changed(&self, channel_id: ChannelId) -> bool {
333 self.channel_states
334 .get(&channel_id)
335 .is_some_and(|state| state.has_channel_buffer_changed())
336 }
337
338 pub fn has_new_messages(&self, channel_id: ChannelId) -> bool {
339 self.channel_states
340 .get(&channel_id)
341 .is_some_and(|state| state.has_new_messages())
342 }
343
344 pub fn acknowledge_message_id(
345 &mut self,
346 channel_id: ChannelId,
347 message_id: u64,
348 cx: &mut ModelContext<Self>,
349 ) {
350 self.channel_states
351 .entry(channel_id)
352 .or_insert_with(|| Default::default())
353 .acknowledge_message_id(message_id);
354 cx.notify();
355 }
356
357 pub fn update_latest_message_id(
358 &mut self,
359 channel_id: ChannelId,
360 message_id: u64,
361 cx: &mut ModelContext<Self>,
362 ) {
363 self.channel_states
364 .entry(channel_id)
365 .or_insert_with(|| Default::default())
366 .update_latest_message_id(message_id);
367 cx.notify();
368 }
369
370 pub fn acknowledge_notes_version(
371 &mut self,
372 channel_id: ChannelId,
373 epoch: u64,
374 version: &clock::Global,
375 cx: &mut ModelContext<Self>,
376 ) {
377 self.channel_states
378 .entry(channel_id)
379 .or_insert_with(|| Default::default())
380 .acknowledge_notes_version(epoch, version);
381 cx.notify()
382 }
383
384 pub fn update_latest_notes_version(
385 &mut self,
386 channel_id: ChannelId,
387 epoch: u64,
388 version: &clock::Global,
389 cx: &mut ModelContext<Self>,
390 ) {
391 self.channel_states
392 .entry(channel_id)
393 .or_insert_with(|| Default::default())
394 .update_latest_notes_version(epoch, version);
395 cx.notify()
396 }
397
398 pub fn open_channel_chat(
399 &mut self,
400 channel_id: ChannelId,
401 cx: &mut ModelContext<Self>,
402 ) -> Task<Result<Model<ChannelChat>>> {
403 let client = self.client.clone();
404 let user_store = self.user_store.clone();
405 let this = cx.handle();
406 self.open_channel_resource(
407 channel_id,
408 |this| &mut this.opened_chats,
409 |channel, cx| ChannelChat::new(channel, this, user_store, client, cx),
410 cx,
411 )
412 }
413
414 /// Asynchronously open a given resource associated with a channel.
415 ///
416 /// Make sure that the resource is only opened once, even if this method
417 /// is called multiple times with the same channel id while the first task
418 /// is still running.
419 fn open_channel_resource<T, F, Fut>(
420 &mut self,
421 channel_id: ChannelId,
422 get_map: fn(&mut Self) -> &mut HashMap<ChannelId, OpenedModelHandle<T>>,
423 load: F,
424 cx: &mut ModelContext<Self>,
425 ) -> Task<Result<Model<T>>>
426 where
427 F: 'static + FnOnce(Arc<Channel>, AsyncAppContext) -> Fut,
428 Fut: Future<Output = Result<Model<T>>>,
429 T: 'static,
430 {
431 let task = loop {
432 match get_map(self).entry(channel_id) {
433 hash_map::Entry::Occupied(e) => match e.get() {
434 OpenedModelHandle::Open(model) => {
435 if let Some(model) = model.upgrade() {
436 break Task::ready(Ok(model)).shared();
437 } else {
438 get_map(self).remove(&channel_id);
439 continue;
440 }
441 }
442 OpenedModelHandle::Loading(task) => {
443 break task.clone();
444 }
445 },
446 hash_map::Entry::Vacant(e) => {
447 let task = cx
448 .spawn(move |this, mut cx| async move {
449 let channel = this.update(&mut cx, |this, _| {
450 this.channel_for_id(channel_id).cloned().ok_or_else(|| {
451 Arc::new(anyhow!("no channel for id: {}", channel_id))
452 })
453 })??;
454
455 load(channel, cx).await.map_err(Arc::new)
456 })
457 .shared();
458
459 e.insert(OpenedModelHandle::Loading(task.clone()));
460 cx.spawn({
461 let task = task.clone();
462 move |this, mut cx| async move {
463 let result = task.await;
464 this.update(&mut cx, |this, _| match result {
465 Ok(model) => {
466 get_map(this).insert(
467 channel_id,
468 OpenedModelHandle::Open(model.downgrade()),
469 );
470 }
471 Err(_) => {
472 get_map(this).remove(&channel_id);
473 }
474 })
475 .ok();
476 }
477 })
478 .detach();
479 break task;
480 }
481 }
482 };
483 cx.background_executor()
484 .spawn(async move { task.await.map_err(|error| anyhow!("{}", error)) })
485 }
486
487 pub fn is_channel_admin(&self, channel_id: ChannelId) -> bool {
488 self.channel_role(channel_id) == proto::ChannelRole::Admin
489 }
490
491 pub fn is_root_channel(&self, channel_id: ChannelId) -> bool {
492 self.channel_index
493 .by_id()
494 .get(&channel_id)
495 .map_or(false, |channel| channel.is_root_channel())
496 }
497
498 pub fn is_public_channel(&self, channel_id: ChannelId) -> bool {
499 self.channel_index
500 .by_id()
501 .get(&channel_id)
502 .map_or(false, |channel| {
503 channel.visibility == ChannelVisibility::Public
504 })
505 }
506
507 pub fn channel_capability(&self, channel_id: ChannelId) -> Capability {
508 match self.channel_role(channel_id) {
509 ChannelRole::Admin | ChannelRole::Member => Capability::ReadWrite,
510 _ => Capability::ReadOnly,
511 }
512 }
513
514 pub fn channel_role(&self, channel_id: ChannelId) -> proto::ChannelRole {
515 maybe!({
516 let mut channel = self.channel_for_id(channel_id)?;
517 if !channel.is_root_channel() {
518 channel = self.channel_for_id(channel.root_id())?;
519 }
520 let root_channel_state = self.channel_states.get(&channel.id);
521 root_channel_state?.role
522 })
523 .unwrap_or(proto::ChannelRole::Guest)
524 }
525
526 pub fn channel_participants(&self, channel_id: ChannelId) -> &[Arc<User>] {
527 self.channel_participants
528 .get(&channel_id)
529 .map_or(&[], |v| v.as_slice())
530 }
531
532 pub fn create_channel(
533 &self,
534 name: &str,
535 parent_id: Option<ChannelId>,
536 cx: &mut ModelContext<Self>,
537 ) -> Task<Result<ChannelId>> {
538 let client = self.client.clone();
539 let name = name.trim_start_matches("#").to_owned();
540 cx.spawn(move |this, mut cx| async move {
541 let response = client
542 .request(proto::CreateChannel { name, parent_id })
543 .await?;
544
545 let channel = response
546 .channel
547 .ok_or_else(|| anyhow!("missing channel in response"))?;
548 let channel_id = channel.id;
549
550 this.update(&mut cx, |this, cx| {
551 let task = this.update_channels(
552 proto::UpdateChannels {
553 channels: vec![channel],
554 ..Default::default()
555 },
556 cx,
557 );
558 assert!(task.is_none());
559
560 // This event is emitted because the collab panel wants to clear the pending edit state
561 // before this frame is rendered. But we can't guarantee that the collab panel's future
562 // will resolve before this flush_effects finishes. Synchronously emitting this event
563 // ensures that the collab panel will observe this creation before the frame completes
564 cx.emit(ChannelEvent::ChannelCreated(channel_id));
565 })?;
566
567 Ok(channel_id)
568 })
569 }
570
571 pub fn move_channel(
572 &mut self,
573 channel_id: ChannelId,
574 to: ChannelId,
575 cx: &mut ModelContext<Self>,
576 ) -> Task<Result<()>> {
577 let client = self.client.clone();
578 cx.spawn(move |_, _| async move {
579 let _ = client
580 .request(proto::MoveChannel { channel_id, to })
581 .await?;
582
583 Ok(())
584 })
585 }
586
587 pub fn set_channel_visibility(
588 &mut self,
589 channel_id: ChannelId,
590 visibility: ChannelVisibility,
591 cx: &mut ModelContext<Self>,
592 ) -> Task<Result<()>> {
593 let client = self.client.clone();
594 cx.spawn(move |_, _| async move {
595 let _ = client
596 .request(proto::SetChannelVisibility {
597 channel_id,
598 visibility: visibility.into(),
599 })
600 .await?;
601
602 Ok(())
603 })
604 }
605
606 pub fn invite_member(
607 &mut self,
608 channel_id: ChannelId,
609 user_id: UserId,
610 role: proto::ChannelRole,
611 cx: &mut ModelContext<Self>,
612 ) -> Task<Result<()>> {
613 if !self.outgoing_invites.insert((channel_id, user_id)) {
614 return Task::ready(Err(anyhow!("invite request already in progress")));
615 }
616
617 cx.notify();
618 let client = self.client.clone();
619 cx.spawn(move |this, mut cx| async move {
620 let result = client
621 .request(proto::InviteChannelMember {
622 channel_id,
623 user_id,
624 role: role.into(),
625 })
626 .await;
627
628 this.update(&mut cx, |this, cx| {
629 this.outgoing_invites.remove(&(channel_id, user_id));
630 cx.notify();
631 })?;
632
633 result?;
634
635 Ok(())
636 })
637 }
638
639 pub fn remove_member(
640 &mut self,
641 channel_id: ChannelId,
642 user_id: u64,
643 cx: &mut ModelContext<Self>,
644 ) -> Task<Result<()>> {
645 if !self.outgoing_invites.insert((channel_id, user_id)) {
646 return Task::ready(Err(anyhow!("invite request already in progress")));
647 }
648
649 cx.notify();
650 let client = self.client.clone();
651 cx.spawn(move |this, mut cx| async move {
652 let result = client
653 .request(proto::RemoveChannelMember {
654 channel_id,
655 user_id,
656 })
657 .await;
658
659 this.update(&mut cx, |this, cx| {
660 this.outgoing_invites.remove(&(channel_id, user_id));
661 cx.notify();
662 })?;
663 result?;
664 Ok(())
665 })
666 }
667
668 pub fn set_member_role(
669 &mut self,
670 channel_id: ChannelId,
671 user_id: UserId,
672 role: proto::ChannelRole,
673 cx: &mut ModelContext<Self>,
674 ) -> Task<Result<()>> {
675 if !self.outgoing_invites.insert((channel_id, user_id)) {
676 return Task::ready(Err(anyhow!("member request already in progress")));
677 }
678
679 cx.notify();
680 let client = self.client.clone();
681 cx.spawn(move |this, mut cx| async move {
682 let result = client
683 .request(proto::SetChannelMemberRole {
684 channel_id,
685 user_id,
686 role: role.into(),
687 })
688 .await;
689
690 this.update(&mut cx, |this, cx| {
691 this.outgoing_invites.remove(&(channel_id, user_id));
692 cx.notify();
693 })?;
694
695 result?;
696 Ok(())
697 })
698 }
699
700 pub fn rename(
701 &mut self,
702 channel_id: ChannelId,
703 new_name: &str,
704 cx: &mut ModelContext<Self>,
705 ) -> Task<Result<()>> {
706 let client = self.client.clone();
707 let name = new_name.to_string();
708 cx.spawn(move |this, mut cx| async move {
709 let channel = client
710 .request(proto::RenameChannel { channel_id, name })
711 .await?
712 .channel
713 .ok_or_else(|| anyhow!("missing channel in response"))?;
714 this.update(&mut cx, |this, cx| {
715 let task = this.update_channels(
716 proto::UpdateChannels {
717 channels: vec![channel],
718 ..Default::default()
719 },
720 cx,
721 );
722 assert!(task.is_none());
723
724 // This event is emitted because the collab panel wants to clear the pending edit state
725 // before this frame is rendered. But we can't guarantee that the collab panel's future
726 // will resolve before this flush_effects finishes. Synchronously emitting this event
727 // ensures that the collab panel will observe this creation before the frame complete
728 cx.emit(ChannelEvent::ChannelRenamed(channel_id))
729 })?;
730 Ok(())
731 })
732 }
733
734 pub fn respond_to_channel_invite(
735 &mut self,
736 channel_id: ChannelId,
737 accept: bool,
738 cx: &mut ModelContext<Self>,
739 ) -> Task<Result<()>> {
740 let client = self.client.clone();
741 cx.background_executor().spawn(async move {
742 client
743 .request(proto::RespondToChannelInvite { channel_id, accept })
744 .await?;
745 Ok(())
746 })
747 }
748
749 pub fn get_channel_member_details(
750 &self,
751 channel_id: ChannelId,
752 cx: &mut ModelContext<Self>,
753 ) -> Task<Result<Vec<ChannelMembership>>> {
754 let client = self.client.clone();
755 let user_store = self.user_store.downgrade();
756 cx.spawn(move |_, mut cx| async move {
757 let response = client
758 .request(proto::GetChannelMembers { channel_id })
759 .await?;
760
761 let user_ids = response.members.iter().map(|m| m.user_id).collect();
762 let user_store = user_store
763 .upgrade()
764 .ok_or_else(|| anyhow!("user store dropped"))?;
765 let users = user_store
766 .update(&mut cx, |user_store, cx| user_store.get_users(user_ids, cx))?
767 .await?;
768
769 Ok(users
770 .into_iter()
771 .zip(response.members)
772 .filter_map(|(user, member)| {
773 Some(ChannelMembership {
774 user,
775 role: member.role(),
776 kind: member.kind(),
777 })
778 })
779 .collect())
780 })
781 }
782
783 pub fn remove_channel(&self, channel_id: ChannelId) -> impl Future<Output = Result<()>> {
784 let client = self.client.clone();
785 async move {
786 client.request(proto::DeleteChannel { channel_id }).await?;
787 Ok(())
788 }
789 }
790
791 pub fn has_pending_channel_invite_response(&self, _: &Arc<Channel>) -> bool {
792 false
793 }
794
795 pub fn has_pending_channel_invite(&self, channel_id: ChannelId, user_id: UserId) -> bool {
796 self.outgoing_invites.contains(&(channel_id, user_id))
797 }
798
799 async fn handle_update_channels(
800 this: Model<Self>,
801 message: TypedEnvelope<proto::UpdateChannels>,
802 _: Arc<Client>,
803 mut cx: AsyncAppContext,
804 ) -> Result<()> {
805 this.update(&mut cx, |this, _| {
806 this.update_channels_tx
807 .unbounded_send(message.payload)
808 .unwrap();
809 })?;
810 Ok(())
811 }
812
813 async fn handle_update_user_channels(
814 this: Model<Self>,
815 message: TypedEnvelope<proto::UpdateUserChannels>,
816 _: Arc<Client>,
817 mut cx: AsyncAppContext,
818 ) -> Result<()> {
819 this.update(&mut cx, |this, cx| {
820 for buffer_version in message.payload.observed_channel_buffer_version {
821 let version = language::proto::deserialize_version(&buffer_version.version);
822 this.acknowledge_notes_version(
823 buffer_version.channel_id,
824 buffer_version.epoch,
825 &version,
826 cx,
827 );
828 }
829 for message_id in message.payload.observed_channel_message_id {
830 this.acknowledge_message_id(message_id.channel_id, message_id.message_id, cx);
831 }
832 for membership in message.payload.channel_memberships {
833 if let Some(role) = ChannelRole::from_i32(membership.role) {
834 this.channel_states
835 .entry(membership.channel_id)
836 .or_insert_with(|| ChannelState::default())
837 .set_role(role)
838 }
839 }
840 })
841 }
842
843 fn handle_connect(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
844 self.channel_index.clear();
845 self.channel_invitations.clear();
846 self.channel_participants.clear();
847 self.channel_index.clear();
848 self.outgoing_invites.clear();
849 self.disconnect_channel_buffers_task.take();
850
851 for chat in self.opened_chats.values() {
852 if let OpenedModelHandle::Open(chat) = chat {
853 if let Some(chat) = chat.upgrade() {
854 chat.update(cx, |chat, cx| {
855 chat.rejoin(cx);
856 });
857 }
858 }
859 }
860
861 let mut buffer_versions = Vec::new();
862 for buffer in self.opened_buffers.values() {
863 if let OpenedModelHandle::Open(buffer) = buffer {
864 if let Some(buffer) = buffer.upgrade() {
865 let channel_buffer = buffer.read(cx);
866 let buffer = channel_buffer.buffer().read(cx);
867 buffer_versions.push(proto::ChannelBufferVersion {
868 channel_id: channel_buffer.channel_id,
869 epoch: channel_buffer.epoch(),
870 version: language::proto::serialize_version(&buffer.version()),
871 });
872 }
873 }
874 }
875
876 if buffer_versions.is_empty() {
877 return Task::ready(Ok(()));
878 }
879
880 let response = self.client.request(proto::RejoinChannelBuffers {
881 buffers: buffer_versions,
882 });
883
884 cx.spawn(|this, mut cx| async move {
885 let mut response = response.await?;
886
887 this.update(&mut cx, |this, cx| {
888 this.opened_buffers.retain(|_, buffer| match buffer {
889 OpenedModelHandle::Open(channel_buffer) => {
890 let Some(channel_buffer) = channel_buffer.upgrade() else {
891 return false;
892 };
893
894 channel_buffer.update(cx, |channel_buffer, cx| {
895 let channel_id = channel_buffer.channel_id;
896 if let Some(remote_buffer) = response
897 .buffers
898 .iter_mut()
899 .find(|buffer| buffer.channel_id == channel_id)
900 {
901 let channel_id = channel_buffer.channel_id;
902 let remote_version =
903 language::proto::deserialize_version(&remote_buffer.version);
904
905 channel_buffer.replace_collaborators(
906 mem::take(&mut remote_buffer.collaborators),
907 cx,
908 );
909
910 let operations = channel_buffer
911 .buffer()
912 .update(cx, |buffer, cx| {
913 let outgoing_operations =
914 buffer.serialize_ops(Some(remote_version), cx);
915 let incoming_operations =
916 mem::take(&mut remote_buffer.operations)
917 .into_iter()
918 .map(language::proto::deserialize_operation)
919 .collect::<Result<Vec<_>>>()?;
920 buffer.apply_ops(incoming_operations, cx)?;
921 anyhow::Ok(outgoing_operations)
922 })
923 .log_err();
924
925 if let Some(operations) = operations {
926 let client = this.client.clone();
927 cx.background_executor()
928 .spawn(async move {
929 let operations = operations.await;
930 for chunk in
931 language::proto::split_operations(operations)
932 {
933 client
934 .send(proto::UpdateChannelBuffer {
935 channel_id,
936 operations: chunk,
937 })
938 .ok();
939 }
940 })
941 .detach();
942 return true;
943 }
944 }
945
946 channel_buffer.disconnect(cx);
947 false
948 })
949 }
950 OpenedModelHandle::Loading(_) => true,
951 });
952 })
953 .ok();
954 anyhow::Ok(())
955 })
956 }
957
958 fn handle_disconnect(&mut self, wait_for_reconnect: bool, cx: &mut ModelContext<Self>) {
959 cx.notify();
960
961 self.disconnect_channel_buffers_task.get_or_insert_with(|| {
962 cx.spawn(move |this, mut cx| async move {
963 if wait_for_reconnect {
964 cx.background_executor().timer(RECONNECT_TIMEOUT).await;
965 }
966
967 if let Some(this) = this.upgrade() {
968 this.update(&mut cx, |this, cx| {
969 for (_, buffer) in this.opened_buffers.drain() {
970 if let OpenedModelHandle::Open(buffer) = buffer {
971 if let Some(buffer) = buffer.upgrade() {
972 buffer.update(cx, |buffer, cx| buffer.disconnect(cx));
973 }
974 }
975 }
976 })
977 .ok();
978 }
979 })
980 });
981 }
982
983 pub(crate) fn update_channels(
984 &mut self,
985 payload: proto::UpdateChannels,
986 cx: &mut ModelContext<ChannelStore>,
987 ) -> Option<Task<Result<()>>> {
988 if !payload.remove_channel_invitations.is_empty() {
989 self.channel_invitations
990 .retain(|channel| !payload.remove_channel_invitations.contains(&channel.id));
991 }
992 for channel in payload.channel_invitations {
993 match self
994 .channel_invitations
995 .binary_search_by_key(&channel.id, |c| c.id)
996 {
997 Ok(ix) => {
998 Arc::make_mut(&mut self.channel_invitations[ix]).name = channel.name.into()
999 }
1000 Err(ix) => self.channel_invitations.insert(
1001 ix,
1002 Arc::new(Channel {
1003 id: channel.id,
1004 visibility: channel.visibility(),
1005 name: channel.name.into(),
1006 parent_path: channel.parent_path,
1007 }),
1008 ),
1009 }
1010 }
1011
1012 let channels_changed = !payload.channels.is_empty()
1013 || !payload.delete_channels.is_empty()
1014 || !payload.latest_channel_message_ids.is_empty()
1015 || !payload.latest_channel_buffer_versions.is_empty();
1016
1017 if channels_changed {
1018 if !payload.delete_channels.is_empty() {
1019 self.channel_index.delete_channels(&payload.delete_channels);
1020 self.channel_participants
1021 .retain(|channel_id, _| !&payload.delete_channels.contains(channel_id));
1022
1023 for channel_id in &payload.delete_channels {
1024 let channel_id = *channel_id;
1025 if payload
1026 .channels
1027 .iter()
1028 .any(|channel| channel.id == channel_id)
1029 {
1030 continue;
1031 }
1032 if let Some(OpenedModelHandle::Open(buffer)) =
1033 self.opened_buffers.remove(&channel_id)
1034 {
1035 if let Some(buffer) = buffer.upgrade() {
1036 buffer.update(cx, ChannelBuffer::disconnect);
1037 }
1038 }
1039 }
1040 }
1041
1042 let mut index = self.channel_index.bulk_insert();
1043 for channel in payload.channels {
1044 let id = channel.id;
1045 let channel_changed = index.insert(channel);
1046
1047 if channel_changed {
1048 if let Some(OpenedModelHandle::Open(buffer)) = self.opened_buffers.get(&id) {
1049 if let Some(buffer) = buffer.upgrade() {
1050 buffer.update(cx, ChannelBuffer::channel_changed);
1051 }
1052 }
1053 }
1054 }
1055
1056 for latest_buffer_version in payload.latest_channel_buffer_versions {
1057 let version = language::proto::deserialize_version(&latest_buffer_version.version);
1058 self.channel_states
1059 .entry(latest_buffer_version.channel_id)
1060 .or_default()
1061 .update_latest_notes_version(latest_buffer_version.epoch, &version)
1062 }
1063
1064 for latest_channel_message in payload.latest_channel_message_ids {
1065 self.channel_states
1066 .entry(latest_channel_message.channel_id)
1067 .or_default()
1068 .update_latest_message_id(latest_channel_message.message_id);
1069 }
1070 }
1071
1072 cx.notify();
1073 if payload.channel_participants.is_empty() {
1074 return None;
1075 }
1076
1077 let mut all_user_ids = Vec::new();
1078 let channel_participants = payload.channel_participants;
1079 for entry in &channel_participants {
1080 for user_id in entry.participant_user_ids.iter() {
1081 if let Err(ix) = all_user_ids.binary_search(user_id) {
1082 all_user_ids.insert(ix, *user_id);
1083 }
1084 }
1085 }
1086
1087 let users = self
1088 .user_store
1089 .update(cx, |user_store, cx| user_store.get_users(all_user_ids, cx));
1090 Some(cx.spawn(|this, mut cx| async move {
1091 let users = users.await?;
1092
1093 this.update(&mut cx, |this, cx| {
1094 for entry in &channel_participants {
1095 let mut participants: Vec<_> = entry
1096 .participant_user_ids
1097 .iter()
1098 .filter_map(|user_id| {
1099 users
1100 .binary_search_by_key(&user_id, |user| &user.id)
1101 .ok()
1102 .map(|ix| users[ix].clone())
1103 })
1104 .collect();
1105
1106 participants.sort_by_key(|u| u.id);
1107
1108 this.channel_participants
1109 .insert(entry.channel_id, participants);
1110 }
1111
1112 cx.notify();
1113 })
1114 }))
1115 }
1116}
1117
1118impl ChannelState {
1119 fn set_role(&mut self, role: ChannelRole) {
1120 self.role = Some(role);
1121 }
1122
1123 fn has_channel_buffer_changed(&self) -> bool {
1124 if let Some(latest_version) = &self.latest_notes_versions {
1125 if let Some(observed_version) = &self.observed_notes_versions {
1126 latest_version.epoch > observed_version.epoch
1127 || latest_version
1128 .version
1129 .changed_since(&observed_version.version)
1130 } else {
1131 true
1132 }
1133 } else {
1134 false
1135 }
1136 }
1137
1138 fn has_new_messages(&self) -> bool {
1139 let latest_message_id = self.latest_chat_message;
1140 let observed_message_id = self.observed_chat_message;
1141
1142 latest_message_id.is_some_and(|latest_message_id| {
1143 latest_message_id > observed_message_id.unwrap_or_default()
1144 })
1145 }
1146
1147 fn acknowledge_message_id(&mut self, message_id: u64) {
1148 let observed = self.observed_chat_message.get_or_insert(message_id);
1149 *observed = (*observed).max(message_id);
1150 }
1151
1152 fn update_latest_message_id(&mut self, message_id: u64) {
1153 self.latest_chat_message =
1154 Some(message_id.max(self.latest_chat_message.unwrap_or_default()));
1155 }
1156
1157 fn acknowledge_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1158 if let Some(existing) = &mut self.observed_notes_versions {
1159 if existing.epoch == epoch {
1160 existing.version.join(version);
1161 return;
1162 }
1163 }
1164 self.observed_notes_versions = Some(NotesVersion {
1165 epoch,
1166 version: version.clone(),
1167 });
1168 }
1169
1170 fn update_latest_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1171 if let Some(existing) = &mut self.latest_notes_versions {
1172 if existing.epoch == epoch {
1173 existing.version.join(version);
1174 return;
1175 }
1176 }
1177 self.latest_notes_versions = Some(NotesVersion {
1178 epoch,
1179 version: version.clone(),
1180 });
1181 }
1182}