1mod channel_index;
2
3use crate::{channel_buffer::ChannelBuffer, channel_chat::ChannelChat, ChannelMessage};
4use anyhow::{anyhow, Result};
5use channel_index::ChannelIndex;
6use client::{Client, Subscription, User, UserId, UserStore};
7use collections::{hash_map, HashMap, HashSet};
8use db::RELEASE_CHANNEL;
9use futures::{channel::mpsc, future::Shared, Future, FutureExt, StreamExt};
10use gpui::{AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Task, WeakModelHandle};
11use rpc::{
12 proto::{self, ChannelVisibility},
13 TypedEnvelope,
14};
15use std::{mem, sync::Arc, time::Duration};
16use util::ResultExt;
17
18pub fn init(client: &Arc<Client>, user_store: ModelHandle<UserStore>, cx: &mut AppContext) {
19 let channel_store =
20 cx.add_model(|cx| ChannelStore::new(client.clone(), user_store.clone(), cx));
21 cx.set_global(channel_store);
22}
23
24pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
25
26pub type ChannelId = u64;
27
28pub struct ChannelStore {
29 pub channel_index: ChannelIndex,
30 channel_invitations: Vec<Arc<Channel>>,
31 channel_participants: HashMap<ChannelId, Vec<Arc<User>>>,
32 outgoing_invites: HashSet<(ChannelId, UserId)>,
33 update_channels_tx: mpsc::UnboundedSender<proto::UpdateChannels>,
34 opened_buffers: HashMap<ChannelId, OpenedModelHandle<ChannelBuffer>>,
35 opened_chats: HashMap<ChannelId, OpenedModelHandle<ChannelChat>>,
36 client: Arc<Client>,
37 user_store: ModelHandle<UserStore>,
38 _rpc_subscription: Subscription,
39 _watch_connection_status: Task<Option<()>>,
40 disconnect_channel_buffers_task: Option<Task<()>>,
41 _update_channels: Task<()>,
42}
43
44#[derive(Clone, Debug, PartialEq)]
45pub struct Channel {
46 pub id: ChannelId,
47 pub name: String,
48 pub visibility: proto::ChannelVisibility,
49 pub role: proto::ChannelRole,
50 pub unseen_note_version: Option<(u64, clock::Global)>,
51 pub unseen_message_id: Option<u64>,
52 pub parent_path: Vec<u64>,
53}
54
55impl Channel {
56 pub fn link(&self) -> String {
57 RELEASE_CHANNEL.link_prefix().to_owned()
58 + "channel/"
59 + &self.slug()
60 + "-"
61 + &self.id.to_string()
62 }
63
64 pub fn slug(&self) -> String {
65 let slug: String = self
66 .name
67 .chars()
68 .map(|c| if c.is_alphanumeric() { c } else { '-' })
69 .collect();
70
71 slug.trim_matches(|c| c == '-').to_string()
72 }
73
74 pub fn can_edit_notes(&self) -> bool {
75 self.role == proto::ChannelRole::Member || self.role == proto::ChannelRole::Admin
76 }
77}
78
79pub struct ChannelMembership {
80 pub user: Arc<User>,
81 pub kind: proto::channel_member::Kind,
82 pub role: proto::ChannelRole,
83}
84impl ChannelMembership {
85 pub fn sort_key(&self) -> MembershipSortKey {
86 MembershipSortKey {
87 role_order: match self.role {
88 proto::ChannelRole::Admin => 0,
89 proto::ChannelRole::Member => 1,
90 proto::ChannelRole::Banned => 2,
91 proto::ChannelRole::Guest => 3,
92 },
93 kind_order: match self.kind {
94 proto::channel_member::Kind::Member => 0,
95 proto::channel_member::Kind::AncestorMember => 1,
96 proto::channel_member::Kind::Invitee => 2,
97 },
98 username_order: self.user.github_login.as_str(),
99 }
100 }
101}
102
103#[derive(PartialOrd, Ord, PartialEq, Eq)]
104pub struct MembershipSortKey<'a> {
105 role_order: u8,
106 kind_order: u8,
107 username_order: &'a str,
108}
109
110pub enum ChannelEvent {
111 ChannelCreated(ChannelId),
112 ChannelRenamed(ChannelId),
113}
114
115impl Entity for ChannelStore {
116 type Event = ChannelEvent;
117}
118
119enum OpenedModelHandle<E: Entity> {
120 Open(WeakModelHandle<E>),
121 Loading(Shared<Task<Result<ModelHandle<E>, Arc<anyhow::Error>>>>),
122}
123
124impl ChannelStore {
125 pub fn global(cx: &AppContext) -> ModelHandle<Self> {
126 cx.global::<ModelHandle<Self>>().clone()
127 }
128
129 pub fn new(
130 client: Arc<Client>,
131 user_store: ModelHandle<UserStore>,
132 cx: &mut ModelContext<Self>,
133 ) -> Self {
134 let rpc_subscription =
135 client.add_message_handler(cx.handle(), Self::handle_update_channels);
136
137 let mut connection_status = client.status();
138 let (update_channels_tx, mut update_channels_rx) = mpsc::unbounded();
139 let watch_connection_status = cx.spawn_weak(|this, mut cx| async move {
140 while let Some(status) = connection_status.next().await {
141 let this = this.upgrade(&cx)?;
142 match status {
143 client::Status::Connected { .. } => {
144 this.update(&mut cx, |this, cx| this.handle_connect(cx))
145 .await
146 .log_err()?;
147 }
148 client::Status::SignedOut | client::Status::UpgradeRequired => {
149 this.update(&mut cx, |this, cx| this.handle_disconnect(false, cx));
150 }
151 _ => {
152 this.update(&mut cx, |this, cx| this.handle_disconnect(true, cx));
153 }
154 }
155 }
156 Some(())
157 });
158
159 Self {
160 channel_invitations: Vec::default(),
161 channel_index: ChannelIndex::default(),
162 channel_participants: Default::default(),
163 outgoing_invites: Default::default(),
164 opened_buffers: Default::default(),
165 opened_chats: Default::default(),
166 update_channels_tx,
167 client,
168 user_store,
169 _rpc_subscription: rpc_subscription,
170 _watch_connection_status: watch_connection_status,
171 disconnect_channel_buffers_task: None,
172 _update_channels: cx.spawn_weak(|this, mut cx| async move {
173 while let Some(update_channels) = update_channels_rx.next().await {
174 if let Some(this) = this.upgrade(&cx) {
175 let update_task = this.update(&mut cx, |this, cx| {
176 this.update_channels(update_channels, cx)
177 });
178 if let Some(update_task) = update_task {
179 update_task.await.log_err();
180 }
181 }
182 }
183 }),
184 }
185 }
186
187 pub fn client(&self) -> Arc<Client> {
188 self.client.clone()
189 }
190
191 /// Returns the number of unique channels in the store
192 pub fn channel_count(&self) -> usize {
193 self.channel_index.by_id().len()
194 }
195
196 /// Returns the index of a channel ID in the list of unique channels
197 pub fn index_of_channel(&self, channel_id: ChannelId) -> Option<usize> {
198 self.channel_index
199 .by_id()
200 .keys()
201 .position(|id| *id == channel_id)
202 }
203
204 /// Returns an iterator over all unique channels
205 pub fn channels(&self) -> impl '_ + Iterator<Item = &Arc<Channel>> {
206 self.channel_index.by_id().values()
207 }
208
209 /// Iterate over all entries in the channel DAG
210 pub fn ordered_channels(&self) -> impl '_ + Iterator<Item = (usize, &Arc<Channel>)> {
211 self.channel_index
212 .ordered_channels()
213 .iter()
214 .filter_map(move |id| {
215 let channel = self.channel_index.by_id().get(id)?;
216 Some((channel.parent_path.len(), channel))
217 })
218 }
219
220 pub fn channel_at_index(&self, ix: usize) -> Option<&Arc<Channel>> {
221 let channel_id = self.channel_index.ordered_channels().get(ix)?;
222 self.channel_index.by_id().get(channel_id)
223 }
224
225 pub fn channel_at(&self, ix: usize) -> Option<&Arc<Channel>> {
226 self.channel_index.by_id().values().nth(ix)
227 }
228
229 pub fn has_channel_invitation(&self, channel_id: ChannelId) -> bool {
230 self.channel_invitations
231 .iter()
232 .any(|channel| channel.id == channel_id)
233 }
234
235 pub fn channel_invitations(&self) -> &[Arc<Channel>] {
236 &self.channel_invitations
237 }
238
239 pub fn channel_for_id(&self, channel_id: ChannelId) -> Option<&Arc<Channel>> {
240 self.channel_index.by_id().get(&channel_id)
241 }
242
243 pub fn has_open_channel_buffer(&self, channel_id: ChannelId, cx: &AppContext) -> bool {
244 if let Some(buffer) = self.opened_buffers.get(&channel_id) {
245 if let OpenedModelHandle::Open(buffer) = buffer {
246 return buffer.upgrade(cx).is_some();
247 }
248 }
249 false
250 }
251
252 pub fn open_channel_buffer(
253 &mut self,
254 channel_id: ChannelId,
255 cx: &mut ModelContext<Self>,
256 ) -> Task<Result<ModelHandle<ChannelBuffer>>> {
257 let client = self.client.clone();
258 let user_store = self.user_store.clone();
259 let channel_store = cx.handle();
260 self.open_channel_resource(
261 channel_id,
262 |this| &mut this.opened_buffers,
263 |channel, cx| ChannelBuffer::new(channel, client, user_store, channel_store, cx),
264 cx,
265 )
266 }
267
268 pub fn fetch_channel_messages(
269 &self,
270 message_ids: Vec<u64>,
271 cx: &mut ModelContext<Self>,
272 ) -> Task<Result<Vec<ChannelMessage>>> {
273 let request = if message_ids.is_empty() {
274 None
275 } else {
276 Some(
277 self.client
278 .request(proto::GetChannelMessagesById { message_ids }),
279 )
280 };
281 cx.spawn_weak(|this, mut cx| async move {
282 if let Some(request) = request {
283 let response = request.await?;
284 let this = this
285 .upgrade(&cx)
286 .ok_or_else(|| anyhow!("channel store dropped"))?;
287 let user_store = this.read_with(&cx, |this, _| this.user_store.clone());
288 ChannelMessage::from_proto_vec(response.messages, &user_store, &mut cx).await
289 } else {
290 Ok(Vec::new())
291 }
292 })
293 }
294
295 pub fn has_channel_buffer_changed(&self, channel_id: ChannelId) -> Option<bool> {
296 self.channel_index
297 .by_id()
298 .get(&channel_id)
299 .map(|channel| channel.unseen_note_version.is_some())
300 }
301
302 pub fn has_new_messages(&self, channel_id: ChannelId) -> Option<bool> {
303 self.channel_index
304 .by_id()
305 .get(&channel_id)
306 .map(|channel| channel.unseen_message_id.is_some())
307 }
308
309 pub fn notes_changed(
310 &mut self,
311 channel_id: ChannelId,
312 epoch: u64,
313 version: &clock::Global,
314 cx: &mut ModelContext<Self>,
315 ) {
316 self.channel_index.note_changed(channel_id, epoch, version);
317 cx.notify();
318 }
319
320 pub fn new_message(
321 &mut self,
322 channel_id: ChannelId,
323 message_id: u64,
324 cx: &mut ModelContext<Self>,
325 ) {
326 self.channel_index.new_message(channel_id, message_id);
327 cx.notify();
328 }
329
330 pub fn acknowledge_message_id(
331 &mut self,
332 channel_id: ChannelId,
333 message_id: u64,
334 cx: &mut ModelContext<Self>,
335 ) {
336 self.channel_index
337 .acknowledge_message_id(channel_id, message_id);
338 cx.notify();
339 }
340
341 pub fn acknowledge_notes_version(
342 &mut self,
343 channel_id: ChannelId,
344 epoch: u64,
345 version: &clock::Global,
346 cx: &mut ModelContext<Self>,
347 ) {
348 self.channel_index
349 .acknowledge_note_version(channel_id, epoch, version);
350 cx.notify();
351 }
352
353 pub fn open_channel_chat(
354 &mut self,
355 channel_id: ChannelId,
356 cx: &mut ModelContext<Self>,
357 ) -> Task<Result<ModelHandle<ChannelChat>>> {
358 let client = self.client.clone();
359 let user_store = self.user_store.clone();
360 let this = cx.handle();
361 self.open_channel_resource(
362 channel_id,
363 |this| &mut this.opened_chats,
364 |channel, cx| ChannelChat::new(channel, this, user_store, client, cx),
365 cx,
366 )
367 }
368
369 /// Asynchronously open a given resource associated with a channel.
370 ///
371 /// Make sure that the resource is only opened once, even if this method
372 /// is called multiple times with the same channel id while the first task
373 /// is still running.
374 fn open_channel_resource<T: Entity, F, Fut>(
375 &mut self,
376 channel_id: ChannelId,
377 get_map: fn(&mut Self) -> &mut HashMap<ChannelId, OpenedModelHandle<T>>,
378 load: F,
379 cx: &mut ModelContext<Self>,
380 ) -> Task<Result<ModelHandle<T>>>
381 where
382 F: 'static + FnOnce(Arc<Channel>, AsyncAppContext) -> Fut,
383 Fut: Future<Output = Result<ModelHandle<T>>>,
384 {
385 let task = loop {
386 match get_map(self).entry(channel_id) {
387 hash_map::Entry::Occupied(e) => match e.get() {
388 OpenedModelHandle::Open(model) => {
389 if let Some(model) = model.upgrade(cx) {
390 break Task::ready(Ok(model)).shared();
391 } else {
392 get_map(self).remove(&channel_id);
393 continue;
394 }
395 }
396 OpenedModelHandle::Loading(task) => {
397 break task.clone();
398 }
399 },
400 hash_map::Entry::Vacant(e) => {
401 let task = cx
402 .spawn(|this, cx| async move {
403 let channel = this.read_with(&cx, |this, _| {
404 this.channel_for_id(channel_id).cloned().ok_or_else(|| {
405 Arc::new(anyhow!("no channel for id: {}", channel_id))
406 })
407 })?;
408
409 load(channel, cx).await.map_err(Arc::new)
410 })
411 .shared();
412
413 e.insert(OpenedModelHandle::Loading(task.clone()));
414 cx.spawn({
415 let task = task.clone();
416 |this, mut cx| async move {
417 let result = task.await;
418 this.update(&mut cx, |this, _| match result {
419 Ok(model) => {
420 get_map(this).insert(
421 channel_id,
422 OpenedModelHandle::Open(model.downgrade()),
423 );
424 }
425 Err(_) => {
426 get_map(this).remove(&channel_id);
427 }
428 });
429 }
430 })
431 .detach();
432 break task;
433 }
434 }
435 };
436 cx.foreground()
437 .spawn(async move { task.await.map_err(|error| anyhow!("{}", error)) })
438 }
439
440 pub fn is_channel_admin(&self, channel_id: ChannelId) -> bool {
441 let Some(channel) = self.channel_for_id(channel_id) else {
442 return false;
443 };
444 channel.role == proto::ChannelRole::Admin
445 }
446
447 pub fn channel_participants(&self, channel_id: ChannelId) -> &[Arc<User>] {
448 self.channel_participants
449 .get(&channel_id)
450 .map_or(&[], |v| v.as_slice())
451 }
452
453 pub fn create_channel(
454 &self,
455 name: &str,
456 parent_id: Option<ChannelId>,
457 cx: &mut ModelContext<Self>,
458 ) -> Task<Result<ChannelId>> {
459 let client = self.client.clone();
460 let name = name.trim_start_matches("#").to_owned();
461 cx.spawn(|this, mut cx| async move {
462 let response = client
463 .request(proto::CreateChannel { name, parent_id })
464 .await?;
465
466 let channel = response
467 .channel
468 .ok_or_else(|| anyhow!("missing channel in response"))?;
469 let channel_id = channel.id;
470
471 // let parent_edge = if let Some(parent_id) = parent_id {
472 // vec![ChannelEdge {
473 // channel_id: channel.id,
474 // parent_id,
475 // }]
476 // } else {
477 // vec![]
478 // };
479
480 this.update(&mut cx, |this, cx| {
481 let task = this.update_channels(
482 proto::UpdateChannels {
483 channels: vec![channel],
484 ..Default::default()
485 },
486 cx,
487 );
488 assert!(task.is_none());
489
490 // This event is emitted because the collab panel wants to clear the pending edit state
491 // before this frame is rendered. But we can't guarantee that the collab panel's future
492 // will resolve before this flush_effects finishes. Synchronously emitting this event
493 // ensures that the collab panel will observe this creation before the frame completes
494 cx.emit(ChannelEvent::ChannelCreated(channel_id));
495 });
496
497 Ok(channel_id)
498 })
499 }
500
501 pub fn move_channel(
502 &mut self,
503 channel_id: ChannelId,
504 to: Option<ChannelId>,
505 cx: &mut ModelContext<Self>,
506 ) -> Task<Result<()>> {
507 let client = self.client.clone();
508 cx.spawn(|_, _| async move {
509 let _ = client
510 .request(proto::MoveChannel { channel_id, to })
511 .await?;
512
513 Ok(())
514 })
515 }
516
517 pub fn set_channel_visibility(
518 &mut self,
519 channel_id: ChannelId,
520 visibility: ChannelVisibility,
521 cx: &mut ModelContext<Self>,
522 ) -> Task<Result<()>> {
523 let client = self.client.clone();
524 cx.spawn(|_, _| async move {
525 let _ = client
526 .request(proto::SetChannelVisibility {
527 channel_id,
528 visibility: visibility.into(),
529 })
530 .await?;
531
532 Ok(())
533 })
534 }
535
536 pub fn invite_member(
537 &mut self,
538 channel_id: ChannelId,
539 user_id: UserId,
540 role: proto::ChannelRole,
541 cx: &mut ModelContext<Self>,
542 ) -> Task<Result<()>> {
543 if !self.outgoing_invites.insert((channel_id, user_id)) {
544 return Task::ready(Err(anyhow!("invite request already in progress")));
545 }
546
547 cx.notify();
548 let client = self.client.clone();
549 cx.spawn(|this, mut cx| async move {
550 let result = client
551 .request(proto::InviteChannelMember {
552 channel_id,
553 user_id,
554 role: role.into(),
555 })
556 .await;
557
558 this.update(&mut cx, |this, cx| {
559 this.outgoing_invites.remove(&(channel_id, user_id));
560 cx.notify();
561 });
562
563 result?;
564
565 Ok(())
566 })
567 }
568
569 pub fn remove_member(
570 &mut self,
571 channel_id: ChannelId,
572 user_id: u64,
573 cx: &mut ModelContext<Self>,
574 ) -> Task<Result<()>> {
575 if !self.outgoing_invites.insert((channel_id, user_id)) {
576 return Task::ready(Err(anyhow!("invite request already in progress")));
577 }
578
579 cx.notify();
580 let client = self.client.clone();
581 cx.spawn(|this, mut cx| async move {
582 let result = client
583 .request(proto::RemoveChannelMember {
584 channel_id,
585 user_id,
586 })
587 .await;
588
589 this.update(&mut cx, |this, cx| {
590 this.outgoing_invites.remove(&(channel_id, user_id));
591 cx.notify();
592 });
593 result?;
594 Ok(())
595 })
596 }
597
598 pub fn set_member_role(
599 &mut self,
600 channel_id: ChannelId,
601 user_id: UserId,
602 role: proto::ChannelRole,
603 cx: &mut ModelContext<Self>,
604 ) -> Task<Result<()>> {
605 if !self.outgoing_invites.insert((channel_id, user_id)) {
606 return Task::ready(Err(anyhow!("member request already in progress")));
607 }
608
609 cx.notify();
610 let client = self.client.clone();
611 cx.spawn(|this, mut cx| async move {
612 let result = client
613 .request(proto::SetChannelMemberRole {
614 channel_id,
615 user_id,
616 role: role.into(),
617 })
618 .await;
619
620 this.update(&mut cx, |this, cx| {
621 this.outgoing_invites.remove(&(channel_id, user_id));
622 cx.notify();
623 });
624
625 result?;
626 Ok(())
627 })
628 }
629
630 pub fn rename(
631 &mut self,
632 channel_id: ChannelId,
633 new_name: &str,
634 cx: &mut ModelContext<Self>,
635 ) -> Task<Result<()>> {
636 let client = self.client.clone();
637 let name = new_name.to_string();
638 cx.spawn(|this, mut cx| async move {
639 let channel = client
640 .request(proto::RenameChannel { channel_id, name })
641 .await?
642 .channel
643 .ok_or_else(|| anyhow!("missing channel in response"))?;
644 this.update(&mut cx, |this, cx| {
645 let task = this.update_channels(
646 proto::UpdateChannels {
647 channels: vec![channel],
648 ..Default::default()
649 },
650 cx,
651 );
652 assert!(task.is_none());
653
654 // This event is emitted because the collab panel wants to clear the pending edit state
655 // before this frame is rendered. But we can't guarantee that the collab panel's future
656 // will resolve before this flush_effects finishes. Synchronously emitting this event
657 // ensures that the collab panel will observe this creation before the frame complete
658 cx.emit(ChannelEvent::ChannelRenamed(channel_id))
659 });
660 Ok(())
661 })
662 }
663
664 pub fn respond_to_channel_invite(
665 &mut self,
666 channel_id: ChannelId,
667 accept: bool,
668 cx: &mut ModelContext<Self>,
669 ) -> Task<Result<()>> {
670 let client = self.client.clone();
671 cx.background().spawn(async move {
672 client
673 .request(proto::RespondToChannelInvite { channel_id, accept })
674 .await?;
675 Ok(())
676 })
677 }
678
679 pub fn get_channel_member_details(
680 &self,
681 channel_id: ChannelId,
682 cx: &mut ModelContext<Self>,
683 ) -> Task<Result<Vec<ChannelMembership>>> {
684 let client = self.client.clone();
685 let user_store = self.user_store.downgrade();
686 cx.spawn(|_, mut cx| async move {
687 let response = client
688 .request(proto::GetChannelMembers { channel_id })
689 .await?;
690
691 let user_ids = response.members.iter().map(|m| m.user_id).collect();
692 let user_store = user_store
693 .upgrade(&cx)
694 .ok_or_else(|| anyhow!("user store dropped"))?;
695 let users = user_store
696 .update(&mut cx, |user_store, cx| user_store.get_users(user_ids, cx))
697 .await?;
698
699 Ok(users
700 .into_iter()
701 .zip(response.members)
702 .filter_map(|(user, member)| {
703 Some(ChannelMembership {
704 user,
705 role: member.role(),
706 kind: member.kind(),
707 })
708 })
709 .collect())
710 })
711 }
712
713 pub fn remove_channel(&self, channel_id: ChannelId) -> impl Future<Output = Result<()>> {
714 let client = self.client.clone();
715 async move {
716 client.request(proto::DeleteChannel { channel_id }).await?;
717 Ok(())
718 }
719 }
720
721 pub fn has_pending_channel_invite_response(&self, _: &Arc<Channel>) -> bool {
722 false
723 }
724
725 pub fn has_pending_channel_invite(&self, channel_id: ChannelId, user_id: UserId) -> bool {
726 self.outgoing_invites.contains(&(channel_id, user_id))
727 }
728
729 async fn handle_update_channels(
730 this: ModelHandle<Self>,
731 message: TypedEnvelope<proto::UpdateChannels>,
732 _: Arc<Client>,
733 mut cx: AsyncAppContext,
734 ) -> Result<()> {
735 this.update(&mut cx, |this, _| {
736 this.update_channels_tx
737 .unbounded_send(message.payload)
738 .unwrap();
739 });
740 Ok(())
741 }
742
743 fn handle_connect(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
744 self.channel_index.clear();
745 self.channel_invitations.clear();
746 self.channel_participants.clear();
747 self.channel_index.clear();
748 self.outgoing_invites.clear();
749 self.disconnect_channel_buffers_task.take();
750
751 for chat in self.opened_chats.values() {
752 if let OpenedModelHandle::Open(chat) = chat {
753 if let Some(chat) = chat.upgrade(cx) {
754 chat.update(cx, |chat, cx| {
755 chat.rejoin(cx);
756 });
757 }
758 }
759 }
760
761 let mut buffer_versions = Vec::new();
762 for buffer in self.opened_buffers.values() {
763 if let OpenedModelHandle::Open(buffer) = buffer {
764 if let Some(buffer) = buffer.upgrade(cx) {
765 let channel_buffer = buffer.read(cx);
766 let buffer = channel_buffer.buffer().read(cx);
767 buffer_versions.push(proto::ChannelBufferVersion {
768 channel_id: channel_buffer.channel_id,
769 epoch: channel_buffer.epoch(),
770 version: language::proto::serialize_version(&buffer.version()),
771 });
772 }
773 }
774 }
775
776 if buffer_versions.is_empty() {
777 return Task::ready(Ok(()));
778 }
779
780 let response = self.client.request(proto::RejoinChannelBuffers {
781 buffers: buffer_versions,
782 });
783
784 cx.spawn(|this, mut cx| async move {
785 let mut response = response.await?;
786
787 this.update(&mut cx, |this, cx| {
788 this.opened_buffers.retain(|_, buffer| match buffer {
789 OpenedModelHandle::Open(channel_buffer) => {
790 let Some(channel_buffer) = channel_buffer.upgrade(cx) else {
791 return false;
792 };
793
794 channel_buffer.update(cx, |channel_buffer, cx| {
795 let channel_id = channel_buffer.channel_id;
796 if let Some(remote_buffer) = response
797 .buffers
798 .iter_mut()
799 .find(|buffer| buffer.channel_id == channel_id)
800 {
801 let channel_id = channel_buffer.channel_id;
802 let remote_version =
803 language::proto::deserialize_version(&remote_buffer.version);
804
805 channel_buffer.replace_collaborators(
806 mem::take(&mut remote_buffer.collaborators),
807 cx,
808 );
809
810 let operations = channel_buffer
811 .buffer()
812 .update(cx, |buffer, cx| {
813 let outgoing_operations =
814 buffer.serialize_ops(Some(remote_version), cx);
815 let incoming_operations =
816 mem::take(&mut remote_buffer.operations)
817 .into_iter()
818 .map(language::proto::deserialize_operation)
819 .collect::<Result<Vec<_>>>()?;
820 buffer.apply_ops(incoming_operations, cx)?;
821 anyhow::Ok(outgoing_operations)
822 })
823 .log_err();
824
825 if let Some(operations) = operations {
826 let client = this.client.clone();
827 cx.background()
828 .spawn(async move {
829 let operations = operations.await;
830 for chunk in
831 language::proto::split_operations(operations)
832 {
833 client
834 .send(proto::UpdateChannelBuffer {
835 channel_id,
836 operations: chunk,
837 })
838 .ok();
839 }
840 })
841 .detach();
842 return true;
843 }
844 }
845
846 channel_buffer.disconnect(cx);
847 false
848 })
849 }
850 OpenedModelHandle::Loading(_) => true,
851 });
852 });
853 anyhow::Ok(())
854 })
855 }
856
857 fn handle_disconnect(&mut self, wait_for_reconnect: bool, cx: &mut ModelContext<Self>) {
858 cx.notify();
859
860 self.disconnect_channel_buffers_task.get_or_insert_with(|| {
861 cx.spawn_weak(|this, mut cx| async move {
862 if wait_for_reconnect {
863 cx.background().timer(RECONNECT_TIMEOUT).await;
864 }
865
866 if let Some(this) = this.upgrade(&cx) {
867 this.update(&mut cx, |this, cx| {
868 for (_, buffer) in this.opened_buffers.drain() {
869 if let OpenedModelHandle::Open(buffer) = buffer {
870 if let Some(buffer) = buffer.upgrade(cx) {
871 buffer.update(cx, |buffer, cx| buffer.disconnect(cx));
872 }
873 }
874 }
875 });
876 }
877 })
878 });
879 }
880
881 pub(crate) fn update_channels(
882 &mut self,
883 payload: proto::UpdateChannels,
884 cx: &mut ModelContext<ChannelStore>,
885 ) -> Option<Task<Result<()>>> {
886 if !payload.remove_channel_invitations.is_empty() {
887 self.channel_invitations
888 .retain(|channel| !payload.remove_channel_invitations.contains(&channel.id));
889 }
890 for channel in payload.channel_invitations {
891 match self
892 .channel_invitations
893 .binary_search_by_key(&channel.id, |c| c.id)
894 {
895 Ok(ix) => Arc::make_mut(&mut self.channel_invitations[ix]).name = channel.name,
896 Err(ix) => self.channel_invitations.insert(
897 ix,
898 Arc::new(Channel {
899 id: channel.id,
900 visibility: channel.visibility(),
901 role: channel.role(),
902 name: channel.name,
903 unseen_note_version: None,
904 unseen_message_id: None,
905 parent_path: channel.parent_path,
906 }),
907 ),
908 }
909 }
910
911 let channels_changed = !payload.channels.is_empty()
912 || !payload.delete_channels.is_empty()
913 || !payload.unseen_channel_messages.is_empty()
914 || !payload.unseen_channel_buffer_changes.is_empty();
915
916 if channels_changed {
917 if !payload.delete_channels.is_empty() {
918 self.channel_index.delete_channels(&payload.delete_channels);
919 self.channel_participants
920 .retain(|channel_id, _| !&payload.delete_channels.contains(channel_id));
921
922 for channel_id in &payload.delete_channels {
923 let channel_id = *channel_id;
924 if payload
925 .channels
926 .iter()
927 .any(|channel| channel.id == channel_id)
928 {
929 continue;
930 }
931 if let Some(OpenedModelHandle::Open(buffer)) =
932 self.opened_buffers.remove(&channel_id)
933 {
934 if let Some(buffer) = buffer.upgrade(cx) {
935 buffer.update(cx, ChannelBuffer::disconnect);
936 }
937 }
938 }
939 }
940
941 let mut index = self.channel_index.bulk_insert();
942 for channel in payload.channels {
943 let id = channel.id;
944 let channel_changed = index.insert(channel);
945
946 if channel_changed {
947 if let Some(OpenedModelHandle::Open(buffer)) = self.opened_buffers.get(&id) {
948 if let Some(buffer) = buffer.upgrade(cx) {
949 buffer.update(cx, ChannelBuffer::channel_changed);
950 }
951 }
952 }
953 }
954
955 for unseen_buffer_change in payload.unseen_channel_buffer_changes {
956 let version = language::proto::deserialize_version(&unseen_buffer_change.version);
957 index.note_changed(
958 unseen_buffer_change.channel_id,
959 unseen_buffer_change.epoch,
960 &version,
961 );
962 }
963
964 for unseen_channel_message in payload.unseen_channel_messages {
965 index.new_messages(
966 unseen_channel_message.channel_id,
967 unseen_channel_message.message_id,
968 );
969 }
970 }
971
972 cx.notify();
973 if payload.channel_participants.is_empty() {
974 return None;
975 }
976
977 let mut all_user_ids = Vec::new();
978 let channel_participants = payload.channel_participants;
979 for entry in &channel_participants {
980 for user_id in entry.participant_user_ids.iter() {
981 if let Err(ix) = all_user_ids.binary_search(user_id) {
982 all_user_ids.insert(ix, *user_id);
983 }
984 }
985 }
986
987 let users = self
988 .user_store
989 .update(cx, |user_store, cx| user_store.get_users(all_user_ids, cx));
990 Some(cx.spawn(|this, mut cx| async move {
991 let users = users.await?;
992
993 this.update(&mut cx, |this, cx| {
994 for entry in &channel_participants {
995 let mut participants: Vec<_> = entry
996 .participant_user_ids
997 .iter()
998 .filter_map(|user_id| {
999 users
1000 .binary_search_by_key(&user_id, |user| &user.id)
1001 .ok()
1002 .map(|ix| users[ix].clone())
1003 })
1004 .collect();
1005
1006 participants.sort_by_key(|u| u.id);
1007
1008 this.channel_participants
1009 .insert(entry.channel_id, participants);
1010 }
1011
1012 cx.notify();
1013 });
1014 anyhow::Ok(())
1015 }))
1016 }
1017}