1mod channel_index;
2
3use crate::{channel_buffer::ChannelBuffer, channel_chat::ChannelChat, ChannelMessage};
4use anyhow::{anyhow, Result};
5use channel_index::ChannelIndex;
6use client::{Client, Subscription, User, UserId, UserStore};
7use collections::{hash_map, HashMap, HashSet};
8use db::RELEASE_CHANNEL;
9use futures::{channel::mpsc, future::Shared, Future, FutureExt, StreamExt};
10use gpui::{
11 AppContext, AsyncAppContext, Context, EventEmitter, Model, ModelContext, SharedString, Task,
12 WeakModel,
13};
14use language::Capability;
15use rpc::{
16 proto::{self, ChannelRole, ChannelVisibility},
17 TypedEnvelope,
18};
19use std::{mem, sync::Arc, time::Duration};
20use util::{async_maybe, maybe, ResultExt};
21
22pub fn init(client: &Arc<Client>, user_store: Model<UserStore>, cx: &mut AppContext) {
23 let channel_store =
24 cx.new_model(|cx| ChannelStore::new(client.clone(), user_store.clone(), cx));
25 cx.set_global(channel_store);
26}
27
/// How long `ChannelStore::handle_disconnect` waits before disconnecting open
/// channel buffers when it is told to wait for a reconnect.
pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
29
/// Numeric identifier of a channel; used as the key for all per-channel maps in this file.
pub type ChannelId = u64;
31
/// A version of a channel's notes buffer: an epoch plus a version vector.
///
/// Versions are only joined when their epochs are equal; a differing epoch
/// replaces the stored value outright (see `ChannelState`).
#[derive(Debug, Clone, Default)]
struct NotesVersion {
    epoch: u64,
    version: clock::Global,
}
37
/// Client-side store of channels, channel invitations, participants and the
/// per-channel resources (notes buffers and chats) that are currently open.
pub struct ChannelStore {
    /// Index of all known channels (lookup by id and an ordered listing).
    pub channel_index: ChannelIndex,
    /// Pending invitations; `update_channels` binary-searches this list by channel id.
    channel_invitations: Vec<Arc<Channel>>,
    /// Users currently participating in each channel.
    channel_participants: HashMap<ChannelId, Vec<Arc<User>>>,
    /// Latest/observed chat and notes state, plus the user's role, per channel.
    channel_states: HashMap<ChannelId, ChannelState>,

    /// `(channel, user)` pairs that have an invite, removal or role-change request in flight.
    outgoing_invites: HashSet<(ChannelId, UserId)>,
    /// Sender feeding `UpdateChannels` payloads to the `_update_channels` task.
    update_channels_tx: mpsc::UnboundedSender<proto::UpdateChannels>,
    /// Notes buffers that are open or still loading, keyed by channel.
    opened_buffers: HashMap<ChannelId, OpenedModelHandle<ChannelBuffer>>,
    /// Chats that are open or still loading, keyed by channel.
    opened_chats: HashMap<ChannelId, OpenedModelHandle<ChannelChat>>,
    client: Arc<Client>,
    user_store: Model<UserStore>,
    /// Subscriptions returned when the RPC message handlers were registered in `new`.
    _rpc_subscriptions: [Subscription; 2],
    /// Task that reacts to client connection status changes.
    _watch_connection_status: Task<Option<()>>,
    /// Pending task that disconnects open channel buffers after a disconnect;
    /// taken (dropped) again in `handle_connect`.
    disconnect_channel_buffers_task: Option<Task<()>>,
    /// Task that applies queued `UpdateChannels` payloads one at a time.
    _update_channels: Task<()>,
}
55
/// A single channel as known to the client.
#[derive(Clone, Debug)]
pub struct Channel {
    pub id: ChannelId,
    pub name: SharedString,
    pub visibility: proto::ChannelVisibility,
    /// Ids of the channel's ancestors. Empty for a root channel; otherwise the
    /// first entry is treated as the root channel's id (see `root_id`).
    pub parent_path: Vec<u64>,
}
63
/// Per-channel bookkeeping used to decide whether a channel has unseen chat
/// messages or notes changes, plus the current user's role in the channel.
#[derive(Default)]
pub struct ChannelState {
    /// Highest chat message id reported as the latest for this channel.
    latest_chat_message: Option<u64>,
    /// Latest known notes version for this channel.
    latest_notes_versions: Option<NotesVersion>,
    /// Highest chat message id that has been acknowledged.
    observed_chat_message: Option<u64>,
    /// Notes version that has been acknowledged.
    observed_notes_versions: Option<NotesVersion>,
    /// Role assigned via `set_role`; `None` until a membership update arrives.
    role: Option<ChannelRole>,
}
72
73impl Channel {
74 pub fn link(&self) -> String {
75 RELEASE_CHANNEL.link_prefix().to_owned()
76 + "channel/"
77 + &self.slug()
78 + "-"
79 + &self.id.to_string()
80 }
81
82 pub fn is_root_channel(&self) -> bool {
83 self.parent_path.is_empty()
84 }
85
86 pub fn root_id(&self) -> ChannelId {
87 self.parent_path
88 .first()
89 .map(|id| *id as ChannelId)
90 .unwrap_or(self.id)
91 }
92
93 pub fn slug(&self) -> String {
94 let slug: String = self
95 .name
96 .chars()
97 .map(|c| if c.is_alphanumeric() { c } else { '-' })
98 .collect();
99
100 slug.trim_matches(|c| c == '-').to_string()
101 }
102}
103
/// A user's membership in a channel, as produced by
/// `ChannelStore::get_channel_member_details`.
pub struct ChannelMembership {
    pub user: Arc<User>,
    /// Whether the user is a member or an invitee.
    pub kind: proto::channel_member::Kind,
    pub role: proto::ChannelRole,
}
109impl ChannelMembership {
110 pub fn sort_key(&self) -> MembershipSortKey {
111 MembershipSortKey {
112 role_order: match self.role {
113 proto::ChannelRole::Admin => 0,
114 proto::ChannelRole::Member => 1,
115 proto::ChannelRole::Banned => 2,
116 proto::ChannelRole::Guest => 3,
117 },
118 kind_order: match self.kind {
119 proto::channel_member::Kind::Member => 0,
120 proto::channel_member::Kind::Invitee => 1,
121 },
122 username_order: self.user.github_login.as_str(),
123 }
124 }
125}
126
/// Sort key for [`ChannelMembership`]. The derived ordering compares the
/// fields in declaration order: role, then kind, then username.
#[derive(PartialOrd, Ord, PartialEq, Eq)]
pub struct MembershipSortKey<'a> {
    role_order: u8,
    kind_order: u8,
    username_order: &'a str,
}
133
/// Events emitted by [`ChannelStore`].
pub enum ChannelEvent {
    /// Emitted by `create_channel` after the new channel has been applied to the store.
    ChannelCreated(ChannelId),
    /// Emitted by `rename` after the renamed channel has been applied to the store.
    ChannelRenamed(ChannelId),
}
138
// `ChannelStore` emits `ChannelEvent`s (see `create_channel` and `rename`).
impl EventEmitter<ChannelEvent> for ChannelStore {}
140
/// State of a per-channel resource tracked by `ChannelStore::open_channel_resource`.
enum OpenedModelHandle<E> {
    /// The resource finished loading; only a weak handle is kept.
    Open(WeakModel<E>),
    /// The resource is still loading; the shared task lets multiple callers
    /// await the same load.
    Loading(Shared<Task<Result<Model<E>, Arc<anyhow::Error>>>>),
}
145
146impl ChannelStore {
147 pub fn global(cx: &AppContext) -> Model<Self> {
148 cx.global::<Model<Self>>().clone()
149 }
150
    /// Builds a channel store.
    ///
    /// Registers the two RPC message handlers, spawns a task that watches the
    /// client's connection status, and spawns a task that applies queued
    /// `UpdateChannels` payloads in the order they were received.
    pub fn new(
        client: Arc<Client>,
        user_store: Model<UserStore>,
        cx: &mut ModelContext<Self>,
    ) -> Self {
        let rpc_subscriptions = [
            client.add_message_handler(cx.weak_model(), Self::handle_update_channels),
            client.add_message_handler(cx.weak_model(), Self::handle_update_user_channels),
        ];

        let mut connection_status = client.status();
        let (update_channels_tx, mut update_channels_rx) = mpsc::unbounded();
        let watch_connection_status = cx.spawn(|this, mut cx| async move {
            while let Some(status) = connection_status.next().await {
                // Stop watching once the store itself has been dropped.
                let this = this.upgrade()?;
                match status {
                    client::Status::Connected { .. } => {
                        this.update(&mut cx, |this, cx| this.handle_connect(cx))
                            .ok()?
                            .await
                            .log_err()?;
                    }
                    // Signed out or upgrade required: disconnect without waiting for a reconnect.
                    client::Status::SignedOut | client::Status::UpgradeRequired => {
                        this.update(&mut cx, |this, cx| this.handle_disconnect(false, cx))
                            .ok();
                    }
                    // Any other status: disconnect after waiting for a possible reconnect.
                    _ => {
                        this.update(&mut cx, |this, cx| this.handle_disconnect(true, cx))
                            .ok();
                    }
                }
            }
            Some(())
        });

        Self {
            channel_invitations: Vec::default(),
            channel_index: ChannelIndex::default(),
            channel_participants: Default::default(),
            outgoing_invites: Default::default(),
            opened_buffers: Default::default(),
            opened_chats: Default::default(),
            update_channels_tx,
            client,
            user_store,
            _rpc_subscriptions: rpc_subscriptions,
            _watch_connection_status: watch_connection_status,
            disconnect_channel_buffers_task: None,
            _update_channels: cx.spawn(|this, mut cx| async move {
                async_maybe!({
                    // Apply each payload, awaiting any follow-up task before
                    // taking the next payload from the queue.
                    while let Some(update_channels) = update_channels_rx.next().await {
                        if let Some(this) = this.upgrade() {
                            let update_task = this.update(&mut cx, |this, cx| {
                                this.update_channels(update_channels, cx)
                            })?;
                            if let Some(update_task) = update_task {
                                update_task.await.log_err();
                            }
                        }
                    }
                    anyhow::Ok(())
                })
                .await
                .log_err();
            }),
            channel_states: Default::default(),
        }
    }
219
220 pub fn client(&self) -> Arc<Client> {
221 self.client.clone()
222 }
223
224 /// Returns the number of unique channels in the store
225 pub fn channel_count(&self) -> usize {
226 self.channel_index.by_id().len()
227 }
228
229 /// Returns the index of a channel ID in the list of unique channels
230 pub fn index_of_channel(&self, channel_id: ChannelId) -> Option<usize> {
231 self.channel_index
232 .by_id()
233 .keys()
234 .position(|id| *id == channel_id)
235 }
236
237 /// Returns an iterator over all unique channels
238 pub fn channels(&self) -> impl '_ + Iterator<Item = &Arc<Channel>> {
239 self.channel_index.by_id().values()
240 }
241
242 /// Iterate over all entries in the channel DAG
243 pub fn ordered_channels(&self) -> impl '_ + Iterator<Item = (usize, &Arc<Channel>)> {
244 self.channel_index
245 .ordered_channels()
246 .iter()
247 .filter_map(move |id| {
248 let channel = self.channel_index.by_id().get(id)?;
249 Some((channel.parent_path.len(), channel))
250 })
251 }
252
253 pub fn channel_at_index(&self, ix: usize) -> Option<&Arc<Channel>> {
254 let channel_id = self.channel_index.ordered_channels().get(ix)?;
255 self.channel_index.by_id().get(channel_id)
256 }
257
258 pub fn channel_at(&self, ix: usize) -> Option<&Arc<Channel>> {
259 self.channel_index.by_id().values().nth(ix)
260 }
261
262 pub fn has_channel_invitation(&self, channel_id: ChannelId) -> bool {
263 self.channel_invitations
264 .iter()
265 .any(|channel| channel.id == channel_id)
266 }
267
268 pub fn channel_invitations(&self) -> &[Arc<Channel>] {
269 &self.channel_invitations
270 }
271
272 pub fn channel_for_id(&self, channel_id: ChannelId) -> Option<&Arc<Channel>> {
273 self.channel_index.by_id().get(&channel_id)
274 }
275
276 pub fn has_open_channel_buffer(&self, channel_id: ChannelId, _cx: &AppContext) -> bool {
277 if let Some(buffer) = self.opened_buffers.get(&channel_id) {
278 if let OpenedModelHandle::Open(buffer) = buffer {
279 return buffer.upgrade().is_some();
280 }
281 }
282 false
283 }
284
285 pub fn open_channel_buffer(
286 &mut self,
287 channel_id: ChannelId,
288 cx: &mut ModelContext<Self>,
289 ) -> Task<Result<Model<ChannelBuffer>>> {
290 let client = self.client.clone();
291 let user_store = self.user_store.clone();
292 let channel_store = cx.handle();
293 self.open_channel_resource(
294 channel_id,
295 |this| &mut this.opened_buffers,
296 |channel, cx| ChannelBuffer::new(channel, client, user_store, channel_store, cx),
297 cx,
298 )
299 }
300
301 pub fn fetch_channel_messages(
302 &self,
303 message_ids: Vec<u64>,
304 cx: &mut ModelContext<Self>,
305 ) -> Task<Result<Vec<ChannelMessage>>> {
306 let request = if message_ids.is_empty() {
307 None
308 } else {
309 Some(
310 self.client
311 .request(proto::GetChannelMessagesById { message_ids }),
312 )
313 };
314 cx.spawn(|this, mut cx| async move {
315 if let Some(request) = request {
316 let response = request.await?;
317 let this = this
318 .upgrade()
319 .ok_or_else(|| anyhow!("channel store dropped"))?;
320 let user_store = this.update(&mut cx, |this, _| this.user_store.clone())?;
321 ChannelMessage::from_proto_vec(response.messages, &user_store, &mut cx).await
322 } else {
323 Ok(Vec::new())
324 }
325 })
326 }
327
328 pub fn has_channel_buffer_changed(&self, channel_id: ChannelId) -> bool {
329 self.channel_states
330 .get(&channel_id)
331 .is_some_and(|state| state.has_channel_buffer_changed())
332 }
333
334 pub fn has_new_messages(&self, channel_id: ChannelId) -> bool {
335 self.channel_states
336 .get(&channel_id)
337 .is_some_and(|state| state.has_new_messages())
338 }
339
340 pub fn acknowledge_message_id(
341 &mut self,
342 channel_id: ChannelId,
343 message_id: u64,
344 cx: &mut ModelContext<Self>,
345 ) {
346 self.channel_states
347 .entry(channel_id)
348 .or_insert_with(|| Default::default())
349 .acknowledge_message_id(message_id);
350 cx.notify();
351 }
352
353 pub fn update_latest_message_id(
354 &mut self,
355 channel_id: ChannelId,
356 message_id: u64,
357 cx: &mut ModelContext<Self>,
358 ) {
359 self.channel_states
360 .entry(channel_id)
361 .or_insert_with(|| Default::default())
362 .update_latest_message_id(message_id);
363 cx.notify();
364 }
365
366 pub fn acknowledge_notes_version(
367 &mut self,
368 channel_id: ChannelId,
369 epoch: u64,
370 version: &clock::Global,
371 cx: &mut ModelContext<Self>,
372 ) {
373 self.channel_states
374 .entry(channel_id)
375 .or_insert_with(|| Default::default())
376 .acknowledge_notes_version(epoch, version);
377 cx.notify()
378 }
379
380 pub fn update_latest_notes_version(
381 &mut self,
382 channel_id: ChannelId,
383 epoch: u64,
384 version: &clock::Global,
385 cx: &mut ModelContext<Self>,
386 ) {
387 self.channel_states
388 .entry(channel_id)
389 .or_insert_with(|| Default::default())
390 .update_latest_notes_version(epoch, version);
391 cx.notify()
392 }
393
394 pub fn open_channel_chat(
395 &mut self,
396 channel_id: ChannelId,
397 cx: &mut ModelContext<Self>,
398 ) -> Task<Result<Model<ChannelChat>>> {
399 let client = self.client.clone();
400 let user_store = self.user_store.clone();
401 let this = cx.handle();
402 self.open_channel_resource(
403 channel_id,
404 |this| &mut this.opened_chats,
405 |channel, cx| ChannelChat::new(channel, this, user_store, client, cx),
406 cx,
407 )
408 }
409
    /// Asynchronously open a given resource associated with a channel.
    ///
    /// Make sure that the resource is only opened once, even if this method
    /// is called multiple times with the same channel id while the first task
    /// is still running.
    ///
    /// `get_map` selects which map of opened resources to use, and `load`
    /// creates the resource for the resolved channel. The returned task fails
    /// if the channel id is unknown or if `load` fails.
    fn open_channel_resource<T, F, Fut>(
        &mut self,
        channel_id: ChannelId,
        get_map: fn(&mut Self) -> &mut HashMap<ChannelId, OpenedModelHandle<T>>,
        load: F,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<Model<T>>>
    where
        F: 'static + FnOnce(Arc<Channel>, AsyncAppContext) -> Fut,
        Fut: Future<Output = Result<Model<T>>>,
        T: 'static,
    {
        let task = loop {
            match get_map(self).entry(channel_id) {
                hash_map::Entry::Occupied(e) => match e.get() {
                    OpenedModelHandle::Open(model) => {
                        if let Some(model) = model.upgrade() {
                            break Task::ready(Ok(model)).shared();
                        } else {
                            // The model was dropped: discard the stale entry and
                            // retry, which takes the vacant branch below.
                            get_map(self).remove(&channel_id);
                            continue;
                        }
                    }
                    OpenedModelHandle::Loading(task) => {
                        // A load is already in flight; share it.
                        break task.clone();
                    }
                },
                hash_map::Entry::Vacant(e) => {
                    let task = cx
                        .spawn(move |this, mut cx| async move {
                            let channel = this.update(&mut cx, |this, _| {
                                this.channel_for_id(channel_id).cloned().ok_or_else(|| {
                                    Arc::new(anyhow!("no channel for id: {}", channel_id))
                                })
                            })??;

                            load(channel, cx).await.map_err(Arc::new)
                        })
                        .shared();

                    e.insert(OpenedModelHandle::Loading(task.clone()));
                    // Once loading finishes, either record the open model
                    // (weakly) or remove the entry so a later call can retry.
                    cx.spawn({
                        let task = task.clone();
                        move |this, mut cx| async move {
                            let result = task.await;
                            this.update(&mut cx, |this, _| match result {
                                Ok(model) => {
                                    get_map(this).insert(
                                        channel_id,
                                        OpenedModelHandle::Open(model.downgrade()),
                                    );
                                }
                                Err(_) => {
                                    get_map(this).remove(&channel_id);
                                }
                            })
                            .ok();
                        }
                    })
                    .detach();
                    break task;
                }
            }
        };
        // The shared task yields `Arc<anyhow::Error>`; convert it back into a
        // plain `anyhow::Error` for the caller.
        cx.background_executor()
            .spawn(async move { task.await.map_err(|error| anyhow!("{}", error)) })
    }
482
483 pub fn is_channel_admin(&self, channel_id: ChannelId) -> bool {
484 self.channel_role(channel_id) == proto::ChannelRole::Admin
485 }
486
487 pub fn is_root_channel(&self, channel_id: ChannelId) -> bool {
488 self.channel_index
489 .by_id()
490 .get(&channel_id)
491 .map_or(false, |channel| channel.is_root_channel())
492 }
493
494 pub fn is_public_channel(&self, channel_id: ChannelId) -> bool {
495 self.channel_index
496 .by_id()
497 .get(&channel_id)
498 .map_or(false, |channel| {
499 channel.visibility == ChannelVisibility::Public
500 })
501 }
502
503 pub fn channel_capability(&self, channel_id: ChannelId) -> Capability {
504 match self.channel_role(channel_id) {
505 ChannelRole::Admin | ChannelRole::Member => Capability::ReadWrite,
506 _ => Capability::ReadOnly,
507 }
508 }
509
510 pub fn channel_role(&self, channel_id: ChannelId) -> proto::ChannelRole {
511 maybe!({
512 let mut channel = self.channel_for_id(channel_id)?;
513 if !channel.is_root_channel() {
514 channel = self.channel_for_id(channel.root_id())?;
515 }
516 let root_channel_state = self.channel_states.get(&channel.id);
517 root_channel_state?.role
518 })
519 .unwrap_or(proto::ChannelRole::Guest)
520 }
521
522 pub fn channel_participants(&self, channel_id: ChannelId) -> &[Arc<User>] {
523 self.channel_participants
524 .get(&channel_id)
525 .map_or(&[], |v| v.as_slice())
526 }
527
528 pub fn create_channel(
529 &self,
530 name: &str,
531 parent_id: Option<ChannelId>,
532 cx: &mut ModelContext<Self>,
533 ) -> Task<Result<ChannelId>> {
534 let client = self.client.clone();
535 let name = name.trim_start_matches("#").to_owned();
536 cx.spawn(move |this, mut cx| async move {
537 let response = client
538 .request(proto::CreateChannel { name, parent_id })
539 .await?;
540
541 let channel = response
542 .channel
543 .ok_or_else(|| anyhow!("missing channel in response"))?;
544 let channel_id = channel.id;
545
546 this.update(&mut cx, |this, cx| {
547 let task = this.update_channels(
548 proto::UpdateChannels {
549 channels: vec![channel],
550 ..Default::default()
551 },
552 cx,
553 );
554 assert!(task.is_none());
555
556 // This event is emitted because the collab panel wants to clear the pending edit state
557 // before this frame is rendered. But we can't guarantee that the collab panel's future
558 // will resolve before this flush_effects finishes. Synchronously emitting this event
559 // ensures that the collab panel will observe this creation before the frame completes
560 cx.emit(ChannelEvent::ChannelCreated(channel_id));
561 })?;
562
563 Ok(channel_id)
564 })
565 }
566
567 pub fn move_channel(
568 &mut self,
569 channel_id: ChannelId,
570 to: ChannelId,
571 cx: &mut ModelContext<Self>,
572 ) -> Task<Result<()>> {
573 let client = self.client.clone();
574 cx.spawn(move |_, _| async move {
575 let _ = client
576 .request(proto::MoveChannel { channel_id, to })
577 .await?;
578
579 Ok(())
580 })
581 }
582
583 pub fn set_channel_visibility(
584 &mut self,
585 channel_id: ChannelId,
586 visibility: ChannelVisibility,
587 cx: &mut ModelContext<Self>,
588 ) -> Task<Result<()>> {
589 let client = self.client.clone();
590 cx.spawn(move |_, _| async move {
591 let _ = client
592 .request(proto::SetChannelVisibility {
593 channel_id,
594 visibility: visibility.into(),
595 })
596 .await?;
597
598 Ok(())
599 })
600 }
601
602 pub fn invite_member(
603 &mut self,
604 channel_id: ChannelId,
605 user_id: UserId,
606 role: proto::ChannelRole,
607 cx: &mut ModelContext<Self>,
608 ) -> Task<Result<()>> {
609 if !self.outgoing_invites.insert((channel_id, user_id)) {
610 return Task::ready(Err(anyhow!("invite request already in progress")));
611 }
612
613 cx.notify();
614 let client = self.client.clone();
615 cx.spawn(move |this, mut cx| async move {
616 let result = client
617 .request(proto::InviteChannelMember {
618 channel_id,
619 user_id,
620 role: role.into(),
621 })
622 .await;
623
624 this.update(&mut cx, |this, cx| {
625 this.outgoing_invites.remove(&(channel_id, user_id));
626 cx.notify();
627 })?;
628
629 result?;
630
631 Ok(())
632 })
633 }
634
635 pub fn remove_member(
636 &mut self,
637 channel_id: ChannelId,
638 user_id: u64,
639 cx: &mut ModelContext<Self>,
640 ) -> Task<Result<()>> {
641 if !self.outgoing_invites.insert((channel_id, user_id)) {
642 return Task::ready(Err(anyhow!("invite request already in progress")));
643 }
644
645 cx.notify();
646 let client = self.client.clone();
647 cx.spawn(move |this, mut cx| async move {
648 let result = client
649 .request(proto::RemoveChannelMember {
650 channel_id,
651 user_id,
652 })
653 .await;
654
655 this.update(&mut cx, |this, cx| {
656 this.outgoing_invites.remove(&(channel_id, user_id));
657 cx.notify();
658 })?;
659 result?;
660 Ok(())
661 })
662 }
663
664 pub fn set_member_role(
665 &mut self,
666 channel_id: ChannelId,
667 user_id: UserId,
668 role: proto::ChannelRole,
669 cx: &mut ModelContext<Self>,
670 ) -> Task<Result<()>> {
671 if !self.outgoing_invites.insert((channel_id, user_id)) {
672 return Task::ready(Err(anyhow!("member request already in progress")));
673 }
674
675 cx.notify();
676 let client = self.client.clone();
677 cx.spawn(move |this, mut cx| async move {
678 let result = client
679 .request(proto::SetChannelMemberRole {
680 channel_id,
681 user_id,
682 role: role.into(),
683 })
684 .await;
685
686 this.update(&mut cx, |this, cx| {
687 this.outgoing_invites.remove(&(channel_id, user_id));
688 cx.notify();
689 })?;
690
691 result?;
692 Ok(())
693 })
694 }
695
696 pub fn rename(
697 &mut self,
698 channel_id: ChannelId,
699 new_name: &str,
700 cx: &mut ModelContext<Self>,
701 ) -> Task<Result<()>> {
702 let client = self.client.clone();
703 let name = new_name.to_string();
704 cx.spawn(move |this, mut cx| async move {
705 let channel = client
706 .request(proto::RenameChannel { channel_id, name })
707 .await?
708 .channel
709 .ok_or_else(|| anyhow!("missing channel in response"))?;
710 this.update(&mut cx, |this, cx| {
711 let task = this.update_channels(
712 proto::UpdateChannels {
713 channels: vec![channel],
714 ..Default::default()
715 },
716 cx,
717 );
718 assert!(task.is_none());
719
720 // This event is emitted because the collab panel wants to clear the pending edit state
721 // before this frame is rendered. But we can't guarantee that the collab panel's future
722 // will resolve before this flush_effects finishes. Synchronously emitting this event
723 // ensures that the collab panel will observe this creation before the frame complete
724 cx.emit(ChannelEvent::ChannelRenamed(channel_id))
725 })?;
726 Ok(())
727 })
728 }
729
730 pub fn respond_to_channel_invite(
731 &mut self,
732 channel_id: ChannelId,
733 accept: bool,
734 cx: &mut ModelContext<Self>,
735 ) -> Task<Result<()>> {
736 let client = self.client.clone();
737 cx.background_executor().spawn(async move {
738 client
739 .request(proto::RespondToChannelInvite { channel_id, accept })
740 .await?;
741 Ok(())
742 })
743 }
744
745 pub fn get_channel_member_details(
746 &self,
747 channel_id: ChannelId,
748 cx: &mut ModelContext<Self>,
749 ) -> Task<Result<Vec<ChannelMembership>>> {
750 let client = self.client.clone();
751 let user_store = self.user_store.downgrade();
752 cx.spawn(move |_, mut cx| async move {
753 let response = client
754 .request(proto::GetChannelMembers { channel_id })
755 .await?;
756
757 let user_ids = response.members.iter().map(|m| m.user_id).collect();
758 let user_store = user_store
759 .upgrade()
760 .ok_or_else(|| anyhow!("user store dropped"))?;
761 let users = user_store
762 .update(&mut cx, |user_store, cx| user_store.get_users(user_ids, cx))?
763 .await?;
764
765 Ok(users
766 .into_iter()
767 .zip(response.members)
768 .filter_map(|(user, member)| {
769 Some(ChannelMembership {
770 user,
771 role: member.role(),
772 kind: member.kind(),
773 })
774 })
775 .collect())
776 })
777 }
778
779 pub fn remove_channel(&self, channel_id: ChannelId) -> impl Future<Output = Result<()>> {
780 let client = self.client.clone();
781 async move {
782 client.request(proto::DeleteChannel { channel_id }).await?;
783 Ok(())
784 }
785 }
786
    /// Always returns `false`; the channel argument is ignored.
    pub fn has_pending_channel_invite_response(&self, _: &Arc<Channel>) -> bool {
        false
    }
790
791 pub fn has_pending_channel_invite(&self, channel_id: ChannelId, user_id: UserId) -> bool {
792 self.outgoing_invites.contains(&(channel_id, user_id))
793 }
794
795 async fn handle_update_channels(
796 this: Model<Self>,
797 message: TypedEnvelope<proto::UpdateChannels>,
798 _: Arc<Client>,
799 mut cx: AsyncAppContext,
800 ) -> Result<()> {
801 this.update(&mut cx, |this, _| {
802 this.update_channels_tx
803 .unbounded_send(message.payload)
804 .unwrap();
805 })?;
806 Ok(())
807 }
808
809 async fn handle_update_user_channels(
810 this: Model<Self>,
811 message: TypedEnvelope<proto::UpdateUserChannels>,
812 _: Arc<Client>,
813 mut cx: AsyncAppContext,
814 ) -> Result<()> {
815 this.update(&mut cx, |this, cx| {
816 for buffer_version in message.payload.observed_channel_buffer_version {
817 let version = language::proto::deserialize_version(&buffer_version.version);
818 this.acknowledge_notes_version(
819 buffer_version.channel_id,
820 buffer_version.epoch,
821 &version,
822 cx,
823 );
824 }
825 for message_id in message.payload.observed_channel_message_id {
826 this.acknowledge_message_id(message_id.channel_id, message_id.message_id, cx);
827 }
828 for membership in message.payload.channel_memberships {
829 if let Some(role) = ChannelRole::from_i32(membership.role) {
830 this.channel_states
831 .entry(membership.channel_id)
832 .or_insert_with(|| ChannelState::default())
833 .set_role(role)
834 }
835 }
836 })
837 }
838
839 fn handle_connect(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
840 self.channel_index.clear();
841 self.channel_invitations.clear();
842 self.channel_participants.clear();
843 self.channel_index.clear();
844 self.outgoing_invites.clear();
845 self.disconnect_channel_buffers_task.take();
846
847 for chat in self.opened_chats.values() {
848 if let OpenedModelHandle::Open(chat) = chat {
849 if let Some(chat) = chat.upgrade() {
850 chat.update(cx, |chat, cx| {
851 chat.rejoin(cx);
852 });
853 }
854 }
855 }
856
857 let mut buffer_versions = Vec::new();
858 for buffer in self.opened_buffers.values() {
859 if let OpenedModelHandle::Open(buffer) = buffer {
860 if let Some(buffer) = buffer.upgrade() {
861 let channel_buffer = buffer.read(cx);
862 let buffer = channel_buffer.buffer().read(cx);
863 buffer_versions.push(proto::ChannelBufferVersion {
864 channel_id: channel_buffer.channel_id,
865 epoch: channel_buffer.epoch(),
866 version: language::proto::serialize_version(&buffer.version()),
867 });
868 }
869 }
870 }
871
872 if buffer_versions.is_empty() {
873 return Task::ready(Ok(()));
874 }
875
876 let response = self.client.request(proto::RejoinChannelBuffers {
877 buffers: buffer_versions,
878 });
879
880 cx.spawn(|this, mut cx| async move {
881 let mut response = response.await?;
882
883 this.update(&mut cx, |this, cx| {
884 this.opened_buffers.retain(|_, buffer| match buffer {
885 OpenedModelHandle::Open(channel_buffer) => {
886 let Some(channel_buffer) = channel_buffer.upgrade() else {
887 return false;
888 };
889
890 channel_buffer.update(cx, |channel_buffer, cx| {
891 let channel_id = channel_buffer.channel_id;
892 if let Some(remote_buffer) = response
893 .buffers
894 .iter_mut()
895 .find(|buffer| buffer.channel_id == channel_id)
896 {
897 let channel_id = channel_buffer.channel_id;
898 let remote_version =
899 language::proto::deserialize_version(&remote_buffer.version);
900
901 channel_buffer.replace_collaborators(
902 mem::take(&mut remote_buffer.collaborators),
903 cx,
904 );
905
906 let operations = channel_buffer
907 .buffer()
908 .update(cx, |buffer, cx| {
909 let outgoing_operations =
910 buffer.serialize_ops(Some(remote_version), cx);
911 let incoming_operations =
912 mem::take(&mut remote_buffer.operations)
913 .into_iter()
914 .map(language::proto::deserialize_operation)
915 .collect::<Result<Vec<_>>>()?;
916 buffer.apply_ops(incoming_operations, cx)?;
917 anyhow::Ok(outgoing_operations)
918 })
919 .log_err();
920
921 if let Some(operations) = operations {
922 let client = this.client.clone();
923 cx.background_executor()
924 .spawn(async move {
925 let operations = operations.await;
926 for chunk in
927 language::proto::split_operations(operations)
928 {
929 client
930 .send(proto::UpdateChannelBuffer {
931 channel_id,
932 operations: chunk,
933 })
934 .ok();
935 }
936 })
937 .detach();
938 return true;
939 }
940 }
941
942 channel_buffer.disconnect(cx);
943 false
944 })
945 }
946 OpenedModelHandle::Loading(_) => true,
947 });
948 })
949 .ok();
950 anyhow::Ok(())
951 })
952 }
953
    /// Reacts to the client losing its connection.
    ///
    /// Notifies observers and, unless a disconnect task is already stored,
    /// starts one that disconnects and forgets every open notes buffer. When
    /// `wait_for_reconnect` is true the task first waits `RECONNECT_TIMEOUT`;
    /// `handle_connect` drops the stored task if a connection is established.
    fn handle_disconnect(&mut self, wait_for_reconnect: bool, cx: &mut ModelContext<Self>) {
        cx.notify();

        self.disconnect_channel_buffers_task.get_or_insert_with(|| {
            cx.spawn(move |this, mut cx| async move {
                if wait_for_reconnect {
                    cx.background_executor().timer(RECONNECT_TIMEOUT).await;
                }

                if let Some(this) = this.upgrade() {
                    this.update(&mut cx, |this, cx| {
                        // Draining removes every entry, including ones still loading.
                        for (_, buffer) in this.opened_buffers.drain() {
                            if let OpenedModelHandle::Open(buffer) = buffer {
                                if let Some(buffer) = buffer.upgrade() {
                                    buffer.update(cx, |buffer, cx| buffer.disconnect(cx));
                                }
                            }
                        }
                    })
                    .ok();
                }
            })
        });
    }
978
    /// Applies an `UpdateChannels` payload to the store.
    ///
    /// Invitations, channels, deletions and latest message/notes versions are
    /// applied synchronously. If the payload contains channel participants,
    /// the referenced users are fetched and a task is returned that stores the
    /// participant lists once the users are available; otherwise `None` is
    /// returned.
    pub(crate) fn update_channels(
        &mut self,
        payload: proto::UpdateChannels,
        cx: &mut ModelContext<ChannelStore>,
    ) -> Option<Task<Result<()>>> {
        if !payload.remove_channel_invitations.is_empty() {
            self.channel_invitations
                .retain(|channel| !payload.remove_channel_invitations.contains(&channel.id));
        }
        // Insert or rename invitations, keeping the list sorted by channel id.
        for channel in payload.channel_invitations {
            match self
                .channel_invitations
                .binary_search_by_key(&channel.id, |c| c.id)
            {
                Ok(ix) => {
                    Arc::make_mut(&mut self.channel_invitations[ix]).name = channel.name.into()
                }
                Err(ix) => self.channel_invitations.insert(
                    ix,
                    Arc::new(Channel {
                        id: channel.id,
                        visibility: channel.visibility(),
                        name: channel.name.into(),
                        parent_path: channel.parent_path,
                    }),
                ),
            }
        }

        let channels_changed = !payload.channels.is_empty()
            || !payload.delete_channels.is_empty()
            || !payload.latest_channel_message_ids.is_empty()
            || !payload.latest_channel_buffer_versions.is_empty();

        if channels_changed {
            if !payload.delete_channels.is_empty() {
                self.channel_index.delete_channels(&payload.delete_channels);
                self.channel_participants
                    .retain(|channel_id, _| !&payload.delete_channels.contains(channel_id));

                for channel_id in &payload.delete_channels {
                    let channel_id = *channel_id;
                    // A channel that is deleted and re-sent in the same payload
                    // keeps its open buffer.
                    if payload
                        .channels
                        .iter()
                        .any(|channel| channel.id == channel_id)
                    {
                        continue;
                    }
                    if let Some(OpenedModelHandle::Open(buffer)) =
                        self.opened_buffers.remove(&channel_id)
                    {
                        if let Some(buffer) = buffer.upgrade() {
                            buffer.update(cx, ChannelBuffer::disconnect);
                        }
                    }
                }
            }

            let mut index = self.channel_index.bulk_insert();
            for channel in payload.channels {
                let id = channel.id;
                let channel_changed = index.insert(channel);

                // Let an open notes buffer know that its channel changed.
                if channel_changed {
                    if let Some(OpenedModelHandle::Open(buffer)) = self.opened_buffers.get(&id) {
                        if let Some(buffer) = buffer.upgrade() {
                            buffer.update(cx, ChannelBuffer::channel_changed);
                        }
                    }
                }
            }

            for latest_buffer_version in payload.latest_channel_buffer_versions {
                let version = language::proto::deserialize_version(&latest_buffer_version.version);
                self.channel_states
                    .entry(latest_buffer_version.channel_id)
                    .or_default()
                    .update_latest_notes_version(latest_buffer_version.epoch, &version)
            }

            for latest_channel_message in payload.latest_channel_message_ids {
                self.channel_states
                    .entry(latest_channel_message.channel_id)
                    .or_default()
                    .update_latest_message_id(latest_channel_message.message_id);
            }
        }

        cx.notify();
        if payload.channel_participants.is_empty() {
            return None;
        }

        // Build a sorted, de-duplicated list of every participant's user id.
        let mut all_user_ids = Vec::new();
        let channel_participants = payload.channel_participants;
        for entry in &channel_participants {
            for user_id in entry.participant_user_ids.iter() {
                if let Err(ix) = all_user_ids.binary_search(user_id) {
                    all_user_ids.insert(ix, *user_id);
                }
            }
        }

        let users = self
            .user_store
            .update(cx, |user_store, cx| user_store.get_users(all_user_ids, cx));
        Some(cx.spawn(|this, mut cx| async move {
            let users = users.await?;

            this.update(&mut cx, |this, cx| {
                for entry in &channel_participants {
                    // The binary search below requires `users` to be sorted by id.
                    // NOTE(review): presumably `get_users` returns users in the
                    // order of the (sorted) requested ids — confirm.
                    let mut participants: Vec<_> = entry
                        .participant_user_ids
                        .iter()
                        .filter_map(|user_id| {
                            users
                                .binary_search_by_key(&user_id, |user| &user.id)
                                .ok()
                                .map(|ix| users[ix].clone())
                        })
                        .collect();

                    participants.sort_by_key(|u| u.id);

                    this.channel_participants
                        .insert(entry.channel_id, participants);
                }

                cx.notify();
            })
        }))
    }
1112}
1113
1114impl ChannelState {
1115 fn set_role(&mut self, role: ChannelRole) {
1116 self.role = Some(role);
1117 }
1118
1119 fn has_channel_buffer_changed(&self) -> bool {
1120 if let Some(latest_version) = &self.latest_notes_versions {
1121 if let Some(observed_version) = &self.observed_notes_versions {
1122 latest_version.epoch > observed_version.epoch
1123 || latest_version
1124 .version
1125 .changed_since(&observed_version.version)
1126 } else {
1127 true
1128 }
1129 } else {
1130 false
1131 }
1132 }
1133
1134 fn has_new_messages(&self) -> bool {
1135 let latest_message_id = self.latest_chat_message;
1136 let observed_message_id = self.observed_chat_message;
1137
1138 latest_message_id.is_some_and(|latest_message_id| {
1139 latest_message_id > observed_message_id.unwrap_or_default()
1140 })
1141 }
1142
1143 fn acknowledge_message_id(&mut self, message_id: u64) {
1144 let observed = self.observed_chat_message.get_or_insert(message_id);
1145 *observed = (*observed).max(message_id);
1146 }
1147
1148 fn update_latest_message_id(&mut self, message_id: u64) {
1149 self.latest_chat_message =
1150 Some(message_id.max(self.latest_chat_message.unwrap_or_default()));
1151 }
1152
1153 fn acknowledge_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1154 if let Some(existing) = &mut self.observed_notes_versions {
1155 if existing.epoch == epoch {
1156 existing.version.join(version);
1157 return;
1158 }
1159 }
1160 self.observed_notes_versions = Some(NotesVersion {
1161 epoch,
1162 version: version.clone(),
1163 });
1164 }
1165
1166 fn update_latest_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1167 if let Some(existing) = &mut self.latest_notes_versions {
1168 if existing.epoch == epoch {
1169 existing.version.join(version);
1170 return;
1171 }
1172 }
1173 self.latest_notes_versions = Some(NotesVersion {
1174 epoch,
1175 version: version.clone(),
1176 });
1177 }
1178}