1mod channel_index;
2
3use crate::{channel_buffer::ChannelBuffer, channel_chat::ChannelChat, ChannelMessage};
4use anyhow::{anyhow, Result};
5use channel_index::ChannelIndex;
6use client::{Client, Subscription, User, UserId, UserStore};
7use collections::{hash_map, HashMap, HashSet};
8use db::RELEASE_CHANNEL;
9use futures::{channel::mpsc, future::Shared, Future, FutureExt, StreamExt};
10use gpui::{
11 AppContext, AsyncAppContext, Context, EventEmitter, Model, ModelContext, Task, WeakModel,
12};
13use rpc::{
14 proto::{self, ChannelVisibility},
15 TypedEnvelope,
16};
17use std::{mem, sync::Arc, time::Duration};
18use util::{async_maybe, ResultExt};
19
20pub fn init(client: &Arc<Client>, user_store: Model<UserStore>, cx: &mut AppContext) {
21 let channel_store =
22 cx.build_model(|cx| ChannelStore::new(client.clone(), user_store.clone(), cx));
23 cx.set_global(channel_store);
24}
25
26pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
27
28pub type ChannelId = u64;
29
30pub struct ChannelStore {
31 pub channel_index: ChannelIndex,
32 channel_invitations: Vec<Arc<Channel>>,
33 channel_participants: HashMap<ChannelId, Vec<Arc<User>>>,
34 outgoing_invites: HashSet<(ChannelId, UserId)>,
35 update_channels_tx: mpsc::UnboundedSender<proto::UpdateChannels>,
36 opened_buffers: HashMap<ChannelId, OpenedModelHandle<ChannelBuffer>>,
37 opened_chats: HashMap<ChannelId, OpenedModelHandle<ChannelChat>>,
38 client: Arc<Client>,
39 user_store: Model<UserStore>,
40 _rpc_subscription: Subscription,
41 _watch_connection_status: Task<Option<()>>,
42 disconnect_channel_buffers_task: Option<Task<()>>,
43 _update_channels: Task<()>,
44}
45
46#[derive(Clone, Debug, PartialEq)]
47pub struct Channel {
48 pub id: ChannelId,
49 pub name: String,
50 pub visibility: proto::ChannelVisibility,
51 pub role: proto::ChannelRole,
52 pub unseen_note_version: Option<(u64, clock::Global)>,
53 pub unseen_message_id: Option<u64>,
54 pub parent_path: Vec<u64>,
55}
56
57impl Channel {
58 pub fn link(&self) -> String {
59 RELEASE_CHANNEL.link_prefix().to_owned()
60 + "channel/"
61 + &self.slug()
62 + "-"
63 + &self.id.to_string()
64 }
65
66 pub fn slug(&self) -> String {
67 let slug: String = self
68 .name
69 .chars()
70 .map(|c| if c.is_alphanumeric() { c } else { '-' })
71 .collect();
72
73 slug.trim_matches(|c| c == '-').to_string()
74 }
75
76 pub fn can_edit_notes(&self) -> bool {
77 self.role == proto::ChannelRole::Member || self.role == proto::ChannelRole::Admin
78 }
79}
80
81pub struct ChannelMembership {
82 pub user: Arc<User>,
83 pub kind: proto::channel_member::Kind,
84 pub role: proto::ChannelRole,
85}
86impl ChannelMembership {
87 pub fn sort_key(&self) -> MembershipSortKey {
88 MembershipSortKey {
89 role_order: match self.role {
90 proto::ChannelRole::Admin => 0,
91 proto::ChannelRole::Member => 1,
92 proto::ChannelRole::Banned => 2,
93 proto::ChannelRole::Guest => 3,
94 },
95 kind_order: match self.kind {
96 proto::channel_member::Kind::Member => 0,
97 proto::channel_member::Kind::AncestorMember => 1,
98 proto::channel_member::Kind::Invitee => 2,
99 },
100 username_order: self.user.github_login.as_str(),
101 }
102 }
103}
104
105#[derive(PartialOrd, Ord, PartialEq, Eq)]
106pub struct MembershipSortKey<'a> {
107 role_order: u8,
108 kind_order: u8,
109 username_order: &'a str,
110}
111
112pub enum ChannelEvent {
113 ChannelCreated(ChannelId),
114 ChannelRenamed(ChannelId),
115}
116
117impl EventEmitter for ChannelStore {
118 type Event = ChannelEvent;
119}
120
121enum OpenedModelHandle<E> {
122 Open(WeakModel<E>),
123 Loading(Shared<Task<Result<Model<E>, Arc<anyhow::Error>>>>),
124}
125
126impl ChannelStore {
127 pub fn global(cx: &AppContext) -> Model<Self> {
128 cx.global::<Model<Self>>().clone()
129 }
130
131 pub fn new(
132 client: Arc<Client>,
133 user_store: Model<UserStore>,
134 cx: &mut ModelContext<Self>,
135 ) -> Self {
136 let rpc_subscription =
137 client.add_message_handler(cx.weak_model(), Self::handle_update_channels);
138
139 let mut connection_status = client.status();
140 let (update_channels_tx, mut update_channels_rx) = mpsc::unbounded();
141 let watch_connection_status = cx.spawn(|this, mut cx| async move {
142 while let Some(status) = connection_status.next().await {
143 let this = this.upgrade()?;
144 match status {
145 client::Status::Connected { .. } => {
146 this.update(&mut cx, |this, cx| this.handle_connect(cx))
147 .ok()?
148 .await
149 .log_err()?;
150 }
151 client::Status::SignedOut | client::Status::UpgradeRequired => {
152 this.update(&mut cx, |this, cx| this.handle_disconnect(false, cx))
153 .ok();
154 }
155 _ => {
156 this.update(&mut cx, |this, cx| this.handle_disconnect(true, cx))
157 .ok();
158 }
159 }
160 }
161 Some(())
162 });
163
164 Self {
165 channel_invitations: Vec::default(),
166 channel_index: ChannelIndex::default(),
167 channel_participants: Default::default(),
168 outgoing_invites: Default::default(),
169 opened_buffers: Default::default(),
170 opened_chats: Default::default(),
171 update_channels_tx,
172 client,
173 user_store,
174 _rpc_subscription: rpc_subscription,
175 _watch_connection_status: watch_connection_status,
176 disconnect_channel_buffers_task: None,
177 _update_channels: cx.spawn(|this, mut cx| async move {
178 async_maybe!({
179 while let Some(update_channels) = update_channels_rx.next().await {
180 if let Some(this) = this.upgrade() {
181 let update_task = this.update(&mut cx, |this, cx| {
182 this.update_channels(update_channels, cx)
183 })?;
184 if let Some(update_task) = update_task {
185 update_task.await.log_err();
186 }
187 }
188 }
189 anyhow::Ok(())
190 })
191 .await
192 .log_err();
193 }),
194 }
195 }
196
197 pub fn client(&self) -> Arc<Client> {
198 self.client.clone()
199 }
200
201 /// Returns the number of unique channels in the store
202 pub fn channel_count(&self) -> usize {
203 self.channel_index.by_id().len()
204 }
205
206 /// Returns the index of a channel ID in the list of unique channels
207 pub fn index_of_channel(&self, channel_id: ChannelId) -> Option<usize> {
208 self.channel_index
209 .by_id()
210 .keys()
211 .position(|id| *id == channel_id)
212 }
213
214 /// Returns an iterator over all unique channels
215 pub fn channels(&self) -> impl '_ + Iterator<Item = &Arc<Channel>> {
216 self.channel_index.by_id().values()
217 }
218
219 /// Iterate over all entries in the channel DAG
220 pub fn ordered_channels(&self) -> impl '_ + Iterator<Item = (usize, &Arc<Channel>)> {
221 self.channel_index
222 .ordered_channels()
223 .iter()
224 .filter_map(move |id| {
225 let channel = self.channel_index.by_id().get(id)?;
226 Some((channel.parent_path.len(), channel))
227 })
228 }
229
230 pub fn channel_at_index(&self, ix: usize) -> Option<&Arc<Channel>> {
231 let channel_id = self.channel_index.ordered_channels().get(ix)?;
232 self.channel_index.by_id().get(channel_id)
233 }
234
235 pub fn channel_at(&self, ix: usize) -> Option<&Arc<Channel>> {
236 self.channel_index.by_id().values().nth(ix)
237 }
238
239 pub fn has_channel_invitation(&self, channel_id: ChannelId) -> bool {
240 self.channel_invitations
241 .iter()
242 .any(|channel| channel.id == channel_id)
243 }
244
245 pub fn channel_invitations(&self) -> &[Arc<Channel>] {
246 &self.channel_invitations
247 }
248
249 pub fn channel_for_id(&self, channel_id: ChannelId) -> Option<&Arc<Channel>> {
250 self.channel_index.by_id().get(&channel_id)
251 }
252
253 pub fn has_open_channel_buffer(&self, channel_id: ChannelId, _cx: &AppContext) -> bool {
254 if let Some(buffer) = self.opened_buffers.get(&channel_id) {
255 if let OpenedModelHandle::Open(buffer) = buffer {
256 return buffer.upgrade().is_some();
257 }
258 }
259 false
260 }
261
262 pub fn open_channel_buffer(
263 &mut self,
264 channel_id: ChannelId,
265 cx: &mut ModelContext<Self>,
266 ) -> Task<Result<Model<ChannelBuffer>>> {
267 let client = self.client.clone();
268 let user_store = self.user_store.clone();
269 let channel_store = cx.handle();
270 self.open_channel_resource(
271 channel_id,
272 |this| &mut this.opened_buffers,
273 |channel, cx| ChannelBuffer::new(channel, client, user_store, channel_store, cx),
274 cx,
275 )
276 }
277
278 pub fn fetch_channel_messages(
279 &self,
280 message_ids: Vec<u64>,
281 cx: &mut ModelContext<Self>,
282 ) -> Task<Result<Vec<ChannelMessage>>> {
283 let request = if message_ids.is_empty() {
284 None
285 } else {
286 Some(
287 self.client
288 .request(proto::GetChannelMessagesById { message_ids }),
289 )
290 };
291 cx.spawn(|this, mut cx| async move {
292 if let Some(request) = request {
293 let response = request.await?;
294 let this = this
295 .upgrade()
296 .ok_or_else(|| anyhow!("channel store dropped"))?;
297 let user_store = this.update(&mut cx, |this, _| this.user_store.clone())?;
298 ChannelMessage::from_proto_vec(response.messages, &user_store, &mut cx).await
299 } else {
300 Ok(Vec::new())
301 }
302 })
303 }
304
305 pub fn has_channel_buffer_changed(&self, channel_id: ChannelId) -> Option<bool> {
306 self.channel_index
307 .by_id()
308 .get(&channel_id)
309 .map(|channel| channel.unseen_note_version.is_some())
310 }
311
312 pub fn has_new_messages(&self, channel_id: ChannelId) -> Option<bool> {
313 self.channel_index
314 .by_id()
315 .get(&channel_id)
316 .map(|channel| channel.unseen_message_id.is_some())
317 }
318
319 pub fn notes_changed(
320 &mut self,
321 channel_id: ChannelId,
322 epoch: u64,
323 version: &clock::Global,
324 cx: &mut ModelContext<Self>,
325 ) {
326 self.channel_index.note_changed(channel_id, epoch, version);
327 cx.notify();
328 }
329
330 pub fn new_message(
331 &mut self,
332 channel_id: ChannelId,
333 message_id: u64,
334 cx: &mut ModelContext<Self>,
335 ) {
336 self.channel_index.new_message(channel_id, message_id);
337 cx.notify();
338 }
339
340 pub fn acknowledge_message_id(
341 &mut self,
342 channel_id: ChannelId,
343 message_id: u64,
344 cx: &mut ModelContext<Self>,
345 ) {
346 self.channel_index
347 .acknowledge_message_id(channel_id, message_id);
348 cx.notify();
349 }
350
351 pub fn acknowledge_notes_version(
352 &mut self,
353 channel_id: ChannelId,
354 epoch: u64,
355 version: &clock::Global,
356 cx: &mut ModelContext<Self>,
357 ) {
358 self.channel_index
359 .acknowledge_note_version(channel_id, epoch, version);
360 cx.notify();
361 }
362
363 pub fn open_channel_chat(
364 &mut self,
365 channel_id: ChannelId,
366 cx: &mut ModelContext<Self>,
367 ) -> Task<Result<Model<ChannelChat>>> {
368 let client = self.client.clone();
369 let user_store = self.user_store.clone();
370 let this = cx.handle();
371 self.open_channel_resource(
372 channel_id,
373 |this| &mut this.opened_chats,
374 |channel, cx| ChannelChat::new(channel, this, user_store, client, cx),
375 cx,
376 )
377 }
378
379 /// Asynchronously open a given resource associated with a channel.
380 ///
381 /// Make sure that the resource is only opened once, even if this method
382 /// is called multiple times with the same channel id while the first task
383 /// is still running.
384 fn open_channel_resource<T, F, Fut>(
385 &mut self,
386 channel_id: ChannelId,
387 get_map: fn(&mut Self) -> &mut HashMap<ChannelId, OpenedModelHandle<T>>,
388 load: F,
389 cx: &mut ModelContext<Self>,
390 ) -> Task<Result<Model<T>>>
391 where
392 F: 'static + FnOnce(Arc<Channel>, AsyncAppContext) -> Fut,
393 Fut: Future<Output = Result<Model<T>>>,
394 T: 'static,
395 {
396 let task = loop {
397 match get_map(self).entry(channel_id) {
398 hash_map::Entry::Occupied(e) => match e.get() {
399 OpenedModelHandle::Open(model) => {
400 if let Some(model) = model.upgrade() {
401 break Task::ready(Ok(model)).shared();
402 } else {
403 get_map(self).remove(&channel_id);
404 continue;
405 }
406 }
407 OpenedModelHandle::Loading(task) => {
408 break task.clone();
409 }
410 },
411 hash_map::Entry::Vacant(e) => {
412 let task = cx
413 .spawn(move |this, mut cx| async move {
414 let channel = this.update(&mut cx, |this, _| {
415 this.channel_for_id(channel_id).cloned().ok_or_else(|| {
416 Arc::new(anyhow!("no channel for id: {}", channel_id))
417 })
418 })??;
419
420 load(channel, cx).await.map_err(Arc::new)
421 })
422 .shared();
423
424 e.insert(OpenedModelHandle::Loading(task.clone()));
425 cx.spawn({
426 let task = task.clone();
427 move |this, mut cx| async move {
428 let result = task.await;
429 this.update(&mut cx, |this, _| match result {
430 Ok(model) => {
431 get_map(this).insert(
432 channel_id,
433 OpenedModelHandle::Open(model.downgrade()),
434 );
435 }
436 Err(_) => {
437 get_map(this).remove(&channel_id);
438 }
439 })
440 .ok();
441 }
442 })
443 .detach();
444 break task;
445 }
446 }
447 };
448 cx.background_executor()
449 .spawn(async move { task.await.map_err(|error| anyhow!("{}", error)) })
450 }
451
452 pub fn is_channel_admin(&self, channel_id: ChannelId) -> bool {
453 let Some(channel) = self.channel_for_id(channel_id) else {
454 return false;
455 };
456 channel.role == proto::ChannelRole::Admin
457 }
458
459 pub fn channel_participants(&self, channel_id: ChannelId) -> &[Arc<User>] {
460 self.channel_participants
461 .get(&channel_id)
462 .map_or(&[], |v| v.as_slice())
463 }
464
465 pub fn create_channel(
466 &self,
467 name: &str,
468 parent_id: Option<ChannelId>,
469 cx: &mut ModelContext<Self>,
470 ) -> Task<Result<ChannelId>> {
471 let client = self.client.clone();
472 let name = name.trim_start_matches("#").to_owned();
473 cx.spawn(move |this, mut cx| async move {
474 let response = client
475 .request(proto::CreateChannel { name, parent_id })
476 .await?;
477
478 let channel = response
479 .channel
480 .ok_or_else(|| anyhow!("missing channel in response"))?;
481 let channel_id = channel.id;
482
483 this.update(&mut cx, |this, cx| {
484 let task = this.update_channels(
485 proto::UpdateChannels {
486 channels: vec![channel],
487 ..Default::default()
488 },
489 cx,
490 );
491 assert!(task.is_none());
492
493 // This event is emitted because the collab panel wants to clear the pending edit state
494 // before this frame is rendered. But we can't guarantee that the collab panel's future
495 // will resolve before this flush_effects finishes. Synchronously emitting this event
496 // ensures that the collab panel will observe this creation before the frame completes
497 cx.emit(ChannelEvent::ChannelCreated(channel_id));
498 })?;
499
500 Ok(channel_id)
501 })
502 }
503
504 pub fn move_channel(
505 &mut self,
506 channel_id: ChannelId,
507 to: Option<ChannelId>,
508 cx: &mut ModelContext<Self>,
509 ) -> Task<Result<()>> {
510 let client = self.client.clone();
511 cx.spawn(move |_, _| async move {
512 let _ = client
513 .request(proto::MoveChannel { channel_id, to })
514 .await?;
515
516 Ok(())
517 })
518 }
519
520 pub fn set_channel_visibility(
521 &mut self,
522 channel_id: ChannelId,
523 visibility: ChannelVisibility,
524 cx: &mut ModelContext<Self>,
525 ) -> Task<Result<()>> {
526 let client = self.client.clone();
527 cx.spawn(move |_, _| async move {
528 let _ = client
529 .request(proto::SetChannelVisibility {
530 channel_id,
531 visibility: visibility.into(),
532 })
533 .await?;
534
535 Ok(())
536 })
537 }
538
539 pub fn invite_member(
540 &mut self,
541 channel_id: ChannelId,
542 user_id: UserId,
543 role: proto::ChannelRole,
544 cx: &mut ModelContext<Self>,
545 ) -> Task<Result<()>> {
546 if !self.outgoing_invites.insert((channel_id, user_id)) {
547 return Task::ready(Err(anyhow!("invite request already in progress")));
548 }
549
550 cx.notify();
551 let client = self.client.clone();
552 cx.spawn(move |this, mut cx| async move {
553 let result = client
554 .request(proto::InviteChannelMember {
555 channel_id,
556 user_id,
557 role: role.into(),
558 })
559 .await;
560
561 this.update(&mut cx, |this, cx| {
562 this.outgoing_invites.remove(&(channel_id, user_id));
563 cx.notify();
564 })?;
565
566 result?;
567
568 Ok(())
569 })
570 }
571
572 pub fn remove_member(
573 &mut self,
574 channel_id: ChannelId,
575 user_id: u64,
576 cx: &mut ModelContext<Self>,
577 ) -> Task<Result<()>> {
578 if !self.outgoing_invites.insert((channel_id, user_id)) {
579 return Task::ready(Err(anyhow!("invite request already in progress")));
580 }
581
582 cx.notify();
583 let client = self.client.clone();
584 cx.spawn(move |this, mut cx| async move {
585 let result = client
586 .request(proto::RemoveChannelMember {
587 channel_id,
588 user_id,
589 })
590 .await;
591
592 this.update(&mut cx, |this, cx| {
593 this.outgoing_invites.remove(&(channel_id, user_id));
594 cx.notify();
595 })?;
596 result?;
597 Ok(())
598 })
599 }
600
601 pub fn set_member_role(
602 &mut self,
603 channel_id: ChannelId,
604 user_id: UserId,
605 role: proto::ChannelRole,
606 cx: &mut ModelContext<Self>,
607 ) -> Task<Result<()>> {
608 if !self.outgoing_invites.insert((channel_id, user_id)) {
609 return Task::ready(Err(anyhow!("member request already in progress")));
610 }
611
612 cx.notify();
613 let client = self.client.clone();
614 cx.spawn(move |this, mut cx| async move {
615 let result = client
616 .request(proto::SetChannelMemberRole {
617 channel_id,
618 user_id,
619 role: role.into(),
620 })
621 .await;
622
623 this.update(&mut cx, |this, cx| {
624 this.outgoing_invites.remove(&(channel_id, user_id));
625 cx.notify();
626 })?;
627
628 result?;
629 Ok(())
630 })
631 }
632
633 pub fn rename(
634 &mut self,
635 channel_id: ChannelId,
636 new_name: &str,
637 cx: &mut ModelContext<Self>,
638 ) -> Task<Result<()>> {
639 let client = self.client.clone();
640 let name = new_name.to_string();
641 cx.spawn(move |this, mut cx| async move {
642 let channel = client
643 .request(proto::RenameChannel { channel_id, name })
644 .await?
645 .channel
646 .ok_or_else(|| anyhow!("missing channel in response"))?;
647 this.update(&mut cx, |this, cx| {
648 let task = this.update_channels(
649 proto::UpdateChannels {
650 channels: vec![channel],
651 ..Default::default()
652 },
653 cx,
654 );
655 assert!(task.is_none());
656
657 // This event is emitted because the collab panel wants to clear the pending edit state
658 // before this frame is rendered. But we can't guarantee that the collab panel's future
659 // will resolve before this flush_effects finishes. Synchronously emitting this event
660 // ensures that the collab panel will observe this creation before the frame complete
661 cx.emit(ChannelEvent::ChannelRenamed(channel_id))
662 })?;
663 Ok(())
664 })
665 }
666
667 pub fn respond_to_channel_invite(
668 &mut self,
669 channel_id: ChannelId,
670 accept: bool,
671 cx: &mut ModelContext<Self>,
672 ) -> Task<Result<()>> {
673 let client = self.client.clone();
674 cx.background_executor().spawn(async move {
675 client
676 .request(proto::RespondToChannelInvite { channel_id, accept })
677 .await?;
678 Ok(())
679 })
680 }
681
682 pub fn get_channel_member_details(
683 &self,
684 channel_id: ChannelId,
685 cx: &mut ModelContext<Self>,
686 ) -> Task<Result<Vec<ChannelMembership>>> {
687 let client = self.client.clone();
688 let user_store = self.user_store.downgrade();
689 cx.spawn(move |_, mut cx| async move {
690 let response = client
691 .request(proto::GetChannelMembers { channel_id })
692 .await?;
693
694 let user_ids = response.members.iter().map(|m| m.user_id).collect();
695 let user_store = user_store
696 .upgrade()
697 .ok_or_else(|| anyhow!("user store dropped"))?;
698 let users = user_store
699 .update(&mut cx, |user_store, cx| user_store.get_users(user_ids, cx))?
700 .await?;
701
702 Ok(users
703 .into_iter()
704 .zip(response.members)
705 .filter_map(|(user, member)| {
706 Some(ChannelMembership {
707 user,
708 role: member.role(),
709 kind: member.kind(),
710 })
711 })
712 .collect())
713 })
714 }
715
716 pub fn remove_channel(&self, channel_id: ChannelId) -> impl Future<Output = Result<()>> {
717 let client = self.client.clone();
718 async move {
719 client.request(proto::DeleteChannel { channel_id }).await?;
720 Ok(())
721 }
722 }
723
724 pub fn has_pending_channel_invite_response(&self, _: &Arc<Channel>) -> bool {
725 false
726 }
727
728 pub fn has_pending_channel_invite(&self, channel_id: ChannelId, user_id: UserId) -> bool {
729 self.outgoing_invites.contains(&(channel_id, user_id))
730 }
731
732 async fn handle_update_channels(
733 this: Model<Self>,
734 message: TypedEnvelope<proto::UpdateChannels>,
735 _: Arc<Client>,
736 mut cx: AsyncAppContext,
737 ) -> Result<()> {
738 this.update(&mut cx, |this, _| {
739 this.update_channels_tx
740 .unbounded_send(message.payload)
741 .unwrap();
742 })?;
743 Ok(())
744 }
745
746 fn handle_connect(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
747 self.channel_index.clear();
748 self.channel_invitations.clear();
749 self.channel_participants.clear();
750 self.channel_index.clear();
751 self.outgoing_invites.clear();
752 self.disconnect_channel_buffers_task.take();
753
754 for chat in self.opened_chats.values() {
755 if let OpenedModelHandle::Open(chat) = chat {
756 if let Some(chat) = chat.upgrade() {
757 chat.update(cx, |chat, cx| {
758 chat.rejoin(cx);
759 });
760 }
761 }
762 }
763
764 let mut buffer_versions = Vec::new();
765 for buffer in self.opened_buffers.values() {
766 if let OpenedModelHandle::Open(buffer) = buffer {
767 if let Some(buffer) = buffer.upgrade() {
768 let channel_buffer = buffer.read(cx);
769 let buffer = channel_buffer.buffer().read(cx);
770 buffer_versions.push(proto::ChannelBufferVersion {
771 channel_id: channel_buffer.channel_id,
772 epoch: channel_buffer.epoch(),
773 version: language::proto::serialize_version(&buffer.version()),
774 });
775 }
776 }
777 }
778
779 if buffer_versions.is_empty() {
780 return Task::ready(Ok(()));
781 }
782
783 let response = self.client.request(proto::RejoinChannelBuffers {
784 buffers: buffer_versions,
785 });
786
787 cx.spawn(|this, mut cx| async move {
788 let mut response = response.await?;
789
790 this.update(&mut cx, |this, cx| {
791 this.opened_buffers.retain(|_, buffer| match buffer {
792 OpenedModelHandle::Open(channel_buffer) => {
793 let Some(channel_buffer) = channel_buffer.upgrade() else {
794 return false;
795 };
796
797 channel_buffer.update(cx, |channel_buffer, cx| {
798 let channel_id = channel_buffer.channel_id;
799 if let Some(remote_buffer) = response
800 .buffers
801 .iter_mut()
802 .find(|buffer| buffer.channel_id == channel_id)
803 {
804 let channel_id = channel_buffer.channel_id;
805 let remote_version =
806 language::proto::deserialize_version(&remote_buffer.version);
807
808 channel_buffer.replace_collaborators(
809 mem::take(&mut remote_buffer.collaborators),
810 cx,
811 );
812
813 let operations = channel_buffer
814 .buffer()
815 .update(cx, |buffer, cx| {
816 let outgoing_operations =
817 buffer.serialize_ops(Some(remote_version), cx);
818 let incoming_operations =
819 mem::take(&mut remote_buffer.operations)
820 .into_iter()
821 .map(language::proto::deserialize_operation)
822 .collect::<Result<Vec<_>>>()?;
823 buffer.apply_ops(incoming_operations, cx)?;
824 anyhow::Ok(outgoing_operations)
825 })
826 .log_err();
827
828 if let Some(operations) = operations {
829 let client = this.client.clone();
830 cx.background_executor()
831 .spawn(async move {
832 let operations = operations.await;
833 for chunk in
834 language::proto::split_operations(operations)
835 {
836 client
837 .send(proto::UpdateChannelBuffer {
838 channel_id,
839 operations: chunk,
840 })
841 .ok();
842 }
843 })
844 .detach();
845 return true;
846 }
847 }
848
849 channel_buffer.disconnect(cx);
850 false
851 })
852 }
853 OpenedModelHandle::Loading(_) => true,
854 });
855 })
856 .ok();
857 anyhow::Ok(())
858 })
859 }
860
861 fn handle_disconnect(&mut self, wait_for_reconnect: bool, cx: &mut ModelContext<Self>) {
862 cx.notify();
863
864 self.disconnect_channel_buffers_task.get_or_insert_with(|| {
865 cx.spawn(move |this, mut cx| async move {
866 if wait_for_reconnect {
867 cx.background_executor().timer(RECONNECT_TIMEOUT).await;
868 }
869
870 if let Some(this) = this.upgrade() {
871 this.update(&mut cx, |this, cx| {
872 for (_, buffer) in this.opened_buffers.drain() {
873 if let OpenedModelHandle::Open(buffer) = buffer {
874 if let Some(buffer) = buffer.upgrade() {
875 buffer.update(cx, |buffer, cx| buffer.disconnect(cx));
876 }
877 }
878 }
879 })
880 .ok();
881 }
882 })
883 });
884 }
885
886 pub(crate) fn update_channels(
887 &mut self,
888 payload: proto::UpdateChannels,
889 cx: &mut ModelContext<ChannelStore>,
890 ) -> Option<Task<Result<()>>> {
891 if !payload.remove_channel_invitations.is_empty() {
892 self.channel_invitations
893 .retain(|channel| !payload.remove_channel_invitations.contains(&channel.id));
894 }
895 for channel in payload.channel_invitations {
896 match self
897 .channel_invitations
898 .binary_search_by_key(&channel.id, |c| c.id)
899 {
900 Ok(ix) => Arc::make_mut(&mut self.channel_invitations[ix]).name = channel.name,
901 Err(ix) => self.channel_invitations.insert(
902 ix,
903 Arc::new(Channel {
904 id: channel.id,
905 visibility: channel.visibility(),
906 role: channel.role(),
907 name: channel.name,
908 unseen_note_version: None,
909 unseen_message_id: None,
910 parent_path: channel.parent_path,
911 }),
912 ),
913 }
914 }
915
916 let channels_changed = !payload.channels.is_empty()
917 || !payload.delete_channels.is_empty()
918 || !payload.unseen_channel_messages.is_empty()
919 || !payload.unseen_channel_buffer_changes.is_empty();
920
921 if channels_changed {
922 if !payload.delete_channels.is_empty() {
923 self.channel_index.delete_channels(&payload.delete_channels);
924 self.channel_participants
925 .retain(|channel_id, _| !&payload.delete_channels.contains(channel_id));
926
927 for channel_id in &payload.delete_channels {
928 let channel_id = *channel_id;
929 if payload
930 .channels
931 .iter()
932 .any(|channel| channel.id == channel_id)
933 {
934 continue;
935 }
936 if let Some(OpenedModelHandle::Open(buffer)) =
937 self.opened_buffers.remove(&channel_id)
938 {
939 if let Some(buffer) = buffer.upgrade() {
940 buffer.update(cx, ChannelBuffer::disconnect);
941 }
942 }
943 }
944 }
945
946 let mut index = self.channel_index.bulk_insert();
947 for channel in payload.channels {
948 let id = channel.id;
949 let channel_changed = index.insert(channel);
950
951 if channel_changed {
952 if let Some(OpenedModelHandle::Open(buffer)) = self.opened_buffers.get(&id) {
953 if let Some(buffer) = buffer.upgrade() {
954 buffer.update(cx, ChannelBuffer::channel_changed);
955 }
956 }
957 }
958 }
959
960 for unseen_buffer_change in payload.unseen_channel_buffer_changes {
961 let version = language::proto::deserialize_version(&unseen_buffer_change.version);
962 index.note_changed(
963 unseen_buffer_change.channel_id,
964 unseen_buffer_change.epoch,
965 &version,
966 );
967 }
968
969 for unseen_channel_message in payload.unseen_channel_messages {
970 index.new_messages(
971 unseen_channel_message.channel_id,
972 unseen_channel_message.message_id,
973 );
974 }
975 }
976
977 cx.notify();
978 if payload.channel_participants.is_empty() {
979 return None;
980 }
981
982 let mut all_user_ids = Vec::new();
983 let channel_participants = payload.channel_participants;
984 for entry in &channel_participants {
985 for user_id in entry.participant_user_ids.iter() {
986 if let Err(ix) = all_user_ids.binary_search(user_id) {
987 all_user_ids.insert(ix, *user_id);
988 }
989 }
990 }
991
992 let users = self
993 .user_store
994 .update(cx, |user_store, cx| user_store.get_users(all_user_ids, cx));
995 Some(cx.spawn(|this, mut cx| async move {
996 let users = users.await?;
997
998 this.update(&mut cx, |this, cx| {
999 for entry in &channel_participants {
1000 let mut participants: Vec<_> = entry
1001 .participant_user_ids
1002 .iter()
1003 .filter_map(|user_id| {
1004 users
1005 .binary_search_by_key(&user_id, |user| &user.id)
1006 .ok()
1007 .map(|ix| users[ix].clone())
1008 })
1009 .collect();
1010
1011 participants.sort_by_key(|u| u.id);
1012
1013 this.channel_participants
1014 .insert(entry.channel_id, participants);
1015 }
1016
1017 cx.notify();
1018 })
1019 }))
1020 }
1021}