1mod channel_index;
2
3use crate::{channel_buffer::ChannelBuffer, channel_chat::ChannelChat, ChannelMessage};
4use anyhow::{anyhow, Result};
5use channel_index::ChannelIndex;
6use client::{Client, Subscription, User, UserId, UserStore};
7use collections::{hash_map, HashMap, HashSet};
8use futures::{channel::mpsc, future::Shared, Future, FutureExt, StreamExt};
9use gpui::{
10 AppContext, AsyncAppContext, Context, EventEmitter, Global, Model, ModelContext, SharedString,
11 Task, WeakModel,
12};
13use language::Capability;
14use release_channel::RELEASE_CHANNEL;
15use rpc::{
16 proto::{self, ChannelRole, ChannelVisibility},
17 TypedEnvelope,
18};
19use std::{mem, sync::Arc, time::Duration};
20use util::{async_maybe, maybe, ResultExt};
21
22pub fn init(client: &Arc<Client>, user_store: Model<UserStore>, cx: &mut AppContext) {
23 let channel_store =
24 cx.new_model(|cx| ChannelStore::new(client.clone(), user_store.clone(), cx));
25 cx.set_global(GlobalChannelStore(channel_store));
26}
27
28pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
29
30pub type ChannelId = u64;
31
32#[derive(Debug, Clone, Default)]
33struct NotesVersion {
34 epoch: u64,
35 version: clock::Global,
36}
37
38pub struct ChannelStore {
39 pub channel_index: ChannelIndex,
40 channel_invitations: Vec<Arc<Channel>>,
41 channel_participants: HashMap<ChannelId, Vec<Arc<User>>>,
42 channel_states: HashMap<ChannelId, ChannelState>,
43
44 outgoing_invites: HashSet<(ChannelId, UserId)>,
45 update_channels_tx: mpsc::UnboundedSender<proto::UpdateChannels>,
46 opened_buffers: HashMap<ChannelId, OpenedModelHandle<ChannelBuffer>>,
47 opened_chats: HashMap<ChannelId, OpenedModelHandle<ChannelChat>>,
48 client: Arc<Client>,
49 user_store: Model<UserStore>,
50 _rpc_subscriptions: [Subscription; 2],
51 _watch_connection_status: Task<Option<()>>,
52 disconnect_channel_buffers_task: Option<Task<()>>,
53 _update_channels: Task<()>,
54}
55
56#[derive(Clone, Debug)]
57pub struct Channel {
58 pub id: ChannelId,
59 pub name: SharedString,
60 pub visibility: proto::ChannelVisibility,
61 pub parent_path: Vec<u64>,
62}
63
64#[derive(Default)]
65pub struct ChannelState {
66 latest_chat_message: Option<u64>,
67 latest_notes_versions: Option<NotesVersion>,
68 observed_chat_message: Option<u64>,
69 observed_notes_versions: Option<NotesVersion>,
70 role: Option<ChannelRole>,
71}
72
73impl Channel {
74 pub fn link(&self) -> String {
75 RELEASE_CHANNEL.link_prefix().to_owned()
76 + "channel/"
77 + &Self::slug(&self.name)
78 + "-"
79 + &self.id.to_string()
80 }
81
82 pub fn notes_link(&self, heading: Option<String>) -> String {
83 self.link()
84 + "/notes"
85 + &heading
86 .map(|h| format!("#{}", Self::slug(&h)))
87 .unwrap_or_default()
88 }
89
90 pub fn is_root_channel(&self) -> bool {
91 self.parent_path.is_empty()
92 }
93
94 pub fn root_id(&self) -> ChannelId {
95 self.parent_path
96 .first()
97 .map(|id| *id as ChannelId)
98 .unwrap_or(self.id)
99 }
100
101 pub fn slug(str: &str) -> String {
102 let slug: String = str
103 .chars()
104 .map(|c| if c.is_alphanumeric() { c } else { '-' })
105 .collect();
106
107 slug.trim_matches(|c| c == '-').to_string()
108 }
109}
110
111pub struct ChannelMembership {
112 pub user: Arc<User>,
113 pub kind: proto::channel_member::Kind,
114 pub role: proto::ChannelRole,
115}
116impl ChannelMembership {
117 pub fn sort_key(&self) -> MembershipSortKey {
118 MembershipSortKey {
119 role_order: match self.role {
120 proto::ChannelRole::Admin => 0,
121 proto::ChannelRole::Member => 1,
122 proto::ChannelRole::Banned => 2,
123 proto::ChannelRole::Guest => 3,
124 },
125 kind_order: match self.kind {
126 proto::channel_member::Kind::Member => 0,
127 proto::channel_member::Kind::Invitee => 1,
128 },
129 username_order: self.user.github_login.as_str(),
130 }
131 }
132}
133
134#[derive(PartialOrd, Ord, PartialEq, Eq)]
135pub struct MembershipSortKey<'a> {
136 role_order: u8,
137 kind_order: u8,
138 username_order: &'a str,
139}
140
141pub enum ChannelEvent {
142 ChannelCreated(ChannelId),
143 ChannelRenamed(ChannelId),
144}
145
146impl EventEmitter<ChannelEvent> for ChannelStore {}
147
148enum OpenedModelHandle<E> {
149 Open(WeakModel<E>),
150 Loading(Shared<Task<Result<Model<E>, Arc<anyhow::Error>>>>),
151}
152
153struct GlobalChannelStore(Model<ChannelStore>);
154
155impl Global for GlobalChannelStore {}
156
157impl ChannelStore {
158 pub fn global(cx: &AppContext) -> Model<Self> {
159 cx.global::<GlobalChannelStore>().0.clone()
160 }
161
162 pub fn new(
163 client: Arc<Client>,
164 user_store: Model<UserStore>,
165 cx: &mut ModelContext<Self>,
166 ) -> Self {
167 let rpc_subscriptions = [
168 client.add_message_handler(cx.weak_model(), Self::handle_update_channels),
169 client.add_message_handler(cx.weak_model(), Self::handle_update_user_channels),
170 ];
171
172 let mut connection_status = client.status();
173 let (update_channels_tx, mut update_channels_rx) = mpsc::unbounded();
174 let watch_connection_status = cx.spawn(|this, mut cx| async move {
175 while let Some(status) = connection_status.next().await {
176 let this = this.upgrade()?;
177 match status {
178 client::Status::Connected { .. } => {
179 this.update(&mut cx, |this, cx| this.handle_connect(cx))
180 .ok()?
181 .await
182 .log_err()?;
183 }
184 client::Status::SignedOut | client::Status::UpgradeRequired => {
185 this.update(&mut cx, |this, cx| this.handle_disconnect(false, cx))
186 .ok();
187 }
188 _ => {
189 this.update(&mut cx, |this, cx| this.handle_disconnect(true, cx))
190 .ok();
191 }
192 }
193 }
194 Some(())
195 });
196
197 Self {
198 channel_invitations: Vec::default(),
199 channel_index: ChannelIndex::default(),
200 channel_participants: Default::default(),
201 outgoing_invites: Default::default(),
202 opened_buffers: Default::default(),
203 opened_chats: Default::default(),
204 update_channels_tx,
205 client,
206 user_store,
207 _rpc_subscriptions: rpc_subscriptions,
208 _watch_connection_status: watch_connection_status,
209 disconnect_channel_buffers_task: None,
210 _update_channels: cx.spawn(|this, mut cx| async move {
211 async_maybe!({
212 while let Some(update_channels) = update_channels_rx.next().await {
213 if let Some(this) = this.upgrade() {
214 let update_task = this.update(&mut cx, |this, cx| {
215 this.update_channels(update_channels, cx)
216 })?;
217 if let Some(update_task) = update_task {
218 update_task.await.log_err();
219 }
220 }
221 }
222 anyhow::Ok(())
223 })
224 .await
225 .log_err();
226 }),
227 channel_states: Default::default(),
228 }
229 }
230
231 pub fn client(&self) -> Arc<Client> {
232 self.client.clone()
233 }
234
235 /// Returns the number of unique channels in the store
236 pub fn channel_count(&self) -> usize {
237 self.channel_index.by_id().len()
238 }
239
240 /// Returns the index of a channel ID in the list of unique channels
241 pub fn index_of_channel(&self, channel_id: ChannelId) -> Option<usize> {
242 self.channel_index
243 .by_id()
244 .keys()
245 .position(|id| *id == channel_id)
246 }
247
248 /// Returns an iterator over all unique channels
249 pub fn channels(&self) -> impl '_ + Iterator<Item = &Arc<Channel>> {
250 self.channel_index.by_id().values()
251 }
252
253 /// Iterate over all entries in the channel DAG
254 pub fn ordered_channels(&self) -> impl '_ + Iterator<Item = (usize, &Arc<Channel>)> {
255 self.channel_index
256 .ordered_channels()
257 .iter()
258 .filter_map(move |id| {
259 let channel = self.channel_index.by_id().get(id)?;
260 Some((channel.parent_path.len(), channel))
261 })
262 }
263
264 pub fn channel_at_index(&self, ix: usize) -> Option<&Arc<Channel>> {
265 let channel_id = self.channel_index.ordered_channels().get(ix)?;
266 self.channel_index.by_id().get(channel_id)
267 }
268
269 pub fn channel_at(&self, ix: usize) -> Option<&Arc<Channel>> {
270 self.channel_index.by_id().values().nth(ix)
271 }
272
273 pub fn has_channel_invitation(&self, channel_id: ChannelId) -> bool {
274 self.channel_invitations
275 .iter()
276 .any(|channel| channel.id == channel_id)
277 }
278
279 pub fn channel_invitations(&self) -> &[Arc<Channel>] {
280 &self.channel_invitations
281 }
282
283 pub fn channel_for_id(&self, channel_id: ChannelId) -> Option<&Arc<Channel>> {
284 self.channel_index.by_id().get(&channel_id)
285 }
286
287 pub fn has_open_channel_buffer(&self, channel_id: ChannelId, _cx: &AppContext) -> bool {
288 if let Some(buffer) = self.opened_buffers.get(&channel_id) {
289 if let OpenedModelHandle::Open(buffer) = buffer {
290 return buffer.upgrade().is_some();
291 }
292 }
293 false
294 }
295
296 pub fn open_channel_buffer(
297 &mut self,
298 channel_id: ChannelId,
299 cx: &mut ModelContext<Self>,
300 ) -> Task<Result<Model<ChannelBuffer>>> {
301 let client = self.client.clone();
302 let user_store = self.user_store.clone();
303 let channel_store = cx.handle();
304 self.open_channel_resource(
305 channel_id,
306 |this| &mut this.opened_buffers,
307 |channel, cx| ChannelBuffer::new(channel, client, user_store, channel_store, cx),
308 cx,
309 )
310 }
311
312 pub fn fetch_channel_messages(
313 &self,
314 message_ids: Vec<u64>,
315 cx: &mut ModelContext<Self>,
316 ) -> Task<Result<Vec<ChannelMessage>>> {
317 let request = if message_ids.is_empty() {
318 None
319 } else {
320 Some(
321 self.client
322 .request(proto::GetChannelMessagesById { message_ids }),
323 )
324 };
325 cx.spawn(|this, mut cx| async move {
326 if let Some(request) = request {
327 let response = request.await?;
328 let this = this
329 .upgrade()
330 .ok_or_else(|| anyhow!("channel store dropped"))?;
331 let user_store = this.update(&mut cx, |this, _| this.user_store.clone())?;
332 ChannelMessage::from_proto_vec(response.messages, &user_store, &mut cx).await
333 } else {
334 Ok(Vec::new())
335 }
336 })
337 }
338
339 pub fn has_channel_buffer_changed(&self, channel_id: ChannelId) -> bool {
340 self.channel_states
341 .get(&channel_id)
342 .is_some_and(|state| state.has_channel_buffer_changed())
343 }
344
345 pub fn has_new_messages(&self, channel_id: ChannelId) -> bool {
346 self.channel_states
347 .get(&channel_id)
348 .is_some_and(|state| state.has_new_messages())
349 }
350
351 pub fn last_acknowledge_message_id(&self, channel_id: ChannelId) -> Option<u64> {
352 self.channel_states.get(&channel_id).and_then(|state| {
353 if let Some(last_message_id) = state.latest_chat_message {
354 if state
355 .last_acknowledged_message_id()
356 .is_some_and(|id| id < last_message_id)
357 {
358 return state.last_acknowledged_message_id();
359 }
360 }
361
362 None
363 })
364 }
365
366 pub fn acknowledge_message_id(
367 &mut self,
368 channel_id: ChannelId,
369 message_id: u64,
370 cx: &mut ModelContext<Self>,
371 ) {
372 self.channel_states
373 .entry(channel_id)
374 .or_insert_with(|| Default::default())
375 .acknowledge_message_id(message_id);
376 cx.notify();
377 }
378
379 pub fn update_latest_message_id(
380 &mut self,
381 channel_id: ChannelId,
382 message_id: u64,
383 cx: &mut ModelContext<Self>,
384 ) {
385 self.channel_states
386 .entry(channel_id)
387 .or_insert_with(|| Default::default())
388 .update_latest_message_id(message_id);
389 cx.notify();
390 }
391
392 pub fn acknowledge_notes_version(
393 &mut self,
394 channel_id: ChannelId,
395 epoch: u64,
396 version: &clock::Global,
397 cx: &mut ModelContext<Self>,
398 ) {
399 self.channel_states
400 .entry(channel_id)
401 .or_insert_with(|| Default::default())
402 .acknowledge_notes_version(epoch, version);
403 cx.notify()
404 }
405
406 pub fn update_latest_notes_version(
407 &mut self,
408 channel_id: ChannelId,
409 epoch: u64,
410 version: &clock::Global,
411 cx: &mut ModelContext<Self>,
412 ) {
413 self.channel_states
414 .entry(channel_id)
415 .or_insert_with(|| Default::default())
416 .update_latest_notes_version(epoch, version);
417 cx.notify()
418 }
419
420 pub fn open_channel_chat(
421 &mut self,
422 channel_id: ChannelId,
423 cx: &mut ModelContext<Self>,
424 ) -> Task<Result<Model<ChannelChat>>> {
425 let client = self.client.clone();
426 let user_store = self.user_store.clone();
427 let this = cx.handle();
428 self.open_channel_resource(
429 channel_id,
430 |this| &mut this.opened_chats,
431 |channel, cx| ChannelChat::new(channel, this, user_store, client, cx),
432 cx,
433 )
434 }
435
436 /// Asynchronously open a given resource associated with a channel.
437 ///
438 /// Make sure that the resource is only opened once, even if this method
439 /// is called multiple times with the same channel id while the first task
440 /// is still running.
441 fn open_channel_resource<T, F, Fut>(
442 &mut self,
443 channel_id: ChannelId,
444 get_map: fn(&mut Self) -> &mut HashMap<ChannelId, OpenedModelHandle<T>>,
445 load: F,
446 cx: &mut ModelContext<Self>,
447 ) -> Task<Result<Model<T>>>
448 where
449 F: 'static + FnOnce(Arc<Channel>, AsyncAppContext) -> Fut,
450 Fut: Future<Output = Result<Model<T>>>,
451 T: 'static,
452 {
453 let task = loop {
454 match get_map(self).entry(channel_id) {
455 hash_map::Entry::Occupied(e) => match e.get() {
456 OpenedModelHandle::Open(model) => {
457 if let Some(model) = model.upgrade() {
458 break Task::ready(Ok(model)).shared();
459 } else {
460 get_map(self).remove(&channel_id);
461 continue;
462 }
463 }
464 OpenedModelHandle::Loading(task) => {
465 break task.clone();
466 }
467 },
468 hash_map::Entry::Vacant(e) => {
469 let task = cx
470 .spawn(move |this, mut cx| async move {
471 let channel = this.update(&mut cx, |this, _| {
472 this.channel_for_id(channel_id).cloned().ok_or_else(|| {
473 Arc::new(anyhow!("no channel for id: {}", channel_id))
474 })
475 })??;
476
477 load(channel, cx).await.map_err(Arc::new)
478 })
479 .shared();
480
481 e.insert(OpenedModelHandle::Loading(task.clone()));
482 cx.spawn({
483 let task = task.clone();
484 move |this, mut cx| async move {
485 let result = task.await;
486 this.update(&mut cx, |this, _| match result {
487 Ok(model) => {
488 get_map(this).insert(
489 channel_id,
490 OpenedModelHandle::Open(model.downgrade()),
491 );
492 }
493 Err(_) => {
494 get_map(this).remove(&channel_id);
495 }
496 })
497 .ok();
498 }
499 })
500 .detach();
501 break task;
502 }
503 }
504 };
505 cx.background_executor()
506 .spawn(async move { task.await.map_err(|error| anyhow!("{}", error)) })
507 }
508
509 pub fn is_channel_admin(&self, channel_id: ChannelId) -> bool {
510 self.channel_role(channel_id) == proto::ChannelRole::Admin
511 }
512
513 pub fn is_root_channel(&self, channel_id: ChannelId) -> bool {
514 self.channel_index
515 .by_id()
516 .get(&channel_id)
517 .map_or(false, |channel| channel.is_root_channel())
518 }
519
520 pub fn is_public_channel(&self, channel_id: ChannelId) -> bool {
521 self.channel_index
522 .by_id()
523 .get(&channel_id)
524 .map_or(false, |channel| {
525 channel.visibility == ChannelVisibility::Public
526 })
527 }
528
529 pub fn channel_capability(&self, channel_id: ChannelId) -> Capability {
530 match self.channel_role(channel_id) {
531 ChannelRole::Admin | ChannelRole::Member => Capability::ReadWrite,
532 _ => Capability::ReadOnly,
533 }
534 }
535
536 pub fn channel_role(&self, channel_id: ChannelId) -> proto::ChannelRole {
537 maybe!({
538 let mut channel = self.channel_for_id(channel_id)?;
539 if !channel.is_root_channel() {
540 channel = self.channel_for_id(channel.root_id())?;
541 }
542 let root_channel_state = self.channel_states.get(&channel.id);
543 root_channel_state?.role
544 })
545 .unwrap_or(proto::ChannelRole::Guest)
546 }
547
548 pub fn channel_participants(&self, channel_id: ChannelId) -> &[Arc<User>] {
549 self.channel_participants
550 .get(&channel_id)
551 .map_or(&[], |v| v.as_slice())
552 }
553
554 pub fn create_channel(
555 &self,
556 name: &str,
557 parent_id: Option<ChannelId>,
558 cx: &mut ModelContext<Self>,
559 ) -> Task<Result<ChannelId>> {
560 let client = self.client.clone();
561 let name = name.trim_start_matches("#").to_owned();
562 cx.spawn(move |this, mut cx| async move {
563 let response = client
564 .request(proto::CreateChannel { name, parent_id })
565 .await?;
566
567 let channel = response
568 .channel
569 .ok_or_else(|| anyhow!("missing channel in response"))?;
570 let channel_id = channel.id;
571
572 this.update(&mut cx, |this, cx| {
573 let task = this.update_channels(
574 proto::UpdateChannels {
575 channels: vec![channel],
576 ..Default::default()
577 },
578 cx,
579 );
580 assert!(task.is_none());
581
582 // This event is emitted because the collab panel wants to clear the pending edit state
583 // before this frame is rendered. But we can't guarantee that the collab panel's future
584 // will resolve before this flush_effects finishes. Synchronously emitting this event
585 // ensures that the collab panel will observe this creation before the frame completes
586 cx.emit(ChannelEvent::ChannelCreated(channel_id));
587 })?;
588
589 Ok(channel_id)
590 })
591 }
592
593 pub fn move_channel(
594 &mut self,
595 channel_id: ChannelId,
596 to: ChannelId,
597 cx: &mut ModelContext<Self>,
598 ) -> Task<Result<()>> {
599 let client = self.client.clone();
600 cx.spawn(move |_, _| async move {
601 let _ = client
602 .request(proto::MoveChannel { channel_id, to })
603 .await?;
604
605 Ok(())
606 })
607 }
608
609 pub fn set_channel_visibility(
610 &mut self,
611 channel_id: ChannelId,
612 visibility: ChannelVisibility,
613 cx: &mut ModelContext<Self>,
614 ) -> Task<Result<()>> {
615 let client = self.client.clone();
616 cx.spawn(move |_, _| async move {
617 let _ = client
618 .request(proto::SetChannelVisibility {
619 channel_id,
620 visibility: visibility.into(),
621 })
622 .await?;
623
624 Ok(())
625 })
626 }
627
628 pub fn invite_member(
629 &mut self,
630 channel_id: ChannelId,
631 user_id: UserId,
632 role: proto::ChannelRole,
633 cx: &mut ModelContext<Self>,
634 ) -> Task<Result<()>> {
635 if !self.outgoing_invites.insert((channel_id, user_id)) {
636 return Task::ready(Err(anyhow!("invite request already in progress")));
637 }
638
639 cx.notify();
640 let client = self.client.clone();
641 cx.spawn(move |this, mut cx| async move {
642 let result = client
643 .request(proto::InviteChannelMember {
644 channel_id,
645 user_id,
646 role: role.into(),
647 })
648 .await;
649
650 this.update(&mut cx, |this, cx| {
651 this.outgoing_invites.remove(&(channel_id, user_id));
652 cx.notify();
653 })?;
654
655 result?;
656
657 Ok(())
658 })
659 }
660
661 pub fn remove_member(
662 &mut self,
663 channel_id: ChannelId,
664 user_id: u64,
665 cx: &mut ModelContext<Self>,
666 ) -> Task<Result<()>> {
667 if !self.outgoing_invites.insert((channel_id, user_id)) {
668 return Task::ready(Err(anyhow!("invite request already in progress")));
669 }
670
671 cx.notify();
672 let client = self.client.clone();
673 cx.spawn(move |this, mut cx| async move {
674 let result = client
675 .request(proto::RemoveChannelMember {
676 channel_id,
677 user_id,
678 })
679 .await;
680
681 this.update(&mut cx, |this, cx| {
682 this.outgoing_invites.remove(&(channel_id, user_id));
683 cx.notify();
684 })?;
685 result?;
686 Ok(())
687 })
688 }
689
690 pub fn set_member_role(
691 &mut self,
692 channel_id: ChannelId,
693 user_id: UserId,
694 role: proto::ChannelRole,
695 cx: &mut ModelContext<Self>,
696 ) -> Task<Result<()>> {
697 if !self.outgoing_invites.insert((channel_id, user_id)) {
698 return Task::ready(Err(anyhow!("member request already in progress")));
699 }
700
701 cx.notify();
702 let client = self.client.clone();
703 cx.spawn(move |this, mut cx| async move {
704 let result = client
705 .request(proto::SetChannelMemberRole {
706 channel_id,
707 user_id,
708 role: role.into(),
709 })
710 .await;
711
712 this.update(&mut cx, |this, cx| {
713 this.outgoing_invites.remove(&(channel_id, user_id));
714 cx.notify();
715 })?;
716
717 result?;
718 Ok(())
719 })
720 }
721
722 pub fn rename(
723 &mut self,
724 channel_id: ChannelId,
725 new_name: &str,
726 cx: &mut ModelContext<Self>,
727 ) -> Task<Result<()>> {
728 let client = self.client.clone();
729 let name = new_name.to_string();
730 cx.spawn(move |this, mut cx| async move {
731 let channel = client
732 .request(proto::RenameChannel { channel_id, name })
733 .await?
734 .channel
735 .ok_or_else(|| anyhow!("missing channel in response"))?;
736 this.update(&mut cx, |this, cx| {
737 let task = this.update_channels(
738 proto::UpdateChannels {
739 channels: vec![channel],
740 ..Default::default()
741 },
742 cx,
743 );
744 assert!(task.is_none());
745
746 // This event is emitted because the collab panel wants to clear the pending edit state
747 // before this frame is rendered. But we can't guarantee that the collab panel's future
748 // will resolve before this flush_effects finishes. Synchronously emitting this event
749 // ensures that the collab panel will observe this creation before the frame complete
750 cx.emit(ChannelEvent::ChannelRenamed(channel_id))
751 })?;
752 Ok(())
753 })
754 }
755
756 pub fn respond_to_channel_invite(
757 &mut self,
758 channel_id: ChannelId,
759 accept: bool,
760 cx: &mut ModelContext<Self>,
761 ) -> Task<Result<()>> {
762 let client = self.client.clone();
763 cx.background_executor().spawn(async move {
764 client
765 .request(proto::RespondToChannelInvite { channel_id, accept })
766 .await?;
767 Ok(())
768 })
769 }
770
771 pub fn get_channel_member_details(
772 &self,
773 channel_id: ChannelId,
774 cx: &mut ModelContext<Self>,
775 ) -> Task<Result<Vec<ChannelMembership>>> {
776 let client = self.client.clone();
777 let user_store = self.user_store.downgrade();
778 cx.spawn(move |_, mut cx| async move {
779 let response = client
780 .request(proto::GetChannelMembers { channel_id })
781 .await?;
782
783 let user_ids = response.members.iter().map(|m| m.user_id).collect();
784 let user_store = user_store
785 .upgrade()
786 .ok_or_else(|| anyhow!("user store dropped"))?;
787 let users = user_store
788 .update(&mut cx, |user_store, cx| user_store.get_users(user_ids, cx))?
789 .await?;
790
791 Ok(users
792 .into_iter()
793 .zip(response.members)
794 .filter_map(|(user, member)| {
795 Some(ChannelMembership {
796 user,
797 role: member.role(),
798 kind: member.kind(),
799 })
800 })
801 .collect())
802 })
803 }
804
805 pub fn remove_channel(&self, channel_id: ChannelId) -> impl Future<Output = Result<()>> {
806 let client = self.client.clone();
807 async move {
808 client.request(proto::DeleteChannel { channel_id }).await?;
809 Ok(())
810 }
811 }
812
813 pub fn has_pending_channel_invite_response(&self, _: &Arc<Channel>) -> bool {
814 false
815 }
816
817 pub fn has_pending_channel_invite(&self, channel_id: ChannelId, user_id: UserId) -> bool {
818 self.outgoing_invites.contains(&(channel_id, user_id))
819 }
820
821 async fn handle_update_channels(
822 this: Model<Self>,
823 message: TypedEnvelope<proto::UpdateChannels>,
824 _: Arc<Client>,
825 mut cx: AsyncAppContext,
826 ) -> Result<()> {
827 this.update(&mut cx, |this, _| {
828 this.update_channels_tx
829 .unbounded_send(message.payload)
830 .unwrap();
831 })?;
832 Ok(())
833 }
834
835 async fn handle_update_user_channels(
836 this: Model<Self>,
837 message: TypedEnvelope<proto::UpdateUserChannels>,
838 _: Arc<Client>,
839 mut cx: AsyncAppContext,
840 ) -> Result<()> {
841 this.update(&mut cx, |this, cx| {
842 for buffer_version in message.payload.observed_channel_buffer_version {
843 let version = language::proto::deserialize_version(&buffer_version.version);
844 this.acknowledge_notes_version(
845 buffer_version.channel_id,
846 buffer_version.epoch,
847 &version,
848 cx,
849 );
850 }
851 for message_id in message.payload.observed_channel_message_id {
852 this.acknowledge_message_id(message_id.channel_id, message_id.message_id, cx);
853 }
854 for membership in message.payload.channel_memberships {
855 if let Some(role) = ChannelRole::from_i32(membership.role) {
856 this.channel_states
857 .entry(membership.channel_id)
858 .or_insert_with(|| ChannelState::default())
859 .set_role(role)
860 }
861 }
862 })
863 }
864
865 fn handle_connect(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
866 self.channel_index.clear();
867 self.channel_invitations.clear();
868 self.channel_participants.clear();
869 self.channel_index.clear();
870 self.outgoing_invites.clear();
871 self.disconnect_channel_buffers_task.take();
872
873 for chat in self.opened_chats.values() {
874 if let OpenedModelHandle::Open(chat) = chat {
875 if let Some(chat) = chat.upgrade() {
876 chat.update(cx, |chat, cx| {
877 chat.rejoin(cx);
878 });
879 }
880 }
881 }
882
883 let mut buffer_versions = Vec::new();
884 for buffer in self.opened_buffers.values() {
885 if let OpenedModelHandle::Open(buffer) = buffer {
886 if let Some(buffer) = buffer.upgrade() {
887 let channel_buffer = buffer.read(cx);
888 let buffer = channel_buffer.buffer().read(cx);
889 buffer_versions.push(proto::ChannelBufferVersion {
890 channel_id: channel_buffer.channel_id,
891 epoch: channel_buffer.epoch(),
892 version: language::proto::serialize_version(&buffer.version()),
893 });
894 }
895 }
896 }
897
898 if buffer_versions.is_empty() {
899 return Task::ready(Ok(()));
900 }
901
902 let response = self.client.request(proto::RejoinChannelBuffers {
903 buffers: buffer_versions,
904 });
905
906 cx.spawn(|this, mut cx| async move {
907 let mut response = response.await?;
908
909 this.update(&mut cx, |this, cx| {
910 this.opened_buffers.retain(|_, buffer| match buffer {
911 OpenedModelHandle::Open(channel_buffer) => {
912 let Some(channel_buffer) = channel_buffer.upgrade() else {
913 return false;
914 };
915
916 channel_buffer.update(cx, |channel_buffer, cx| {
917 let channel_id = channel_buffer.channel_id;
918 if let Some(remote_buffer) = response
919 .buffers
920 .iter_mut()
921 .find(|buffer| buffer.channel_id == channel_id)
922 {
923 let channel_id = channel_buffer.channel_id;
924 let remote_version =
925 language::proto::deserialize_version(&remote_buffer.version);
926
927 channel_buffer.replace_collaborators(
928 mem::take(&mut remote_buffer.collaborators),
929 cx,
930 );
931
932 let operations = channel_buffer
933 .buffer()
934 .update(cx, |buffer, cx| {
935 let outgoing_operations =
936 buffer.serialize_ops(Some(remote_version), cx);
937 let incoming_operations =
938 mem::take(&mut remote_buffer.operations)
939 .into_iter()
940 .map(language::proto::deserialize_operation)
941 .collect::<Result<Vec<_>>>()?;
942 buffer.apply_ops(incoming_operations, cx)?;
943 anyhow::Ok(outgoing_operations)
944 })
945 .log_err();
946
947 if let Some(operations) = operations {
948 let client = this.client.clone();
949 cx.background_executor()
950 .spawn(async move {
951 let operations = operations.await;
952 for chunk in
953 language::proto::split_operations(operations)
954 {
955 client
956 .send(proto::UpdateChannelBuffer {
957 channel_id,
958 operations: chunk,
959 })
960 .ok();
961 }
962 })
963 .detach();
964 return true;
965 }
966 }
967
968 channel_buffer.disconnect(cx);
969 false
970 })
971 }
972 OpenedModelHandle::Loading(_) => true,
973 });
974 })
975 .ok();
976 anyhow::Ok(())
977 })
978 }
979
980 fn handle_disconnect(&mut self, wait_for_reconnect: bool, cx: &mut ModelContext<Self>) {
981 cx.notify();
982
983 self.disconnect_channel_buffers_task.get_or_insert_with(|| {
984 cx.spawn(move |this, mut cx| async move {
985 if wait_for_reconnect {
986 cx.background_executor().timer(RECONNECT_TIMEOUT).await;
987 }
988
989 if let Some(this) = this.upgrade() {
990 this.update(&mut cx, |this, cx| {
991 for (_, buffer) in this.opened_buffers.drain() {
992 if let OpenedModelHandle::Open(buffer) = buffer {
993 if let Some(buffer) = buffer.upgrade() {
994 buffer.update(cx, |buffer, cx| buffer.disconnect(cx));
995 }
996 }
997 }
998 })
999 .ok();
1000 }
1001 })
1002 });
1003 }
1004
1005 pub(crate) fn update_channels(
1006 &mut self,
1007 payload: proto::UpdateChannels,
1008 cx: &mut ModelContext<ChannelStore>,
1009 ) -> Option<Task<Result<()>>> {
1010 if !payload.remove_channel_invitations.is_empty() {
1011 self.channel_invitations
1012 .retain(|channel| !payload.remove_channel_invitations.contains(&channel.id));
1013 }
1014 for channel in payload.channel_invitations {
1015 match self
1016 .channel_invitations
1017 .binary_search_by_key(&channel.id, |c| c.id)
1018 {
1019 Ok(ix) => {
1020 Arc::make_mut(&mut self.channel_invitations[ix]).name = channel.name.into()
1021 }
1022 Err(ix) => self.channel_invitations.insert(
1023 ix,
1024 Arc::new(Channel {
1025 id: channel.id,
1026 visibility: channel.visibility(),
1027 name: channel.name.into(),
1028 parent_path: channel.parent_path,
1029 }),
1030 ),
1031 }
1032 }
1033
1034 let channels_changed = !payload.channels.is_empty()
1035 || !payload.delete_channels.is_empty()
1036 || !payload.latest_channel_message_ids.is_empty()
1037 || !payload.latest_channel_buffer_versions.is_empty();
1038
1039 if channels_changed {
1040 if !payload.delete_channels.is_empty() {
1041 self.channel_index.delete_channels(&payload.delete_channels);
1042 self.channel_participants
1043 .retain(|channel_id, _| !&payload.delete_channels.contains(channel_id));
1044
1045 for channel_id in &payload.delete_channels {
1046 let channel_id = *channel_id;
1047 if payload
1048 .channels
1049 .iter()
1050 .any(|channel| channel.id == channel_id)
1051 {
1052 continue;
1053 }
1054 if let Some(OpenedModelHandle::Open(buffer)) =
1055 self.opened_buffers.remove(&channel_id)
1056 {
1057 if let Some(buffer) = buffer.upgrade() {
1058 buffer.update(cx, ChannelBuffer::disconnect);
1059 }
1060 }
1061 }
1062 }
1063
1064 let mut index = self.channel_index.bulk_insert();
1065 for channel in payload.channels {
1066 let id = channel.id;
1067 let channel_changed = index.insert(channel);
1068
1069 if channel_changed {
1070 if let Some(OpenedModelHandle::Open(buffer)) = self.opened_buffers.get(&id) {
1071 if let Some(buffer) = buffer.upgrade() {
1072 buffer.update(cx, ChannelBuffer::channel_changed);
1073 }
1074 }
1075 }
1076 }
1077
1078 for latest_buffer_version in payload.latest_channel_buffer_versions {
1079 let version = language::proto::deserialize_version(&latest_buffer_version.version);
1080 self.channel_states
1081 .entry(latest_buffer_version.channel_id)
1082 .or_default()
1083 .update_latest_notes_version(latest_buffer_version.epoch, &version)
1084 }
1085
1086 for latest_channel_message in payload.latest_channel_message_ids {
1087 self.channel_states
1088 .entry(latest_channel_message.channel_id)
1089 .or_default()
1090 .update_latest_message_id(latest_channel_message.message_id);
1091 }
1092 }
1093
1094 cx.notify();
1095 if payload.channel_participants.is_empty() {
1096 return None;
1097 }
1098
1099 let mut all_user_ids = Vec::new();
1100 let channel_participants = payload.channel_participants;
1101 for entry in &channel_participants {
1102 for user_id in entry.participant_user_ids.iter() {
1103 if let Err(ix) = all_user_ids.binary_search(user_id) {
1104 all_user_ids.insert(ix, *user_id);
1105 }
1106 }
1107 }
1108
1109 let users = self
1110 .user_store
1111 .update(cx, |user_store, cx| user_store.get_users(all_user_ids, cx));
1112 Some(cx.spawn(|this, mut cx| async move {
1113 let users = users.await?;
1114
1115 this.update(&mut cx, |this, cx| {
1116 for entry in &channel_participants {
1117 let mut participants: Vec<_> = entry
1118 .participant_user_ids
1119 .iter()
1120 .filter_map(|user_id| {
1121 users
1122 .binary_search_by_key(&user_id, |user| &user.id)
1123 .ok()
1124 .map(|ix| users[ix].clone())
1125 })
1126 .collect();
1127
1128 participants.sort_by_key(|u| u.id);
1129
1130 this.channel_participants
1131 .insert(entry.channel_id, participants);
1132 }
1133
1134 cx.notify();
1135 })
1136 }))
1137 }
1138}
1139
1140impl ChannelState {
1141 fn set_role(&mut self, role: ChannelRole) {
1142 self.role = Some(role);
1143 }
1144
1145 fn has_channel_buffer_changed(&self) -> bool {
1146 if let Some(latest_version) = &self.latest_notes_versions {
1147 if let Some(observed_version) = &self.observed_notes_versions {
1148 latest_version.epoch > observed_version.epoch
1149 || (latest_version.epoch == observed_version.epoch
1150 && latest_version
1151 .version
1152 .changed_since(&observed_version.version))
1153 } else {
1154 true
1155 }
1156 } else {
1157 false
1158 }
1159 }
1160
1161 fn has_new_messages(&self) -> bool {
1162 let latest_message_id = self.latest_chat_message;
1163 let observed_message_id = self.observed_chat_message;
1164
1165 latest_message_id.is_some_and(|latest_message_id| {
1166 latest_message_id > observed_message_id.unwrap_or_default()
1167 })
1168 }
1169
1170 fn last_acknowledged_message_id(&self) -> Option<u64> {
1171 self.observed_chat_message
1172 }
1173
1174 fn acknowledge_message_id(&mut self, message_id: u64) {
1175 let observed = self.observed_chat_message.get_or_insert(message_id);
1176 *observed = (*observed).max(message_id);
1177 }
1178
1179 fn update_latest_message_id(&mut self, message_id: u64) {
1180 self.latest_chat_message =
1181 Some(message_id.max(self.latest_chat_message.unwrap_or_default()));
1182 }
1183
1184 fn acknowledge_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1185 if let Some(existing) = &mut self.observed_notes_versions {
1186 if existing.epoch == epoch {
1187 existing.version.join(version);
1188 return;
1189 }
1190 }
1191 self.observed_notes_versions = Some(NotesVersion {
1192 epoch,
1193 version: version.clone(),
1194 });
1195 }
1196
1197 fn update_latest_notes_version(&mut self, epoch: u64, version: &clock::Global) {
1198 if let Some(existing) = &mut self.latest_notes_versions {
1199 if existing.epoch == epoch {
1200 existing.version.join(version);
1201 return;
1202 }
1203 }
1204 self.latest_notes_versions = Some(NotesVersion {
1205 epoch,
1206 version: version.clone(),
1207 });
1208 }
1209}