1mod channel_index;
2
3use crate::{channel_buffer::ChannelBuffer, channel_chat::ChannelChat, ChannelMessage};
4use anyhow::{anyhow, Result};
5use channel_index::ChannelIndex;
6use client::{Client, Subscription, User, UserId, UserStore};
7use collections::{hash_map, HashMap, HashSet};
8use futures::{channel::mpsc, future::Shared, Future, FutureExt, StreamExt};
9use gpui::{
10 AppContext, AsyncAppContext, Context, EventEmitter, Global, Model, ModelContext, SharedString,
11 Task, WeakModel,
12};
13use language::Capability;
14use release_channel::RELEASE_CHANNEL;
15use rpc::{
16 proto::{self, ChannelRole, ChannelVisibility},
17 TypedEnvelope,
18};
19use std::{mem, sync::Arc, time::Duration};
20use util::{async_maybe, maybe, ResultExt};
21
22pub fn init(client: &Arc<Client>, user_store: Model<UserStore>, cx: &mut AppContext) {
23 let channel_store =
24 cx.new_model(|cx| ChannelStore::new(client.clone(), user_store.clone(), cx));
25 cx.set_global(GlobalChannelStore(channel_store));
26}
27
/// How long `ChannelStore::handle_disconnect` waits (when asked to wait for a
/// reconnect) before it disconnects the currently open channel buffers.
pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);

/// Identifier of a channel; used as the key of every per-channel map in this module.
pub type ChannelId = u64;
31
/// A version of a channel's notes: an `epoch` paired with a `clock::Global` version.
///
/// `ChannelState` joins versions that share an epoch and replaces the stored value
/// when the epoch differs.
#[derive(Debug, Clone, Default)]
struct NotesVersion {
    // Compared before `version` when deciding whether the notes have changed.
    epoch: u64,
    version: clock::Global,
}
37
/// Client-side store of channels, invitations, participants and per-channel state,
/// plus the handles of channel buffers and chats that are currently open.
pub struct ChannelStore {
    /// Channels by id, together with their display order.
    pub channel_index: ChannelIndex,
    // Kept sorted by channel id: `update_channels` inserts at the position returned by
    // a binary search.
    channel_invitations: Vec<Arc<Channel>>,
    // Users currently participating in each channel, filled in by `update_channels`.
    channel_participants: HashMap<ChannelId, Vec<Arc<User>>>,
    // Latest/observed chat and notes versions and the user's role, per channel.
    channel_states: HashMap<ChannelId, ChannelState>,

    // (channel, user) pairs that have a membership request in flight. Despite the name
    // this is also used by `remove_member` and `set_member_role`.
    outgoing_invites: HashSet<(ChannelId, UserId)>,
    // Sender half of the queue that is drained by the `_update_channels` task.
    update_channels_tx: mpsc::UnboundedSender<proto::UpdateChannels>,
    opened_buffers: HashMap<ChannelId, OpenedModelHandle<ChannelBuffer>>,
    opened_chats: HashMap<ChannelId, OpenedModelHandle<ChannelChat>>,
    client: Arc<Client>,
    user_store: Model<UserStore>,
    // Held only to keep them alive — presumably dropping a subscription unregisters
    // its message handler (TODO confirm against `client::Subscription`).
    _rpc_subscriptions: [Subscription; 2],
    // Task that reacts to changes of the client's connection status.
    _watch_connection_status: Task<Option<()>>,
    // Pending task that disconnects the open buffers after a disconnect; taken
    // (and thereby dropped) by `handle_connect`.
    disconnect_channel_buffers_task: Option<Task<()>>,
    // Task that applies queued `UpdateChannels` messages one at a time.
    _update_channels: Task<()>,
}
55
/// A channel as known to the client.
#[derive(Clone, Debug)]
pub struct Channel {
    pub id: ChannelId,
    pub name: SharedString,
    pub visibility: proto::ChannelVisibility,
    /// Ids of the channel's ancestors. Empty for a root channel; otherwise the first
    /// entry is treated as the root channel's id and the length is used as the
    /// channel's depth.
    pub parent_path: Vec<u64>,
}
63
/// Per-channel bookkeeping: the newest chat message and notes version known to exist,
/// the newest ones that were acknowledged as observed, and the user's role.
#[derive(Default)]
pub struct ChannelState {
    // Highest chat message id reported for the channel so far.
    latest_chat_message: Option<u64>,
    latest_notes_versions: Option<NotesVersion>,
    // Highest chat message id that was acknowledged.
    observed_chat_message: Option<u64>,
    observed_notes_versions: Option<NotesVersion>,
    // `None` until a membership for this channel has been received.
    role: Option<ChannelRole>,
}
72
73impl Channel {
74 pub fn link(&self) -> String {
75 RELEASE_CHANNEL.link_prefix().to_owned()
76 + "channel/"
77 + &Self::slug(&self.name)
78 + "-"
79 + &self.id.to_string()
80 }
81
82 pub fn notes_link(&self, heading: Option<String>) -> String {
83 self.link()
84 + "/notes"
85 + &heading
86 .map(|h| format!("#{}", Self::slug(&h)))
87 .unwrap_or_default()
88 }
89
90 pub fn is_root_channel(&self) -> bool {
91 self.parent_path.is_empty()
92 }
93
94 pub fn root_id(&self) -> ChannelId {
95 self.parent_path
96 .first()
97 .map(|id| *id as ChannelId)
98 .unwrap_or(self.id)
99 }
100
101 pub fn slug(str: &str) -> String {
102 let slug: String = str
103 .chars()
104 .map(|c| if c.is_alphanumeric() { c } else { '-' })
105 .collect();
106
107 slug.trim_matches(|c| c == '-').to_string()
108 }
109}
110
/// A user's relationship to a channel, as built by
/// `ChannelStore::get_channel_member_details`.
pub struct ChannelMembership {
    pub user: Arc<User>,
    /// Whether the user is a member or an invitee.
    pub kind: proto::channel_member::Kind,
    pub role: proto::ChannelRole,
}
116impl ChannelMembership {
117 pub fn sort_key(&self) -> MembershipSortKey {
118 MembershipSortKey {
119 role_order: match self.role {
120 proto::ChannelRole::Admin => 0,
121 proto::ChannelRole::Member => 1,
122 proto::ChannelRole::Banned => 2,
123 proto::ChannelRole::Guest => 3,
124 },
125 kind_order: match self.kind {
126 proto::channel_member::Kind::Member => 0,
127 proto::channel_member::Kind::Invitee => 1,
128 },
129 username_order: self.user.github_login.as_str(),
130 }
131 }
132}
133
/// Sort key produced by `ChannelMembership::sort_key`.
///
/// The comparison traits are derived, so keys compare field by field in declaration
/// order: role first, then kind, then user name. Do not reorder the fields.
#[derive(PartialOrd, Ord, PartialEq, Eq)]
pub struct MembershipSortKey<'a> {
    role_order: u8,
    kind_order: u8,
    // Borrowed from the membership's user.
    username_order: &'a str,
}
140
/// Events emitted by [`ChannelStore`].
pub enum ChannelEvent {
    /// Emitted by `create_channel` after the new channel was added to the store.
    ChannelCreated(ChannelId),
    /// Emitted by `rename` after the renamed channel was applied to the store.
    ChannelRenamed(ChannelId),
}

impl EventEmitter<ChannelEvent> for ChannelStore {}
147
/// State of a channel resource (buffer or chat) tracked by the store.
enum OpenedModelHandle<E> {
    /// The resource was loaded; only a weak handle is kept, so the store does not
    /// keep the model alive on its own.
    Open(WeakModel<E>),
    /// The resource is still loading; the shared task lets several callers await the
    /// same load.
    Loading(Shared<Task<Result<Model<E>, Arc<anyhow::Error>>>>),
}
152
/// Wrapper that lets the [`ChannelStore`] model be stored as a gpui global
/// (set in `init`, read in `ChannelStore::global`).
struct GlobalChannelStore(Model<ChannelStore>);

impl Global for GlobalChannelStore {}
156
157impl ChannelStore {
158 pub fn global(cx: &AppContext) -> Model<Self> {
159 cx.global::<GlobalChannelStore>().0.clone()
160 }
161
    /// Creates a channel store.
    ///
    /// Registers the two RPC message handlers, spawns a task that watches the client's
    /// connection status, and spawns a task that applies queued `UpdateChannels`
    /// messages one after another.
    pub fn new(
        client: Arc<Client>,
        user_store: Model<UserStore>,
        cx: &mut ModelContext<Self>,
    ) -> Self {
        let rpc_subscriptions = [
            client.add_message_handler(cx.weak_model(), Self::handle_update_channels),
            client.add_message_handler(cx.weak_model(), Self::handle_update_user_channels),
        ];

        let mut connection_status = client.status();
        let (update_channels_tx, mut update_channels_rx) = mpsc::unbounded();
        let watch_connection_status = cx.spawn(|this, mut cx| async move {
            while let Some(status) = connection_status.next().await {
                // Stop watching once the store itself has been dropped.
                let this = this.upgrade()?;
                match status {
                    client::Status::Connected { .. } => {
                        // NOTE(review): the trailing `?` ends this task when
                        // `handle_connect` fails, so later status changes are no
                        // longer observed — confirm this is intended.
                        this.update(&mut cx, |this, cx| this.handle_connect(cx))
                            .ok()?
                            .await
                            .log_err()?;
                    }
                    // Not expected to reconnect: disconnect buffers without waiting.
                    client::Status::SignedOut | client::Status::UpgradeRequired => {
                        this.update(&mut cx, |this, cx| this.handle_disconnect(false, cx))
                            .ok();
                    }
                    // Any other status: allow `RECONNECT_TIMEOUT` for a reconnect.
                    _ => {
                        this.update(&mut cx, |this, cx| this.handle_disconnect(true, cx))
                            .ok();
                    }
                }
            }
            Some(())
        });

        Self {
            channel_invitations: Vec::default(),
            channel_index: ChannelIndex::default(),
            channel_participants: Default::default(),
            outgoing_invites: Default::default(),
            opened_buffers: Default::default(),
            opened_chats: Default::default(),
            update_channels_tx,
            client,
            user_store,
            _rpc_subscriptions: rpc_subscriptions,
            _watch_connection_status: watch_connection_status,
            disconnect_channel_buffers_task: None,
            _update_channels: cx.spawn(|this, mut cx| async move {
                async_maybe!({
                    // Apply one update at a time; the follow-up task of an update is
                    // awaited before the next update is taken from the queue.
                    while let Some(update_channels) = update_channels_rx.next().await {
                        if let Some(this) = this.upgrade() {
                            let update_task = this.update(&mut cx, |this, cx| {
                                this.update_channels(update_channels, cx)
                            })?;
                            if let Some(update_task) = update_task {
                                update_task.await.log_err();
                            }
                        }
                    }
                    anyhow::Ok(())
                })
                .await
                .log_err();
            }),
            channel_states: Default::default(),
        }
    }
230
231 pub fn client(&self) -> Arc<Client> {
232 self.client.clone()
233 }
234
235 /// Returns the number of unique channels in the store
236 pub fn channel_count(&self) -> usize {
237 self.channel_index.by_id().len()
238 }
239
240 /// Returns the index of a channel ID in the list of unique channels
241 pub fn index_of_channel(&self, channel_id: ChannelId) -> Option<usize> {
242 self.channel_index
243 .by_id()
244 .keys()
245 .position(|id| *id == channel_id)
246 }
247
248 /// Returns an iterator over all unique channels
249 pub fn channels(&self) -> impl '_ + Iterator<Item = &Arc<Channel>> {
250 self.channel_index.by_id().values()
251 }
252
253 /// Iterate over all entries in the channel DAG
254 pub fn ordered_channels(&self) -> impl '_ + Iterator<Item = (usize, &Arc<Channel>)> {
255 self.channel_index
256 .ordered_channels()
257 .iter()
258 .filter_map(move |id| {
259 let channel = self.channel_index.by_id().get(id)?;
260 Some((channel.parent_path.len(), channel))
261 })
262 }
263
264 pub fn channel_at_index(&self, ix: usize) -> Option<&Arc<Channel>> {
265 let channel_id = self.channel_index.ordered_channels().get(ix)?;
266 self.channel_index.by_id().get(channel_id)
267 }
268
269 pub fn channel_at(&self, ix: usize) -> Option<&Arc<Channel>> {
270 self.channel_index.by_id().values().nth(ix)
271 }
272
273 pub fn has_channel_invitation(&self, channel_id: ChannelId) -> bool {
274 self.channel_invitations
275 .iter()
276 .any(|channel| channel.id == channel_id)
277 }
278
279 pub fn channel_invitations(&self) -> &[Arc<Channel>] {
280 &self.channel_invitations
281 }
282
283 pub fn channel_for_id(&self, channel_id: ChannelId) -> Option<&Arc<Channel>> {
284 self.channel_index.by_id().get(&channel_id)
285 }
286
287 pub fn has_open_channel_buffer(&self, channel_id: ChannelId, _cx: &AppContext) -> bool {
288 if let Some(buffer) = self.opened_buffers.get(&channel_id) {
289 if let OpenedModelHandle::Open(buffer) = buffer {
290 return buffer.upgrade().is_some();
291 }
292 }
293 false
294 }
295
296 pub fn open_channel_buffer(
297 &mut self,
298 channel_id: ChannelId,
299 cx: &mut ModelContext<Self>,
300 ) -> Task<Result<Model<ChannelBuffer>>> {
301 let client = self.client.clone();
302 let user_store = self.user_store.clone();
303 let channel_store = cx.handle();
304 self.open_channel_resource(
305 channel_id,
306 |this| &mut this.opened_buffers,
307 |channel, cx| ChannelBuffer::new(channel, client, user_store, channel_store, cx),
308 cx,
309 )
310 }
311
312 pub fn fetch_channel_messages(
313 &self,
314 message_ids: Vec<u64>,
315 cx: &mut ModelContext<Self>,
316 ) -> Task<Result<Vec<ChannelMessage>>> {
317 let request = if message_ids.is_empty() {
318 None
319 } else {
320 Some(
321 self.client
322 .request(proto::GetChannelMessagesById { message_ids }),
323 )
324 };
325 cx.spawn(|this, mut cx| async move {
326 if let Some(request) = request {
327 let response = request.await?;
328 let this = this
329 .upgrade()
330 .ok_or_else(|| anyhow!("channel store dropped"))?;
331 let user_store = this.update(&mut cx, |this, _| this.user_store.clone())?;
332 ChannelMessage::from_proto_vec(response.messages, &user_store, &mut cx).await
333 } else {
334 Ok(Vec::new())
335 }
336 })
337 }
338
339 pub fn has_channel_buffer_changed(&self, channel_id: ChannelId) -> bool {
340 self.channel_states
341 .get(&channel_id)
342 .is_some_and(|state| state.has_channel_buffer_changed())
343 }
344
345 pub fn has_new_messages(&self, channel_id: ChannelId) -> bool {
346 self.channel_states
347 .get(&channel_id)
348 .is_some_and(|state| state.has_new_messages())
349 }
350
351 pub fn acknowledge_message_id(
352 &mut self,
353 channel_id: ChannelId,
354 message_id: u64,
355 cx: &mut ModelContext<Self>,
356 ) {
357 self.channel_states
358 .entry(channel_id)
359 .or_insert_with(|| Default::default())
360 .acknowledge_message_id(message_id);
361 cx.notify();
362 }
363
364 pub fn update_latest_message_id(
365 &mut self,
366 channel_id: ChannelId,
367 message_id: u64,
368 cx: &mut ModelContext<Self>,
369 ) {
370 self.channel_states
371 .entry(channel_id)
372 .or_insert_with(|| Default::default())
373 .update_latest_message_id(message_id);
374 cx.notify();
375 }
376
377 pub fn acknowledge_notes_version(
378 &mut self,
379 channel_id: ChannelId,
380 epoch: u64,
381 version: &clock::Global,
382 cx: &mut ModelContext<Self>,
383 ) {
384 self.channel_states
385 .entry(channel_id)
386 .or_insert_with(|| Default::default())
387 .acknowledge_notes_version(epoch, version);
388 cx.notify()
389 }
390
391 pub fn update_latest_notes_version(
392 &mut self,
393 channel_id: ChannelId,
394 epoch: u64,
395 version: &clock::Global,
396 cx: &mut ModelContext<Self>,
397 ) {
398 self.channel_states
399 .entry(channel_id)
400 .or_insert_with(|| Default::default())
401 .update_latest_notes_version(epoch, version);
402 cx.notify()
403 }
404
405 pub fn open_channel_chat(
406 &mut self,
407 channel_id: ChannelId,
408 cx: &mut ModelContext<Self>,
409 ) -> Task<Result<Model<ChannelChat>>> {
410 let client = self.client.clone();
411 let user_store = self.user_store.clone();
412 let this = cx.handle();
413 self.open_channel_resource(
414 channel_id,
415 |this| &mut this.opened_chats,
416 |channel, cx| ChannelChat::new(channel, this, user_store, client, cx),
417 cx,
418 )
419 }
420
    /// Asynchronously open a given resource associated with a channel.
    ///
    /// Make sure that the resource is only opened once, even if this method
    /// is called multiple times with the same channel id while the first task
    /// is still running.
    ///
    /// `get_map` selects the map (`opened_buffers` or `opened_chats`) that tracks the
    /// resource and `load` creates it for the resolved channel. The returned task
    /// fails when the channel id is unknown or when `load` fails; a failed load
    /// removes the map entry so that a later call starts a fresh load.
    fn open_channel_resource<T, F, Fut>(
        &mut self,
        channel_id: ChannelId,
        get_map: fn(&mut Self) -> &mut HashMap<ChannelId, OpenedModelHandle<T>>,
        load: F,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<Model<T>>>
    where
        F: 'static + FnOnce(Arc<Channel>, AsyncAppContext) -> Fut,
        Fut: Future<Output = Result<Model<T>>>,
        T: 'static,
    {
        // Loops at most twice: a stale `Open` entry is removed and the lookup is
        // retried, which then takes the `Vacant` branch.
        let task = loop {
            match get_map(self).entry(channel_id) {
                hash_map::Entry::Occupied(e) => match e.get() {
                    OpenedModelHandle::Open(model) => {
                        if let Some(model) = model.upgrade() {
                            break Task::ready(Ok(model)).shared();
                        } else {
                            // The model was dropped since it was opened.
                            get_map(self).remove(&channel_id);
                            continue;
                        }
                    }
                    // A load is already in flight: share its task.
                    OpenedModelHandle::Loading(task) => {
                        break task.clone();
                    }
                },
                hash_map::Entry::Vacant(e) => {
                    let task = cx
                        .spawn(move |this, mut cx| async move {
                            let channel = this.update(&mut cx, |this, _| {
                                this.channel_for_id(channel_id).cloned().ok_or_else(|| {
                                    Arc::new(anyhow!("no channel for id: {}", channel_id))
                                })
                            })??;

                            // The error is wrapped in an `Arc` so that the result can be
                            // cloned by the shared task.
                            load(channel, cx).await.map_err(Arc::new)
                        })
                        .shared();

                    e.insert(OpenedModelHandle::Loading(task.clone()));
                    // Once loading finishes, replace the `Loading` entry by a weak
                    // handle, or drop the entry on failure.
                    cx.spawn({
                        let task = task.clone();
                        move |this, mut cx| async move {
                            let result = task.await;
                            this.update(&mut cx, |this, _| match result {
                                Ok(model) => {
                                    get_map(this).insert(
                                        channel_id,
                                        OpenedModelHandle::Open(model.downgrade()),
                                    );
                                }
                                Err(_) => {
                                    get_map(this).remove(&channel_id);
                                }
                            })
                            .ok();
                        }
                    })
                    .detach();
                    break task;
                }
            }
        };
        // Convert the shared `Arc<anyhow::Error>` back into a plain `anyhow::Error`.
        cx.background_executor()
            .spawn(async move { task.await.map_err(|error| anyhow!("{}", error)) })
    }
493
494 pub fn is_channel_admin(&self, channel_id: ChannelId) -> bool {
495 self.channel_role(channel_id) == proto::ChannelRole::Admin
496 }
497
498 pub fn is_root_channel(&self, channel_id: ChannelId) -> bool {
499 self.channel_index
500 .by_id()
501 .get(&channel_id)
502 .map_or(false, |channel| channel.is_root_channel())
503 }
504
505 pub fn is_public_channel(&self, channel_id: ChannelId) -> bool {
506 self.channel_index
507 .by_id()
508 .get(&channel_id)
509 .map_or(false, |channel| {
510 channel.visibility == ChannelVisibility::Public
511 })
512 }
513
514 pub fn channel_capability(&self, channel_id: ChannelId) -> Capability {
515 match self.channel_role(channel_id) {
516 ChannelRole::Admin | ChannelRole::Member => Capability::ReadWrite,
517 _ => Capability::ReadOnly,
518 }
519 }
520
521 pub fn channel_role(&self, channel_id: ChannelId) -> proto::ChannelRole {
522 maybe!({
523 let mut channel = self.channel_for_id(channel_id)?;
524 if !channel.is_root_channel() {
525 channel = self.channel_for_id(channel.root_id())?;
526 }
527 let root_channel_state = self.channel_states.get(&channel.id);
528 root_channel_state?.role
529 })
530 .unwrap_or(proto::ChannelRole::Guest)
531 }
532
533 pub fn channel_participants(&self, channel_id: ChannelId) -> &[Arc<User>] {
534 self.channel_participants
535 .get(&channel_id)
536 .map_or(&[], |v| v.as_slice())
537 }
538
539 pub fn create_channel(
540 &self,
541 name: &str,
542 parent_id: Option<ChannelId>,
543 cx: &mut ModelContext<Self>,
544 ) -> Task<Result<ChannelId>> {
545 let client = self.client.clone();
546 let name = name.trim_start_matches("#").to_owned();
547 cx.spawn(move |this, mut cx| async move {
548 let response = client
549 .request(proto::CreateChannel { name, parent_id })
550 .await?;
551
552 let channel = response
553 .channel
554 .ok_or_else(|| anyhow!("missing channel in response"))?;
555 let channel_id = channel.id;
556
557 this.update(&mut cx, |this, cx| {
558 let task = this.update_channels(
559 proto::UpdateChannels {
560 channels: vec![channel],
561 ..Default::default()
562 },
563 cx,
564 );
565 assert!(task.is_none());
566
567 // This event is emitted because the collab panel wants to clear the pending edit state
568 // before this frame is rendered. But we can't guarantee that the collab panel's future
569 // will resolve before this flush_effects finishes. Synchronously emitting this event
570 // ensures that the collab panel will observe this creation before the frame completes
571 cx.emit(ChannelEvent::ChannelCreated(channel_id));
572 })?;
573
574 Ok(channel_id)
575 })
576 }
577
578 pub fn move_channel(
579 &mut self,
580 channel_id: ChannelId,
581 to: ChannelId,
582 cx: &mut ModelContext<Self>,
583 ) -> Task<Result<()>> {
584 let client = self.client.clone();
585 cx.spawn(move |_, _| async move {
586 let _ = client
587 .request(proto::MoveChannel { channel_id, to })
588 .await?;
589
590 Ok(())
591 })
592 }
593
594 pub fn set_channel_visibility(
595 &mut self,
596 channel_id: ChannelId,
597 visibility: ChannelVisibility,
598 cx: &mut ModelContext<Self>,
599 ) -> Task<Result<()>> {
600 let client = self.client.clone();
601 cx.spawn(move |_, _| async move {
602 let _ = client
603 .request(proto::SetChannelVisibility {
604 channel_id,
605 visibility: visibility.into(),
606 })
607 .await?;
608
609 Ok(())
610 })
611 }
612
613 pub fn invite_member(
614 &mut self,
615 channel_id: ChannelId,
616 user_id: UserId,
617 role: proto::ChannelRole,
618 cx: &mut ModelContext<Self>,
619 ) -> Task<Result<()>> {
620 if !self.outgoing_invites.insert((channel_id, user_id)) {
621 return Task::ready(Err(anyhow!("invite request already in progress")));
622 }
623
624 cx.notify();
625 let client = self.client.clone();
626 cx.spawn(move |this, mut cx| async move {
627 let result = client
628 .request(proto::InviteChannelMember {
629 channel_id,
630 user_id,
631 role: role.into(),
632 })
633 .await;
634
635 this.update(&mut cx, |this, cx| {
636 this.outgoing_invites.remove(&(channel_id, user_id));
637 cx.notify();
638 })?;
639
640 result?;
641
642 Ok(())
643 })
644 }
645
646 pub fn remove_member(
647 &mut self,
648 channel_id: ChannelId,
649 user_id: u64,
650 cx: &mut ModelContext<Self>,
651 ) -> Task<Result<()>> {
652 if !self.outgoing_invites.insert((channel_id, user_id)) {
653 return Task::ready(Err(anyhow!("invite request already in progress")));
654 }
655
656 cx.notify();
657 let client = self.client.clone();
658 cx.spawn(move |this, mut cx| async move {
659 let result = client
660 .request(proto::RemoveChannelMember {
661 channel_id,
662 user_id,
663 })
664 .await;
665
666 this.update(&mut cx, |this, cx| {
667 this.outgoing_invites.remove(&(channel_id, user_id));
668 cx.notify();
669 })?;
670 result?;
671 Ok(())
672 })
673 }
674
675 pub fn set_member_role(
676 &mut self,
677 channel_id: ChannelId,
678 user_id: UserId,
679 role: proto::ChannelRole,
680 cx: &mut ModelContext<Self>,
681 ) -> Task<Result<()>> {
682 if !self.outgoing_invites.insert((channel_id, user_id)) {
683 return Task::ready(Err(anyhow!("member request already in progress")));
684 }
685
686 cx.notify();
687 let client = self.client.clone();
688 cx.spawn(move |this, mut cx| async move {
689 let result = client
690 .request(proto::SetChannelMemberRole {
691 channel_id,
692 user_id,
693 role: role.into(),
694 })
695 .await;
696
697 this.update(&mut cx, |this, cx| {
698 this.outgoing_invites.remove(&(channel_id, user_id));
699 cx.notify();
700 })?;
701
702 result?;
703 Ok(())
704 })
705 }
706
707 pub fn rename(
708 &mut self,
709 channel_id: ChannelId,
710 new_name: &str,
711 cx: &mut ModelContext<Self>,
712 ) -> Task<Result<()>> {
713 let client = self.client.clone();
714 let name = new_name.to_string();
715 cx.spawn(move |this, mut cx| async move {
716 let channel = client
717 .request(proto::RenameChannel { channel_id, name })
718 .await?
719 .channel
720 .ok_or_else(|| anyhow!("missing channel in response"))?;
721 this.update(&mut cx, |this, cx| {
722 let task = this.update_channels(
723 proto::UpdateChannels {
724 channels: vec![channel],
725 ..Default::default()
726 },
727 cx,
728 );
729 assert!(task.is_none());
730
731 // This event is emitted because the collab panel wants to clear the pending edit state
732 // before this frame is rendered. But we can't guarantee that the collab panel's future
733 // will resolve before this flush_effects finishes. Synchronously emitting this event
734 // ensures that the collab panel will observe this creation before the frame complete
735 cx.emit(ChannelEvent::ChannelRenamed(channel_id))
736 })?;
737 Ok(())
738 })
739 }
740
741 pub fn respond_to_channel_invite(
742 &mut self,
743 channel_id: ChannelId,
744 accept: bool,
745 cx: &mut ModelContext<Self>,
746 ) -> Task<Result<()>> {
747 let client = self.client.clone();
748 cx.background_executor().spawn(async move {
749 client
750 .request(proto::RespondToChannelInvite { channel_id, accept })
751 .await?;
752 Ok(())
753 })
754 }
755
756 pub fn get_channel_member_details(
757 &self,
758 channel_id: ChannelId,
759 cx: &mut ModelContext<Self>,
760 ) -> Task<Result<Vec<ChannelMembership>>> {
761 let client = self.client.clone();
762 let user_store = self.user_store.downgrade();
763 cx.spawn(move |_, mut cx| async move {
764 let response = client
765 .request(proto::GetChannelMembers { channel_id })
766 .await?;
767
768 let user_ids = response.members.iter().map(|m| m.user_id).collect();
769 let user_store = user_store
770 .upgrade()
771 .ok_or_else(|| anyhow!("user store dropped"))?;
772 let users = user_store
773 .update(&mut cx, |user_store, cx| user_store.get_users(user_ids, cx))?
774 .await?;
775
776 Ok(users
777 .into_iter()
778 .zip(response.members)
779 .filter_map(|(user, member)| {
780 Some(ChannelMembership {
781 user,
782 role: member.role(),
783 kind: member.kind(),
784 })
785 })
786 .collect())
787 })
788 }
789
790 pub fn remove_channel(&self, channel_id: ChannelId) -> impl Future<Output = Result<()>> {
791 let client = self.client.clone();
792 async move {
793 client.request(proto::DeleteChannel { channel_id }).await?;
794 Ok(())
795 }
796 }
797
798 pub fn has_pending_channel_invite_response(&self, _: &Arc<Channel>) -> bool {
799 false
800 }
801
802 pub fn has_pending_channel_invite(&self, channel_id: ChannelId, user_id: UserId) -> bool {
803 self.outgoing_invites.contains(&(channel_id, user_id))
804 }
805
806 async fn handle_update_channels(
807 this: Model<Self>,
808 message: TypedEnvelope<proto::UpdateChannels>,
809 _: Arc<Client>,
810 mut cx: AsyncAppContext,
811 ) -> Result<()> {
812 this.update(&mut cx, |this, _| {
813 this.update_channels_tx
814 .unbounded_send(message.payload)
815 .unwrap();
816 })?;
817 Ok(())
818 }
819
820 async fn handle_update_user_channels(
821 this: Model<Self>,
822 message: TypedEnvelope<proto::UpdateUserChannels>,
823 _: Arc<Client>,
824 mut cx: AsyncAppContext,
825 ) -> Result<()> {
826 this.update(&mut cx, |this, cx| {
827 for buffer_version in message.payload.observed_channel_buffer_version {
828 let version = language::proto::deserialize_version(&buffer_version.version);
829 this.acknowledge_notes_version(
830 buffer_version.channel_id,
831 buffer_version.epoch,
832 &version,
833 cx,
834 );
835 }
836 for message_id in message.payload.observed_channel_message_id {
837 this.acknowledge_message_id(message_id.channel_id, message_id.message_id, cx);
838 }
839 for membership in message.payload.channel_memberships {
840 if let Some(role) = ChannelRole::from_i32(membership.role) {
841 this.channel_states
842 .entry(membership.channel_id)
843 .or_insert_with(|| ChannelState::default())
844 .set_role(role)
845 }
846 }
847 })
848 }
849
850 fn handle_connect(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
851 self.channel_index.clear();
852 self.channel_invitations.clear();
853 self.channel_participants.clear();
854 self.channel_index.clear();
855 self.outgoing_invites.clear();
856 self.disconnect_channel_buffers_task.take();
857
858 for chat in self.opened_chats.values() {
859 if let OpenedModelHandle::Open(chat) = chat {
860 if let Some(chat) = chat.upgrade() {
861 chat.update(cx, |chat, cx| {
862 chat.rejoin(cx);
863 });
864 }
865 }
866 }
867
868 let mut buffer_versions = Vec::new();
869 for buffer in self.opened_buffers.values() {
870 if let OpenedModelHandle::Open(buffer) = buffer {
871 if let Some(buffer) = buffer.upgrade() {
872 let channel_buffer = buffer.read(cx);
873 let buffer = channel_buffer.buffer().read(cx);
874 buffer_versions.push(proto::ChannelBufferVersion {
875 channel_id: channel_buffer.channel_id,
876 epoch: channel_buffer.epoch(),
877 version: language::proto::serialize_version(&buffer.version()),
878 });
879 }
880 }
881 }
882
883 if buffer_versions.is_empty() {
884 return Task::ready(Ok(()));
885 }
886
887 let response = self.client.request(proto::RejoinChannelBuffers {
888 buffers: buffer_versions,
889 });
890
891 cx.spawn(|this, mut cx| async move {
892 let mut response = response.await?;
893
894 this.update(&mut cx, |this, cx| {
895 this.opened_buffers.retain(|_, buffer| match buffer {
896 OpenedModelHandle::Open(channel_buffer) => {
897 let Some(channel_buffer) = channel_buffer.upgrade() else {
898 return false;
899 };
900
901 channel_buffer.update(cx, |channel_buffer, cx| {
902 let channel_id = channel_buffer.channel_id;
903 if let Some(remote_buffer) = response
904 .buffers
905 .iter_mut()
906 .find(|buffer| buffer.channel_id == channel_id)
907 {
908 let channel_id = channel_buffer.channel_id;
909 let remote_version =
910 language::proto::deserialize_version(&remote_buffer.version);
911
912 channel_buffer.replace_collaborators(
913 mem::take(&mut remote_buffer.collaborators),
914 cx,
915 );
916
917 let operations = channel_buffer
918 .buffer()
919 .update(cx, |buffer, cx| {
920 let outgoing_operations =
921 buffer.serialize_ops(Some(remote_version), cx);
922 let incoming_operations =
923 mem::take(&mut remote_buffer.operations)
924 .into_iter()
925 .map(language::proto::deserialize_operation)
926 .collect::<Result<Vec<_>>>()?;
927 buffer.apply_ops(incoming_operations, cx)?;
928 anyhow::Ok(outgoing_operations)
929 })
930 .log_err();
931
932 if let Some(operations) = operations {
933 let client = this.client.clone();
934 cx.background_executor()
935 .spawn(async move {
936 let operations = operations.await;
937 for chunk in
938 language::proto::split_operations(operations)
939 {
940 client
941 .send(proto::UpdateChannelBuffer {
942 channel_id,
943 operations: chunk,
944 })
945 .ok();
946 }
947 })
948 .detach();
949 return true;
950 }
951 }
952
953 channel_buffer.disconnect(cx);
954 false
955 })
956 }
957 OpenedModelHandle::Loading(_) => true,
958 });
959 })
960 .ok();
961 anyhow::Ok(())
962 })
963 }
964
965 fn handle_disconnect(&mut self, wait_for_reconnect: bool, cx: &mut ModelContext<Self>) {
966 cx.notify();
967
968 self.disconnect_channel_buffers_task.get_or_insert_with(|| {
969 cx.spawn(move |this, mut cx| async move {
970 if wait_for_reconnect {
971 cx.background_executor().timer(RECONNECT_TIMEOUT).await;
972 }
973
974 if let Some(this) = this.upgrade() {
975 this.update(&mut cx, |this, cx| {
976 for (_, buffer) in this.opened_buffers.drain() {
977 if let OpenedModelHandle::Open(buffer) = buffer {
978 if let Some(buffer) = buffer.upgrade() {
979 buffer.update(cx, |buffer, cx| buffer.disconnect(cx));
980 }
981 }
982 }
983 })
984 .ok();
985 }
986 })
987 });
988 }
989
    /// Applies an `UpdateChannels` payload to the store.
    ///
    /// Invitations, channels, latest notes versions and latest chat message ids are
    /// applied synchronously. When the payload lists channel participants, their user
    /// records are requested from the user store and a task is returned that fills in
    /// `channel_participants` once they are available; otherwise `None` is returned.
    pub(crate) fn update_channels(
        &mut self,
        payload: proto::UpdateChannels,
        cx: &mut ModelContext<ChannelStore>,
    ) -> Option<Task<Result<()>>> {
        if !payload.remove_channel_invitations.is_empty() {
            self.channel_invitations
                .retain(|channel| !payload.remove_channel_invitations.contains(&channel.id));
        }
        // Invitations are kept sorted by id: an existing one only gets its name
        // updated, a new one is inserted at its sorted position.
        for channel in payload.channel_invitations {
            match self
                .channel_invitations
                .binary_search_by_key(&channel.id, |c| c.id)
            {
                Ok(ix) => {
                    Arc::make_mut(&mut self.channel_invitations[ix]).name = channel.name.into()
                }
                Err(ix) => self.channel_invitations.insert(
                    ix,
                    Arc::new(Channel {
                        id: channel.id,
                        visibility: channel.visibility(),
                        name: channel.name.into(),
                        parent_path: channel.parent_path,
                    }),
                ),
            }
        }

        let channels_changed = !payload.channels.is_empty()
            || !payload.delete_channels.is_empty()
            || !payload.latest_channel_message_ids.is_empty()
            || !payload.latest_channel_buffer_versions.is_empty();

        if channels_changed {
            if !payload.delete_channels.is_empty() {
                self.channel_index.delete_channels(&payload.delete_channels);
                self.channel_participants
                    .retain(|channel_id, _| !&payload.delete_channels.contains(channel_id));

                for channel_id in &payload.delete_channels {
                    let channel_id = *channel_id;
                    // A channel that is deleted and re-added by the same payload keeps
                    // its open buffer.
                    if payload
                        .channels
                        .iter()
                        .any(|channel| channel.id == channel_id)
                    {
                        continue;
                    }
                    if let Some(OpenedModelHandle::Open(buffer)) =
                        self.opened_buffers.remove(&channel_id)
                    {
                        if let Some(buffer) = buffer.upgrade() {
                            buffer.update(cx, ChannelBuffer::disconnect);
                        }
                    }
                }
            }

            let mut index = self.channel_index.bulk_insert();
            for channel in payload.channels {
                let id = channel.id;
                let channel_changed = index.insert(channel);

                // Let an open buffer of the channel know that the channel changed.
                if channel_changed {
                    if let Some(OpenedModelHandle::Open(buffer)) = self.opened_buffers.get(&id) {
                        if let Some(buffer) = buffer.upgrade() {
                            buffer.update(cx, ChannelBuffer::channel_changed);
                        }
                    }
                }
            }

            for latest_buffer_version in payload.latest_channel_buffer_versions {
                let version = language::proto::deserialize_version(&latest_buffer_version.version);
                self.channel_states
                    .entry(latest_buffer_version.channel_id)
                    .or_default()
                    .update_latest_notes_version(latest_buffer_version.epoch, &version)
            }

            for latest_channel_message in payload.latest_channel_message_ids {
                self.channel_states
                    .entry(latest_channel_message.channel_id)
                    .or_default()
                    .update_latest_message_id(latest_channel_message.message_id);
            }
        }

        cx.notify();
        if payload.channel_participants.is_empty() {
            return None;
        }

        // Collect the participant ids of all channels as a sorted, duplicate-free list.
        let mut all_user_ids = Vec::new();
        let channel_participants = payload.channel_participants;
        for entry in &channel_participants {
            for user_id in entry.participant_user_ids.iter() {
                if let Err(ix) = all_user_ids.binary_search(user_id) {
                    all_user_ids.insert(ix, *user_id);
                }
            }
        }

        let users = self
            .user_store
            .update(cx, |user_store, cx| user_store.get_users(all_user_ids, cx));
        Some(cx.spawn(|this, mut cx| async move {
            let users = users.await?;

            this.update(&mut cx, |this, cx| {
                for entry in &channel_participants {
                    // The binary search assumes `users` is sorted by id — presumably
                    // `get_users` preserves the (sorted) order of the requested ids;
                    // verify against `UserStore`. Ids that are not found are skipped.
                    let mut participants: Vec<_> = entry
                        .participant_user_ids
                        .iter()
                        .filter_map(|user_id| {
                            users
                                .binary_search_by_key(&user_id, |user| &user.id)
                                .ok()
                                .map(|ix| users[ix].clone())
                        })
                        .collect();

                    participants.sort_by_key(|u| u.id);

                    this.channel_participants
                        .insert(entry.channel_id, participants);
                }

                cx.notify();
            })
        }))
    }
1123}
1124
1125impl ChannelState {
1126 fn set_role(&mut self, role: ChannelRole) {
1127 self.role = Some(role);
1128 }
1129
1130 fn has_channel_buffer_changed(&self) -> bool {
1131 if let Some(latest_version) = &self.latest_notes_versions {
1132 if let Some(observed_version) = &self.observed_notes_versions {
1133 latest_version.epoch > observed_version.epoch
1134 || latest_version
1135 .version
1136 .changed_since(&observed_version.version)
1137 } else {
1138 true
1139 }
1140 } else {
1141 false
1142 }
1143 }
1144
1145 fn has_new_messages(&self) -> bool {
1146 let latest_message_id = self.latest_chat_message;
1147 let observed_message_id = self.observed_chat_message;
1148
1149 latest_message_id.is_some_and(|latest_message_id| {
1150 latest_message_id > observed_message_id.unwrap_or_default()
1151 })
1152 }
1153
1154 fn acknowledge_message_id(&mut self, message_id: u64) {
1155 let observed = self.observed_chat_message.get_or_insert(message_id);
1156 *observed = (*observed).max(message_id);
1157 }
1158
1159 fn update_latest_message_id(&mut self, message_id: u64) {
1160 self.latest_chat_message =
1161 Some(message_id.max(self.latest_chat_message.unwrap_or_default()));
1162 }
1163
1164 fn acknowledge_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1165 if let Some(existing) = &mut self.observed_notes_versions {
1166 if existing.epoch == epoch {
1167 existing.version.join(version);
1168 return;
1169 }
1170 }
1171 self.observed_notes_versions = Some(NotesVersion {
1172 epoch,
1173 version: version.clone(),
1174 });
1175 }
1176
1177 fn update_latest_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1178 if let Some(existing) = &mut self.latest_notes_versions {
1179 if existing.epoch == epoch {
1180 existing.version.join(version);
1181 return;
1182 }
1183 }
1184 self.latest_notes_versions = Some(NotesVersion {
1185 epoch,
1186 version: version.clone(),
1187 });
1188 }
1189}