1mod channel_index;
2
3use crate::{channel_buffer::ChannelBuffer, channel_chat::ChannelChat, ChannelMessage};
4use anyhow::{anyhow, Result};
5use channel_index::ChannelIndex;
6use client::{Client, Subscription, User, UserId, UserStore};
7use collections::{hash_map, HashMap, HashSet};
8use db::RELEASE_CHANNEL;
9use futures::{channel::mpsc, future::Shared, Future, FutureExt, StreamExt};
10use gpui::{
11 AppContext, AsyncAppContext, Context, EventEmitter, Model, ModelContext, Task, WeakModel,
12};
13use rpc::{
14 proto::{self, ChannelVisibility},
15 TypedEnvelope,
16};
17use std::{mem, sync::Arc, time::Duration};
18use util::{async_maybe, ResultExt};
19
20pub fn init(client: &Arc<Client>, user_store: Model<UserStore>, cx: &mut AppContext) {
21 let channel_store =
22 cx.build_model(|cx| ChannelStore::new(client.clone(), user_store.clone(), cx));
23 cx.set_global(channel_store);
24}
25
/// How long `ChannelStore::handle_disconnect` waits before disconnecting the
/// opened channel buffers when it was asked to wait for a reconnect.
pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);

/// Identifier of a channel.
pub type ChannelId = u64;
29
30pub struct ChannelStore {
31 pub channel_index: ChannelIndex,
32 channel_invitations: Vec<Arc<Channel>>,
33 channel_participants: HashMap<ChannelId, Vec<Arc<User>>>,
34 outgoing_invites: HashSet<(ChannelId, UserId)>,
35 update_channels_tx: mpsc::UnboundedSender<proto::UpdateChannels>,
36 opened_buffers: HashMap<ChannelId, OpenedModelHandle<ChannelBuffer>>,
37 opened_chats: HashMap<ChannelId, OpenedModelHandle<ChannelChat>>,
38 client: Arc<Client>,
39 user_store: Model<UserStore>,
40 _rpc_subscription: Subscription,
41 _watch_connection_status: Task<Option<()>>,
42 disconnect_channel_buffers_task: Option<Task<()>>,
43 _update_channels: Task<()>,
44}
45
46#[derive(Clone, Debug, PartialEq)]
47pub struct Channel {
48 pub id: ChannelId,
49 pub name: String,
50 pub visibility: proto::ChannelVisibility,
51 pub role: proto::ChannelRole,
52 pub unseen_note_version: Option<(u64, clock::Global)>,
53 pub unseen_message_id: Option<u64>,
54 pub parent_path: Vec<u64>,
55}
56
57impl Channel {
58 pub fn link(&self) -> String {
59 RELEASE_CHANNEL.link_prefix().to_owned()
60 + "channel/"
61 + &self.slug()
62 + "-"
63 + &self.id.to_string()
64 }
65
66 pub fn slug(&self) -> String {
67 let slug: String = self
68 .name
69 .chars()
70 .map(|c| if c.is_alphanumeric() { c } else { '-' })
71 .collect();
72
73 slug.trim_matches(|c| c == '-').to_string()
74 }
75
76 pub fn can_edit_notes(&self) -> bool {
77 self.role == proto::ChannelRole::Member || self.role == proto::ChannelRole::Admin
78 }
79}
80
81pub struct ChannelMembership {
82 pub user: Arc<User>,
83 pub kind: proto::channel_member::Kind,
84 pub role: proto::ChannelRole,
85}
86impl ChannelMembership {
87 pub fn sort_key(&self) -> MembershipSortKey {
88 MembershipSortKey {
89 role_order: match self.role {
90 proto::ChannelRole::Admin => 0,
91 proto::ChannelRole::Member => 1,
92 proto::ChannelRole::Banned => 2,
93 proto::ChannelRole::Guest => 3,
94 },
95 kind_order: match self.kind {
96 proto::channel_member::Kind::Member => 0,
97 proto::channel_member::Kind::AncestorMember => 1,
98 proto::channel_member::Kind::Invitee => 2,
99 },
100 username_order: self.user.github_login.as_str(),
101 }
102 }
103}
104
/// Sort key produced by [`ChannelMembership::sort_key`].
///
/// The derived ordering compares the fields in declaration order, so the
/// field order below is significant.
#[derive(PartialOrd, Ord, PartialEq, Eq)]
pub struct MembershipSortKey<'a> {
    /// Rank of the member's role; lower sorts first.
    role_order: u8,
    /// Rank of the membership kind; lower sorts first.
    kind_order: u8,
    /// GitHub login of the user, used as the final tie-breaker.
    username_order: &'a str,
}
111
112pub enum ChannelEvent {
113 ChannelCreated(ChannelId),
114 ChannelRenamed(ChannelId),
115}
116
117impl EventEmitter<ChannelEvent> for ChannelStore {}
118
119enum OpenedModelHandle<E> {
120 Open(WeakModel<E>),
121 Loading(Shared<Task<Result<Model<E>, Arc<anyhow::Error>>>>),
122}
123
124impl ChannelStore {
125 pub fn global(cx: &AppContext) -> Model<Self> {
126 cx.global::<Model<Self>>().clone()
127 }
128
129 pub fn new(
130 client: Arc<Client>,
131 user_store: Model<UserStore>,
132 cx: &mut ModelContext<Self>,
133 ) -> Self {
134 let rpc_subscription =
135 client.add_message_handler(cx.weak_model(), Self::handle_update_channels);
136
137 let mut connection_status = client.status();
138 let (update_channels_tx, mut update_channels_rx) = mpsc::unbounded();
139 let watch_connection_status = cx.spawn(|this, mut cx| async move {
140 while let Some(status) = connection_status.next().await {
141 let this = this.upgrade()?;
142 match status {
143 client::Status::Connected { .. } => {
144 this.update(&mut cx, |this, cx| this.handle_connect(cx))
145 .ok()?
146 .await
147 .log_err()?;
148 }
149 client::Status::SignedOut | client::Status::UpgradeRequired => {
150 this.update(&mut cx, |this, cx| this.handle_disconnect(false, cx))
151 .ok();
152 }
153 _ => {
154 this.update(&mut cx, |this, cx| this.handle_disconnect(true, cx))
155 .ok();
156 }
157 }
158 }
159 Some(())
160 });
161
162 Self {
163 channel_invitations: Vec::default(),
164 channel_index: ChannelIndex::default(),
165 channel_participants: Default::default(),
166 outgoing_invites: Default::default(),
167 opened_buffers: Default::default(),
168 opened_chats: Default::default(),
169 update_channels_tx,
170 client,
171 user_store,
172 _rpc_subscription: rpc_subscription,
173 _watch_connection_status: watch_connection_status,
174 disconnect_channel_buffers_task: None,
175 _update_channels: cx.spawn(|this, mut cx| async move {
176 async_maybe!({
177 while let Some(update_channels) = update_channels_rx.next().await {
178 if let Some(this) = this.upgrade() {
179 let update_task = this.update(&mut cx, |this, cx| {
180 this.update_channels(update_channels, cx)
181 })?;
182 if let Some(update_task) = update_task {
183 update_task.await.log_err();
184 }
185 }
186 }
187 anyhow::Ok(())
188 })
189 .await
190 .log_err();
191 }),
192 }
193 }
194
195 pub fn client(&self) -> Arc<Client> {
196 self.client.clone()
197 }
198
199 /// Returns the number of unique channels in the store
200 pub fn channel_count(&self) -> usize {
201 self.channel_index.by_id().len()
202 }
203
204 /// Returns the index of a channel ID in the list of unique channels
205 pub fn index_of_channel(&self, channel_id: ChannelId) -> Option<usize> {
206 self.channel_index
207 .by_id()
208 .keys()
209 .position(|id| *id == channel_id)
210 }
211
212 /// Returns an iterator over all unique channels
213 pub fn channels(&self) -> impl '_ + Iterator<Item = &Arc<Channel>> {
214 self.channel_index.by_id().values()
215 }
216
217 /// Iterate over all entries in the channel DAG
218 pub fn ordered_channels(&self) -> impl '_ + Iterator<Item = (usize, &Arc<Channel>)> {
219 self.channel_index
220 .ordered_channels()
221 .iter()
222 .filter_map(move |id| {
223 let channel = self.channel_index.by_id().get(id)?;
224 Some((channel.parent_path.len(), channel))
225 })
226 }
227
228 pub fn channel_at_index(&self, ix: usize) -> Option<&Arc<Channel>> {
229 let channel_id = self.channel_index.ordered_channels().get(ix)?;
230 self.channel_index.by_id().get(channel_id)
231 }
232
233 pub fn channel_at(&self, ix: usize) -> Option<&Arc<Channel>> {
234 self.channel_index.by_id().values().nth(ix)
235 }
236
237 pub fn has_channel_invitation(&self, channel_id: ChannelId) -> bool {
238 self.channel_invitations
239 .iter()
240 .any(|channel| channel.id == channel_id)
241 }
242
243 pub fn channel_invitations(&self) -> &[Arc<Channel>] {
244 &self.channel_invitations
245 }
246
247 pub fn channel_for_id(&self, channel_id: ChannelId) -> Option<&Arc<Channel>> {
248 self.channel_index.by_id().get(&channel_id)
249 }
250
251 pub fn has_open_channel_buffer(&self, channel_id: ChannelId, _cx: &AppContext) -> bool {
252 if let Some(buffer) = self.opened_buffers.get(&channel_id) {
253 if let OpenedModelHandle::Open(buffer) = buffer {
254 return buffer.upgrade().is_some();
255 }
256 }
257 false
258 }
259
260 pub fn open_channel_buffer(
261 &mut self,
262 channel_id: ChannelId,
263 cx: &mut ModelContext<Self>,
264 ) -> Task<Result<Model<ChannelBuffer>>> {
265 let client = self.client.clone();
266 let user_store = self.user_store.clone();
267 let channel_store = cx.handle();
268 self.open_channel_resource(
269 channel_id,
270 |this| &mut this.opened_buffers,
271 |channel, cx| ChannelBuffer::new(channel, client, user_store, channel_store, cx),
272 cx,
273 )
274 }
275
276 pub fn fetch_channel_messages(
277 &self,
278 message_ids: Vec<u64>,
279 cx: &mut ModelContext<Self>,
280 ) -> Task<Result<Vec<ChannelMessage>>> {
281 let request = if message_ids.is_empty() {
282 None
283 } else {
284 Some(
285 self.client
286 .request(proto::GetChannelMessagesById { message_ids }),
287 )
288 };
289 cx.spawn(|this, mut cx| async move {
290 if let Some(request) = request {
291 let response = request.await?;
292 let this = this
293 .upgrade()
294 .ok_or_else(|| anyhow!("channel store dropped"))?;
295 let user_store = this.update(&mut cx, |this, _| this.user_store.clone())?;
296 ChannelMessage::from_proto_vec(response.messages, &user_store, &mut cx).await
297 } else {
298 Ok(Vec::new())
299 }
300 })
301 }
302
303 pub fn has_channel_buffer_changed(&self, channel_id: ChannelId) -> Option<bool> {
304 self.channel_index
305 .by_id()
306 .get(&channel_id)
307 .map(|channel| channel.unseen_note_version.is_some())
308 }
309
310 pub fn has_new_messages(&self, channel_id: ChannelId) -> Option<bool> {
311 self.channel_index
312 .by_id()
313 .get(&channel_id)
314 .map(|channel| channel.unseen_message_id.is_some())
315 }
316
317 pub fn notes_changed(
318 &mut self,
319 channel_id: ChannelId,
320 epoch: u64,
321 version: &clock::Global,
322 cx: &mut ModelContext<Self>,
323 ) {
324 self.channel_index.note_changed(channel_id, epoch, version);
325 cx.notify();
326 }
327
328 pub fn new_message(
329 &mut self,
330 channel_id: ChannelId,
331 message_id: u64,
332 cx: &mut ModelContext<Self>,
333 ) {
334 self.channel_index.new_message(channel_id, message_id);
335 cx.notify();
336 }
337
338 pub fn acknowledge_message_id(
339 &mut self,
340 channel_id: ChannelId,
341 message_id: u64,
342 cx: &mut ModelContext<Self>,
343 ) {
344 self.channel_index
345 .acknowledge_message_id(channel_id, message_id);
346 cx.notify();
347 }
348
349 pub fn acknowledge_notes_version(
350 &mut self,
351 channel_id: ChannelId,
352 epoch: u64,
353 version: &clock::Global,
354 cx: &mut ModelContext<Self>,
355 ) {
356 self.channel_index
357 .acknowledge_note_version(channel_id, epoch, version);
358 cx.notify();
359 }
360
361 pub fn open_channel_chat(
362 &mut self,
363 channel_id: ChannelId,
364 cx: &mut ModelContext<Self>,
365 ) -> Task<Result<Model<ChannelChat>>> {
366 let client = self.client.clone();
367 let user_store = self.user_store.clone();
368 let this = cx.handle();
369 self.open_channel_resource(
370 channel_id,
371 |this| &mut this.opened_chats,
372 |channel, cx| ChannelChat::new(channel, this, user_store, client, cx),
373 cx,
374 )
375 }
376
377 /// Asynchronously open a given resource associated with a channel.
378 ///
379 /// Make sure that the resource is only opened once, even if this method
380 /// is called multiple times with the same channel id while the first task
381 /// is still running.
382 fn open_channel_resource<T, F, Fut>(
383 &mut self,
384 channel_id: ChannelId,
385 get_map: fn(&mut Self) -> &mut HashMap<ChannelId, OpenedModelHandle<T>>,
386 load: F,
387 cx: &mut ModelContext<Self>,
388 ) -> Task<Result<Model<T>>>
389 where
390 F: 'static + FnOnce(Arc<Channel>, AsyncAppContext) -> Fut,
391 Fut: Future<Output = Result<Model<T>>>,
392 T: 'static,
393 {
394 let task = loop {
395 match get_map(self).entry(channel_id) {
396 hash_map::Entry::Occupied(e) => match e.get() {
397 OpenedModelHandle::Open(model) => {
398 if let Some(model) = model.upgrade() {
399 break Task::ready(Ok(model)).shared();
400 } else {
401 get_map(self).remove(&channel_id);
402 continue;
403 }
404 }
405 OpenedModelHandle::Loading(task) => {
406 break task.clone();
407 }
408 },
409 hash_map::Entry::Vacant(e) => {
410 let task = cx
411 .spawn(move |this, mut cx| async move {
412 let channel = this.update(&mut cx, |this, _| {
413 this.channel_for_id(channel_id).cloned().ok_or_else(|| {
414 Arc::new(anyhow!("no channel for id: {}", channel_id))
415 })
416 })??;
417
418 load(channel, cx).await.map_err(Arc::new)
419 })
420 .shared();
421
422 e.insert(OpenedModelHandle::Loading(task.clone()));
423 cx.spawn({
424 let task = task.clone();
425 move |this, mut cx| async move {
426 let result = task.await;
427 this.update(&mut cx, |this, _| match result {
428 Ok(model) => {
429 get_map(this).insert(
430 channel_id,
431 OpenedModelHandle::Open(model.downgrade()),
432 );
433 }
434 Err(_) => {
435 get_map(this).remove(&channel_id);
436 }
437 })
438 .ok();
439 }
440 })
441 .detach();
442 break task;
443 }
444 }
445 };
446 cx.background_executor()
447 .spawn(async move { task.await.map_err(|error| anyhow!("{}", error)) })
448 }
449
450 pub fn is_channel_admin(&self, channel_id: ChannelId) -> bool {
451 let Some(channel) = self.channel_for_id(channel_id) else {
452 return false;
453 };
454 channel.role == proto::ChannelRole::Admin
455 }
456
457 pub fn channel_participants(&self, channel_id: ChannelId) -> &[Arc<User>] {
458 self.channel_participants
459 .get(&channel_id)
460 .map_or(&[], |v| v.as_slice())
461 }
462
463 pub fn create_channel(
464 &self,
465 name: &str,
466 parent_id: Option<ChannelId>,
467 cx: &mut ModelContext<Self>,
468 ) -> Task<Result<ChannelId>> {
469 let client = self.client.clone();
470 let name = name.trim_start_matches("#").to_owned();
471 cx.spawn(move |this, mut cx| async move {
472 let response = client
473 .request(proto::CreateChannel { name, parent_id })
474 .await?;
475
476 let channel = response
477 .channel
478 .ok_or_else(|| anyhow!("missing channel in response"))?;
479 let channel_id = channel.id;
480
481 this.update(&mut cx, |this, cx| {
482 let task = this.update_channels(
483 proto::UpdateChannels {
484 channels: vec![channel],
485 ..Default::default()
486 },
487 cx,
488 );
489 assert!(task.is_none());
490
491 // This event is emitted because the collab panel wants to clear the pending edit state
492 // before this frame is rendered. But we can't guarantee that the collab panel's future
493 // will resolve before this flush_effects finishes. Synchronously emitting this event
494 // ensures that the collab panel will observe this creation before the frame completes
495 cx.emit(ChannelEvent::ChannelCreated(channel_id));
496 })?;
497
498 Ok(channel_id)
499 })
500 }
501
502 pub fn move_channel(
503 &mut self,
504 channel_id: ChannelId,
505 to: Option<ChannelId>,
506 cx: &mut ModelContext<Self>,
507 ) -> Task<Result<()>> {
508 let client = self.client.clone();
509 cx.spawn(move |_, _| async move {
510 let _ = client
511 .request(proto::MoveChannel { channel_id, to })
512 .await?;
513
514 Ok(())
515 })
516 }
517
518 pub fn set_channel_visibility(
519 &mut self,
520 channel_id: ChannelId,
521 visibility: ChannelVisibility,
522 cx: &mut ModelContext<Self>,
523 ) -> Task<Result<()>> {
524 let client = self.client.clone();
525 cx.spawn(move |_, _| async move {
526 let _ = client
527 .request(proto::SetChannelVisibility {
528 channel_id,
529 visibility: visibility.into(),
530 })
531 .await?;
532
533 Ok(())
534 })
535 }
536
537 pub fn invite_member(
538 &mut self,
539 channel_id: ChannelId,
540 user_id: UserId,
541 role: proto::ChannelRole,
542 cx: &mut ModelContext<Self>,
543 ) -> Task<Result<()>> {
544 if !self.outgoing_invites.insert((channel_id, user_id)) {
545 return Task::ready(Err(anyhow!("invite request already in progress")));
546 }
547
548 cx.notify();
549 let client = self.client.clone();
550 cx.spawn(move |this, mut cx| async move {
551 let result = client
552 .request(proto::InviteChannelMember {
553 channel_id,
554 user_id,
555 role: role.into(),
556 })
557 .await;
558
559 this.update(&mut cx, |this, cx| {
560 this.outgoing_invites.remove(&(channel_id, user_id));
561 cx.notify();
562 })?;
563
564 result?;
565
566 Ok(())
567 })
568 }
569
570 pub fn remove_member(
571 &mut self,
572 channel_id: ChannelId,
573 user_id: u64,
574 cx: &mut ModelContext<Self>,
575 ) -> Task<Result<()>> {
576 if !self.outgoing_invites.insert((channel_id, user_id)) {
577 return Task::ready(Err(anyhow!("invite request already in progress")));
578 }
579
580 cx.notify();
581 let client = self.client.clone();
582 cx.spawn(move |this, mut cx| async move {
583 let result = client
584 .request(proto::RemoveChannelMember {
585 channel_id,
586 user_id,
587 })
588 .await;
589
590 this.update(&mut cx, |this, cx| {
591 this.outgoing_invites.remove(&(channel_id, user_id));
592 cx.notify();
593 })?;
594 result?;
595 Ok(())
596 })
597 }
598
599 pub fn set_member_role(
600 &mut self,
601 channel_id: ChannelId,
602 user_id: UserId,
603 role: proto::ChannelRole,
604 cx: &mut ModelContext<Self>,
605 ) -> Task<Result<()>> {
606 if !self.outgoing_invites.insert((channel_id, user_id)) {
607 return Task::ready(Err(anyhow!("member request already in progress")));
608 }
609
610 cx.notify();
611 let client = self.client.clone();
612 cx.spawn(move |this, mut cx| async move {
613 let result = client
614 .request(proto::SetChannelMemberRole {
615 channel_id,
616 user_id,
617 role: role.into(),
618 })
619 .await;
620
621 this.update(&mut cx, |this, cx| {
622 this.outgoing_invites.remove(&(channel_id, user_id));
623 cx.notify();
624 })?;
625
626 result?;
627 Ok(())
628 })
629 }
630
631 pub fn rename(
632 &mut self,
633 channel_id: ChannelId,
634 new_name: &str,
635 cx: &mut ModelContext<Self>,
636 ) -> Task<Result<()>> {
637 let client = self.client.clone();
638 let name = new_name.to_string();
639 cx.spawn(move |this, mut cx| async move {
640 let channel = client
641 .request(proto::RenameChannel { channel_id, name })
642 .await?
643 .channel
644 .ok_or_else(|| anyhow!("missing channel in response"))?;
645 this.update(&mut cx, |this, cx| {
646 let task = this.update_channels(
647 proto::UpdateChannels {
648 channels: vec![channel],
649 ..Default::default()
650 },
651 cx,
652 );
653 assert!(task.is_none());
654
655 // This event is emitted because the collab panel wants to clear the pending edit state
656 // before this frame is rendered. But we can't guarantee that the collab panel's future
657 // will resolve before this flush_effects finishes. Synchronously emitting this event
658 // ensures that the collab panel will observe this creation before the frame complete
659 cx.emit(ChannelEvent::ChannelRenamed(channel_id))
660 })?;
661 Ok(())
662 })
663 }
664
665 pub fn respond_to_channel_invite(
666 &mut self,
667 channel_id: ChannelId,
668 accept: bool,
669 cx: &mut ModelContext<Self>,
670 ) -> Task<Result<()>> {
671 let client = self.client.clone();
672 cx.background_executor().spawn(async move {
673 client
674 .request(proto::RespondToChannelInvite { channel_id, accept })
675 .await?;
676 Ok(())
677 })
678 }
679
680 pub fn get_channel_member_details(
681 &self,
682 channel_id: ChannelId,
683 cx: &mut ModelContext<Self>,
684 ) -> Task<Result<Vec<ChannelMembership>>> {
685 let client = self.client.clone();
686 let user_store = self.user_store.downgrade();
687 cx.spawn(move |_, mut cx| async move {
688 let response = client
689 .request(proto::GetChannelMembers { channel_id })
690 .await?;
691
692 let user_ids = response.members.iter().map(|m| m.user_id).collect();
693 let user_store = user_store
694 .upgrade()
695 .ok_or_else(|| anyhow!("user store dropped"))?;
696 let users = user_store
697 .update(&mut cx, |user_store, cx| user_store.get_users(user_ids, cx))?
698 .await?;
699
700 Ok(users
701 .into_iter()
702 .zip(response.members)
703 .filter_map(|(user, member)| {
704 Some(ChannelMembership {
705 user,
706 role: member.role(),
707 kind: member.kind(),
708 })
709 })
710 .collect())
711 })
712 }
713
714 pub fn remove_channel(&self, channel_id: ChannelId) -> impl Future<Output = Result<()>> {
715 let client = self.client.clone();
716 async move {
717 client.request(proto::DeleteChannel { channel_id }).await?;
718 Ok(())
719 }
720 }
721
722 pub fn has_pending_channel_invite_response(&self, _: &Arc<Channel>) -> bool {
723 false
724 }
725
726 pub fn has_pending_channel_invite(&self, channel_id: ChannelId, user_id: UserId) -> bool {
727 self.outgoing_invites.contains(&(channel_id, user_id))
728 }
729
730 async fn handle_update_channels(
731 this: Model<Self>,
732 message: TypedEnvelope<proto::UpdateChannels>,
733 _: Arc<Client>,
734 mut cx: AsyncAppContext,
735 ) -> Result<()> {
736 this.update(&mut cx, |this, _| {
737 this.update_channels_tx
738 .unbounded_send(message.payload)
739 .unwrap();
740 })?;
741 Ok(())
742 }
743
744 fn handle_connect(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
745 self.channel_index.clear();
746 self.channel_invitations.clear();
747 self.channel_participants.clear();
748 self.channel_index.clear();
749 self.outgoing_invites.clear();
750 self.disconnect_channel_buffers_task.take();
751
752 for chat in self.opened_chats.values() {
753 if let OpenedModelHandle::Open(chat) = chat {
754 if let Some(chat) = chat.upgrade() {
755 chat.update(cx, |chat, cx| {
756 chat.rejoin(cx);
757 });
758 }
759 }
760 }
761
762 let mut buffer_versions = Vec::new();
763 for buffer in self.opened_buffers.values() {
764 if let OpenedModelHandle::Open(buffer) = buffer {
765 if let Some(buffer) = buffer.upgrade() {
766 let channel_buffer = buffer.read(cx);
767 let buffer = channel_buffer.buffer().read(cx);
768 buffer_versions.push(proto::ChannelBufferVersion {
769 channel_id: channel_buffer.channel_id,
770 epoch: channel_buffer.epoch(),
771 version: language::proto::serialize_version(&buffer.version()),
772 });
773 }
774 }
775 }
776
777 if buffer_versions.is_empty() {
778 return Task::ready(Ok(()));
779 }
780
781 let response = self.client.request(proto::RejoinChannelBuffers {
782 buffers: buffer_versions,
783 });
784
785 cx.spawn(|this, mut cx| async move {
786 let mut response = response.await?;
787
788 this.update(&mut cx, |this, cx| {
789 this.opened_buffers.retain(|_, buffer| match buffer {
790 OpenedModelHandle::Open(channel_buffer) => {
791 let Some(channel_buffer) = channel_buffer.upgrade() else {
792 return false;
793 };
794
795 channel_buffer.update(cx, |channel_buffer, cx| {
796 let channel_id = channel_buffer.channel_id;
797 if let Some(remote_buffer) = response
798 .buffers
799 .iter_mut()
800 .find(|buffer| buffer.channel_id == channel_id)
801 {
802 let channel_id = channel_buffer.channel_id;
803 let remote_version =
804 language::proto::deserialize_version(&remote_buffer.version);
805
806 channel_buffer.replace_collaborators(
807 mem::take(&mut remote_buffer.collaborators),
808 cx,
809 );
810
811 let operations = channel_buffer
812 .buffer()
813 .update(cx, |buffer, cx| {
814 let outgoing_operations =
815 buffer.serialize_ops(Some(remote_version), cx);
816 let incoming_operations =
817 mem::take(&mut remote_buffer.operations)
818 .into_iter()
819 .map(language::proto::deserialize_operation)
820 .collect::<Result<Vec<_>>>()?;
821 buffer.apply_ops(incoming_operations, cx)?;
822 anyhow::Ok(outgoing_operations)
823 })
824 .log_err();
825
826 if let Some(operations) = operations {
827 let client = this.client.clone();
828 cx.background_executor()
829 .spawn(async move {
830 let operations = operations.await;
831 for chunk in
832 language::proto::split_operations(operations)
833 {
834 client
835 .send(proto::UpdateChannelBuffer {
836 channel_id,
837 operations: chunk,
838 })
839 .ok();
840 }
841 })
842 .detach();
843 return true;
844 }
845 }
846
847 channel_buffer.disconnect(cx);
848 false
849 })
850 }
851 OpenedModelHandle::Loading(_) => true,
852 });
853 })
854 .ok();
855 anyhow::Ok(())
856 })
857 }
858
859 fn handle_disconnect(&mut self, wait_for_reconnect: bool, cx: &mut ModelContext<Self>) {
860 cx.notify();
861
862 self.disconnect_channel_buffers_task.get_or_insert_with(|| {
863 cx.spawn(move |this, mut cx| async move {
864 if wait_for_reconnect {
865 cx.background_executor().timer(RECONNECT_TIMEOUT).await;
866 }
867
868 if let Some(this) = this.upgrade() {
869 this.update(&mut cx, |this, cx| {
870 for (_, buffer) in this.opened_buffers.drain() {
871 if let OpenedModelHandle::Open(buffer) = buffer {
872 if let Some(buffer) = buffer.upgrade() {
873 buffer.update(cx, |buffer, cx| buffer.disconnect(cx));
874 }
875 }
876 }
877 })
878 .ok();
879 }
880 })
881 });
882 }
883
884 pub(crate) fn update_channels(
885 &mut self,
886 payload: proto::UpdateChannels,
887 cx: &mut ModelContext<ChannelStore>,
888 ) -> Option<Task<Result<()>>> {
889 if !payload.remove_channel_invitations.is_empty() {
890 self.channel_invitations
891 .retain(|channel| !payload.remove_channel_invitations.contains(&channel.id));
892 }
893 for channel in payload.channel_invitations {
894 match self
895 .channel_invitations
896 .binary_search_by_key(&channel.id, |c| c.id)
897 {
898 Ok(ix) => Arc::make_mut(&mut self.channel_invitations[ix]).name = channel.name,
899 Err(ix) => self.channel_invitations.insert(
900 ix,
901 Arc::new(Channel {
902 id: channel.id,
903 visibility: channel.visibility(),
904 role: channel.role(),
905 name: channel.name,
906 unseen_note_version: None,
907 unseen_message_id: None,
908 parent_path: channel.parent_path,
909 }),
910 ),
911 }
912 }
913
914 let channels_changed = !payload.channels.is_empty()
915 || !payload.delete_channels.is_empty()
916 || !payload.unseen_channel_messages.is_empty()
917 || !payload.unseen_channel_buffer_changes.is_empty();
918
919 if channels_changed {
920 if !payload.delete_channels.is_empty() {
921 self.channel_index.delete_channels(&payload.delete_channels);
922 self.channel_participants
923 .retain(|channel_id, _| !&payload.delete_channels.contains(channel_id));
924
925 for channel_id in &payload.delete_channels {
926 let channel_id = *channel_id;
927 if payload
928 .channels
929 .iter()
930 .any(|channel| channel.id == channel_id)
931 {
932 continue;
933 }
934 if let Some(OpenedModelHandle::Open(buffer)) =
935 self.opened_buffers.remove(&channel_id)
936 {
937 if let Some(buffer) = buffer.upgrade() {
938 buffer.update(cx, ChannelBuffer::disconnect);
939 }
940 }
941 }
942 }
943
944 let mut index = self.channel_index.bulk_insert();
945 for channel in payload.channels {
946 let id = channel.id;
947 let channel_changed = index.insert(channel);
948
949 if channel_changed {
950 if let Some(OpenedModelHandle::Open(buffer)) = self.opened_buffers.get(&id) {
951 if let Some(buffer) = buffer.upgrade() {
952 buffer.update(cx, ChannelBuffer::channel_changed);
953 }
954 }
955 }
956 }
957
958 for unseen_buffer_change in payload.unseen_channel_buffer_changes {
959 let version = language::proto::deserialize_version(&unseen_buffer_change.version);
960 index.note_changed(
961 unseen_buffer_change.channel_id,
962 unseen_buffer_change.epoch,
963 &version,
964 );
965 }
966
967 for unseen_channel_message in payload.unseen_channel_messages {
968 index.new_messages(
969 unseen_channel_message.channel_id,
970 unseen_channel_message.message_id,
971 );
972 }
973 }
974
975 cx.notify();
976 if payload.channel_participants.is_empty() {
977 return None;
978 }
979
980 let mut all_user_ids = Vec::new();
981 let channel_participants = payload.channel_participants;
982 for entry in &channel_participants {
983 for user_id in entry.participant_user_ids.iter() {
984 if let Err(ix) = all_user_ids.binary_search(user_id) {
985 all_user_ids.insert(ix, *user_id);
986 }
987 }
988 }
989
990 let users = self
991 .user_store
992 .update(cx, |user_store, cx| user_store.get_users(all_user_ids, cx));
993 Some(cx.spawn(|this, mut cx| async move {
994 let users = users.await?;
995
996 this.update(&mut cx, |this, cx| {
997 for entry in &channel_participants {
998 let mut participants: Vec<_> = entry
999 .participant_user_ids
1000 .iter()
1001 .filter_map(|user_id| {
1002 users
1003 .binary_search_by_key(&user_id, |user| &user.id)
1004 .ok()
1005 .map(|ix| users[ix].clone())
1006 })
1007 .collect();
1008
1009 participants.sort_by_key(|u| u.id);
1010
1011 this.channel_participants
1012 .insert(entry.channel_id, participants);
1013 }
1014
1015 cx.notify();
1016 })
1017 }))
1018 }
1019}