1mod channel_index;
2
3use crate::{channel_buffer::ChannelBuffer, channel_chat::ChannelChat, ChannelMessage};
4use anyhow::{anyhow, Result};
5use channel_index::ChannelIndex;
6use client::{Client, Subscription, User, UserId, UserStore};
7use collections::{hash_map, HashMap, HashSet};
8use db::RELEASE_CHANNEL;
9use futures::{channel::mpsc, future::Shared, Future, FutureExt, StreamExt};
10use gpui::{
11 AppContext, AsyncAppContext, Context, EventEmitter, Model, ModelContext, SharedString, Task,
12 WeakModel,
13};
14use language::Capability;
15use rpc::{
16 proto::{self, ChannelVisibility},
17 TypedEnvelope,
18};
19use std::{mem, sync::Arc, time::Duration};
20use util::{async_maybe, ResultExt};
21
22pub fn init(client: &Arc<Client>, user_store: Model<UserStore>, cx: &mut AppContext) {
23 let channel_store =
24 cx.new_model(|cx| ChannelStore::new(client.clone(), user_store.clone(), cx));
25 cx.set_global(channel_store);
26}
27
/// How long `ChannelStore::handle_disconnect` waits for a reconnect before it
/// disconnects the opened channel buffers.
pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);

/// Numeric identifier of a channel.
pub type ChannelId = u64;
31
/// Client-side model of the channels known to the current user, together with the
/// channel buffers and chats that have been opened through it.
pub struct ChannelStore {
    /// Index of all known channels.
    pub channel_index: ChannelIndex,
    /// Pending invitations; `update_channels` keeps this sorted by channel id.
    channel_invitations: Vec<Arc<Channel>>,
    /// Participants per channel, filled in by `update_channels`.
    channel_participants: HashMap<ChannelId, Vec<Arc<User>>>,
    /// `(channel, user)` pairs that currently have an invite / remove / role request in flight.
    outgoing_invites: HashSet<(ChannelId, UserId)>,
    /// Sending half of the queue drained by the `_update_channels` task.
    update_channels_tx: mpsc::UnboundedSender<proto::UpdateChannels>,
    /// Channel buffers that are open or currently being opened, keyed by channel.
    opened_buffers: HashMap<ChannelId, OpenedModelHandle<ChannelBuffer>>,
    /// Channel chats that are open or currently being opened, keyed by channel.
    opened_chats: HashMap<ChannelId, OpenedModelHandle<ChannelChat>>,
    client: Arc<Client>,
    user_store: Model<UserStore>,
    /// Handle returned when registering `handle_update_channels` on the client.
    // NOTE(review): presumably dropping this unregisters the handler — confirm.
    _rpc_subscription: Subscription,
    /// Task that follows the client's connection status.
    _watch_connection_status: Task<Option<()>>,
    /// Pending task that disconnects opened buffers after a disconnect; taken on connect.
    disconnect_channel_buffers_task: Option<Task<()>>,
    /// Task that applies queued `UpdateChannels` payloads one after another.
    _update_channels: Task<()>,
}
47
/// A single channel as seen by the current user.
#[derive(Clone, Debug, PartialEq)]
pub struct Channel {
    pub id: ChannelId,
    pub name: SharedString,
    pub visibility: proto::ChannelVisibility,
    /// The current user's role in this channel.
    pub role: proto::ChannelRole,
    /// Set while there are channel-notes changes the user has not seen.
    // NOTE(review): the `u64` looks like the buffer epoch — confirm against `ChannelIndex`.
    pub unseen_note_version: Option<(u64, clock::Global)>,
    /// Set while there is a chat message the user has not seen.
    pub unseen_message_id: Option<u64>,
    /// Ids of the ancestor channels; its length is used as the nesting depth.
    pub parent_path: Vec<u64>,
}
58
59impl Channel {
60 pub fn link(&self) -> String {
61 RELEASE_CHANNEL.link_prefix().to_owned()
62 + "channel/"
63 + &self.slug()
64 + "-"
65 + &self.id.to_string()
66 }
67
68 pub fn slug(&self) -> String {
69 let slug: String = self
70 .name
71 .chars()
72 .map(|c| if c.is_alphanumeric() { c } else { '-' })
73 .collect();
74
75 slug.trim_matches(|c| c == '-').to_string()
76 }
77
78 pub fn channel_buffer_capability(&self) -> Capability {
79 if self.role == proto::ChannelRole::Member || self.role == proto::ChannelRole::Admin {
80 Capability::ReadWrite
81 } else {
82 Capability::ReadOnly
83 }
84 }
85}
86
/// A user's relationship to a channel, as returned by
/// `ChannelStore::get_channel_member_details`.
pub struct ChannelMembership {
    pub user: Arc<User>,
    pub kind: proto::channel_member::Kind,
    pub role: proto::ChannelRole,
}
92impl ChannelMembership {
93 pub fn sort_key(&self) -> MembershipSortKey {
94 MembershipSortKey {
95 role_order: match self.role {
96 proto::ChannelRole::Admin => 0,
97 proto::ChannelRole::Member => 1,
98 proto::ChannelRole::Banned => 2,
99 proto::ChannelRole::Guest => 3,
100 },
101 kind_order: match self.kind {
102 proto::channel_member::Kind::Member => 0,
103 proto::channel_member::Kind::AncestorMember => 1,
104 proto::channel_member::Kind::Invitee => 2,
105 },
106 username_order: self.user.github_login.as_str(),
107 }
108 }
109}
110
/// Sort key produced by `ChannelMembership::sort_key`.
///
/// The derived ordering compares the fields in declaration order, so the field order
/// below is significant.
#[derive(PartialOrd, Ord, PartialEq, Eq)]
pub struct MembershipSortKey<'a> {
    role_order: u8,
    kind_order: u8,
    username_order: &'a str,
}
117
/// Events emitted by [`ChannelStore`].
pub enum ChannelEvent {
    /// Emitted by `create_channel` once the new channel has been added to the store.
    ChannelCreated(ChannelId),
    /// Emitted by `rename` once the renamed channel has been applied to the store.
    ChannelRenamed(ChannelId),
}
122
// Allows `ChannelStore` to emit `ChannelEvent`s through its model context.
impl EventEmitter<ChannelEvent> for ChannelStore {}
124
/// State of a per-channel resource (buffer or chat) tracked by the store.
enum OpenedModelHandle<E> {
    /// The resource finished loading; only a weak handle is kept, so it may already be gone.
    Open(WeakModel<E>),
    /// The resource is still loading; the shared task can be awaited by several callers.
    Loading(Shared<Task<Result<Model<E>, Arc<anyhow::Error>>>>),
}
129
130impl ChannelStore {
131 pub fn global(cx: &AppContext) -> Model<Self> {
132 cx.global::<Model<Self>>().clone()
133 }
134
    /// Builds a store bound to `client` and `user_store`.
    ///
    /// Besides initializing empty state, this registers `handle_update_channels` as a
    /// message handler on the client and spawns two tasks that are stored in the
    /// returned value:
    /// - one follows `client.status()` and calls `handle_connect` / `handle_disconnect`;
    /// - one drains the internal queue and applies each payload through `update_channels`.
    pub fn new(
        client: Arc<Client>,
        user_store: Model<UserStore>,
        cx: &mut ModelContext<Self>,
    ) -> Self {
        let rpc_subscription =
            client.add_message_handler(cx.weak_model(), Self::handle_update_channels);

        let mut connection_status = client.status();
        let (update_channels_tx, mut update_channels_rx) = mpsc::unbounded();
        let watch_connection_status = cx.spawn(|this, mut cx| async move {
            while let Some(status) = connection_status.next().await {
                // Stop watching once the store itself has been dropped.
                let this = this.upgrade()?;
                match status {
                    client::Status::Connected { .. } => {
                        // NOTE(review): `log_err()?` ends this task when `handle_connect`
                        // fails, so later status changes are no longer observed — confirm
                        // that this is intended.
                        this.update(&mut cx, |this, cx| this.handle_connect(cx))
                            .ok()?
                            .await
                            .log_err()?;
                    }
                    // No reconnect is awaited for these states: disconnect buffers right away.
                    client::Status::SignedOut | client::Status::UpgradeRequired => {
                        this.update(&mut cx, |this, cx| this.handle_disconnect(false, cx))
                            .ok();
                    }
                    // Any other status: give the client `RECONNECT_TIMEOUT` to come back.
                    _ => {
                        this.update(&mut cx, |this, cx| this.handle_disconnect(true, cx))
                            .ok();
                    }
                }
            }
            Some(())
        });

        Self {
            channel_invitations: Vec::default(),
            channel_index: ChannelIndex::default(),
            channel_participants: Default::default(),
            outgoing_invites: Default::default(),
            opened_buffers: Default::default(),
            opened_chats: Default::default(),
            update_channels_tx,
            client,
            user_store,
            _rpc_subscription: rpc_subscription,
            _watch_connection_status: watch_connection_status,
            disconnect_channel_buffers_task: None,
            _update_channels: cx.spawn(|this, mut cx| async move {
                async_maybe!({
                    // Payloads are applied strictly in order: the follow-up task of one
                    // update is awaited before the next payload is taken from the queue.
                    while let Some(update_channels) = update_channels_rx.next().await {
                        if let Some(this) = this.upgrade() {
                            let update_task = this.update(&mut cx, |this, cx| {
                                this.update_channels(update_channels, cx)
                            })?;
                            if let Some(update_task) = update_task {
                                update_task.await.log_err();
                            }
                        }
                    }
                    anyhow::Ok(())
                })
                .await
                .log_err();
            }),
        }
    }
200
201 pub fn client(&self) -> Arc<Client> {
202 self.client.clone()
203 }
204
205 /// Returns the number of unique channels in the store
206 pub fn channel_count(&self) -> usize {
207 self.channel_index.by_id().len()
208 }
209
210 /// Returns the index of a channel ID in the list of unique channels
211 pub fn index_of_channel(&self, channel_id: ChannelId) -> Option<usize> {
212 self.channel_index
213 .by_id()
214 .keys()
215 .position(|id| *id == channel_id)
216 }
217
218 /// Returns an iterator over all unique channels
219 pub fn channels(&self) -> impl '_ + Iterator<Item = &Arc<Channel>> {
220 self.channel_index.by_id().values()
221 }
222
223 /// Iterate over all entries in the channel DAG
224 pub fn ordered_channels(&self) -> impl '_ + Iterator<Item = (usize, &Arc<Channel>)> {
225 self.channel_index
226 .ordered_channels()
227 .iter()
228 .filter_map(move |id| {
229 let channel = self.channel_index.by_id().get(id)?;
230 Some((channel.parent_path.len(), channel))
231 })
232 }
233
234 pub fn channel_at_index(&self, ix: usize) -> Option<&Arc<Channel>> {
235 let channel_id = self.channel_index.ordered_channels().get(ix)?;
236 self.channel_index.by_id().get(channel_id)
237 }
238
239 pub fn channel_at(&self, ix: usize) -> Option<&Arc<Channel>> {
240 self.channel_index.by_id().values().nth(ix)
241 }
242
243 pub fn has_channel_invitation(&self, channel_id: ChannelId) -> bool {
244 self.channel_invitations
245 .iter()
246 .any(|channel| channel.id == channel_id)
247 }
248
249 pub fn channel_invitations(&self) -> &[Arc<Channel>] {
250 &self.channel_invitations
251 }
252
253 pub fn channel_for_id(&self, channel_id: ChannelId) -> Option<&Arc<Channel>> {
254 self.channel_index.by_id().get(&channel_id)
255 }
256
257 pub fn has_open_channel_buffer(&self, channel_id: ChannelId, _cx: &AppContext) -> bool {
258 if let Some(buffer) = self.opened_buffers.get(&channel_id) {
259 if let OpenedModelHandle::Open(buffer) = buffer {
260 return buffer.upgrade().is_some();
261 }
262 }
263 false
264 }
265
266 pub fn open_channel_buffer(
267 &mut self,
268 channel_id: ChannelId,
269 cx: &mut ModelContext<Self>,
270 ) -> Task<Result<Model<ChannelBuffer>>> {
271 let client = self.client.clone();
272 let user_store = self.user_store.clone();
273 let channel_store = cx.handle();
274 self.open_channel_resource(
275 channel_id,
276 |this| &mut this.opened_buffers,
277 |channel, cx| ChannelBuffer::new(channel, client, user_store, channel_store, cx),
278 cx,
279 )
280 }
281
282 pub fn fetch_channel_messages(
283 &self,
284 message_ids: Vec<u64>,
285 cx: &mut ModelContext<Self>,
286 ) -> Task<Result<Vec<ChannelMessage>>> {
287 let request = if message_ids.is_empty() {
288 None
289 } else {
290 Some(
291 self.client
292 .request(proto::GetChannelMessagesById { message_ids }),
293 )
294 };
295 cx.spawn(|this, mut cx| async move {
296 if let Some(request) = request {
297 let response = request.await?;
298 let this = this
299 .upgrade()
300 .ok_or_else(|| anyhow!("channel store dropped"))?;
301 let user_store = this.update(&mut cx, |this, _| this.user_store.clone())?;
302 ChannelMessage::from_proto_vec(response.messages, &user_store, &mut cx).await
303 } else {
304 Ok(Vec::new())
305 }
306 })
307 }
308
309 pub fn has_channel_buffer_changed(&self, channel_id: ChannelId) -> Option<bool> {
310 self.channel_index
311 .by_id()
312 .get(&channel_id)
313 .map(|channel| channel.unseen_note_version.is_some())
314 }
315
316 pub fn has_new_messages(&self, channel_id: ChannelId) -> Option<bool> {
317 self.channel_index
318 .by_id()
319 .get(&channel_id)
320 .map(|channel| channel.unseen_message_id.is_some())
321 }
322
323 pub fn notes_changed(
324 &mut self,
325 channel_id: ChannelId,
326 epoch: u64,
327 version: &clock::Global,
328 cx: &mut ModelContext<Self>,
329 ) {
330 self.channel_index.note_changed(channel_id, epoch, version);
331 cx.notify();
332 }
333
334 pub fn new_message(
335 &mut self,
336 channel_id: ChannelId,
337 message_id: u64,
338 cx: &mut ModelContext<Self>,
339 ) {
340 self.channel_index.new_message(channel_id, message_id);
341 cx.notify();
342 }
343
344 pub fn acknowledge_message_id(
345 &mut self,
346 channel_id: ChannelId,
347 message_id: u64,
348 cx: &mut ModelContext<Self>,
349 ) {
350 self.channel_index
351 .acknowledge_message_id(channel_id, message_id);
352 cx.notify();
353 }
354
355 pub fn acknowledge_notes_version(
356 &mut self,
357 channel_id: ChannelId,
358 epoch: u64,
359 version: &clock::Global,
360 cx: &mut ModelContext<Self>,
361 ) {
362 self.channel_index
363 .acknowledge_note_version(channel_id, epoch, version);
364 cx.notify();
365 }
366
367 pub fn open_channel_chat(
368 &mut self,
369 channel_id: ChannelId,
370 cx: &mut ModelContext<Self>,
371 ) -> Task<Result<Model<ChannelChat>>> {
372 let client = self.client.clone();
373 let user_store = self.user_store.clone();
374 let this = cx.handle();
375 self.open_channel_resource(
376 channel_id,
377 |this| &mut this.opened_chats,
378 |channel, cx| ChannelChat::new(channel, this, user_store, client, cx),
379 cx,
380 )
381 }
382
    /// Asynchronously open a given resource associated with a channel.
    ///
    /// Make sure that the resource is only opened once, even if this method
    /// is called multiple times with the same channel id while the first task
    /// is still running.
    ///
    /// `get_map` selects the map that tracks this kind of resource, and `load` performs
    /// the actual opening for the resolved channel. The returned task fails if the
    /// channel id is unknown or if `load` fails.
    fn open_channel_resource<T, F, Fut>(
        &mut self,
        channel_id: ChannelId,
        get_map: fn(&mut Self) -> &mut HashMap<ChannelId, OpenedModelHandle<T>>,
        load: F,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<Model<T>>>
    where
        F: 'static + FnOnce(Arc<Channel>, AsyncAppContext) -> Fut,
        Fut: Future<Output = Result<Model<T>>>,
        T: 'static,
    {
        let task = loop {
            match get_map(self).entry(channel_id) {
                hash_map::Entry::Occupied(e) => match e.get() {
                    OpenedModelHandle::Open(model) => {
                        if let Some(model) = model.upgrade() {
                            // Already open and alive: hand it back immediately.
                            break Task::ready(Ok(model)).shared();
                        } else {
                            // The model was dropped; forget the stale entry and retry,
                            // which takes the `Vacant` path on the next iteration.
                            get_map(self).remove(&channel_id);
                            continue;
                        }
                    }
                    OpenedModelHandle::Loading(task) => {
                        // Someone else is already loading it: share their task.
                        break task.clone();
                    }
                },
                hash_map::Entry::Vacant(e) => {
                    let task = cx
                        .spawn(move |this, mut cx| async move {
                            let channel = this.update(&mut cx, |this, _| {
                                this.channel_for_id(channel_id).cloned().ok_or_else(|| {
                                    Arc::new(anyhow!("no channel for id: {}", channel_id))
                                })
                            })??;

                            load(channel, cx).await.map_err(Arc::new)
                        })
                        .shared();

                    e.insert(OpenedModelHandle::Loading(task.clone()));
                    // Once loading finishes, replace the `Loading` entry with a weak
                    // `Open` handle, or drop the entry so a later call can retry.
                    cx.spawn({
                        let task = task.clone();
                        move |this, mut cx| async move {
                            let result = task.await;
                            this.update(&mut cx, |this, _| match result {
                                Ok(model) => {
                                    get_map(this).insert(
                                        channel_id,
                                        OpenedModelHandle::Open(model.downgrade()),
                                    );
                                }
                                Err(_) => {
                                    get_map(this).remove(&channel_id);
                                }
                            })
                            .ok();
                        }
                    })
                    .detach();
                    break task;
                }
            }
        };
        // The shared task yields `Arc<anyhow::Error>`; re-wrap it as a plain error.
        cx.background_executor()
            .spawn(async move { task.await.map_err(|error| anyhow!("{}", error)) })
    }
455
456 pub fn is_channel_admin(&self, channel_id: ChannelId) -> bool {
457 let Some(channel) = self.channel_for_id(channel_id) else {
458 return false;
459 };
460 channel.role == proto::ChannelRole::Admin
461 }
462
463 pub fn channel_participants(&self, channel_id: ChannelId) -> &[Arc<User>] {
464 self.channel_participants
465 .get(&channel_id)
466 .map_or(&[], |v| v.as_slice())
467 }
468
469 pub fn create_channel(
470 &self,
471 name: &str,
472 parent_id: Option<ChannelId>,
473 cx: &mut ModelContext<Self>,
474 ) -> Task<Result<ChannelId>> {
475 let client = self.client.clone();
476 let name = name.trim_start_matches("#").to_owned();
477 cx.spawn(move |this, mut cx| async move {
478 let response = client
479 .request(proto::CreateChannel { name, parent_id })
480 .await?;
481
482 let channel = response
483 .channel
484 .ok_or_else(|| anyhow!("missing channel in response"))?;
485 let channel_id = channel.id;
486
487 this.update(&mut cx, |this, cx| {
488 let task = this.update_channels(
489 proto::UpdateChannels {
490 channels: vec![channel],
491 ..Default::default()
492 },
493 cx,
494 );
495 assert!(task.is_none());
496
497 // This event is emitted because the collab panel wants to clear the pending edit state
498 // before this frame is rendered. But we can't guarantee that the collab panel's future
499 // will resolve before this flush_effects finishes. Synchronously emitting this event
500 // ensures that the collab panel will observe this creation before the frame completes
501 cx.emit(ChannelEvent::ChannelCreated(channel_id));
502 })?;
503
504 Ok(channel_id)
505 })
506 }
507
508 pub fn move_channel(
509 &mut self,
510 channel_id: ChannelId,
511 to: Option<ChannelId>,
512 cx: &mut ModelContext<Self>,
513 ) -> Task<Result<()>> {
514 let client = self.client.clone();
515 cx.spawn(move |_, _| async move {
516 let _ = client
517 .request(proto::MoveChannel { channel_id, to })
518 .await?;
519
520 Ok(())
521 })
522 }
523
524 pub fn set_channel_visibility(
525 &mut self,
526 channel_id: ChannelId,
527 visibility: ChannelVisibility,
528 cx: &mut ModelContext<Self>,
529 ) -> Task<Result<()>> {
530 let client = self.client.clone();
531 cx.spawn(move |_, _| async move {
532 let _ = client
533 .request(proto::SetChannelVisibility {
534 channel_id,
535 visibility: visibility.into(),
536 })
537 .await?;
538
539 Ok(())
540 })
541 }
542
543 pub fn invite_member(
544 &mut self,
545 channel_id: ChannelId,
546 user_id: UserId,
547 role: proto::ChannelRole,
548 cx: &mut ModelContext<Self>,
549 ) -> Task<Result<()>> {
550 if !self.outgoing_invites.insert((channel_id, user_id)) {
551 return Task::ready(Err(anyhow!("invite request already in progress")));
552 }
553
554 cx.notify();
555 let client = self.client.clone();
556 cx.spawn(move |this, mut cx| async move {
557 let result = client
558 .request(proto::InviteChannelMember {
559 channel_id,
560 user_id,
561 role: role.into(),
562 })
563 .await;
564
565 this.update(&mut cx, |this, cx| {
566 this.outgoing_invites.remove(&(channel_id, user_id));
567 cx.notify();
568 })?;
569
570 result?;
571
572 Ok(())
573 })
574 }
575
576 pub fn remove_member(
577 &mut self,
578 channel_id: ChannelId,
579 user_id: u64,
580 cx: &mut ModelContext<Self>,
581 ) -> Task<Result<()>> {
582 if !self.outgoing_invites.insert((channel_id, user_id)) {
583 return Task::ready(Err(anyhow!("invite request already in progress")));
584 }
585
586 cx.notify();
587 let client = self.client.clone();
588 cx.spawn(move |this, mut cx| async move {
589 let result = client
590 .request(proto::RemoveChannelMember {
591 channel_id,
592 user_id,
593 })
594 .await;
595
596 this.update(&mut cx, |this, cx| {
597 this.outgoing_invites.remove(&(channel_id, user_id));
598 cx.notify();
599 })?;
600 result?;
601 Ok(())
602 })
603 }
604
605 pub fn set_member_role(
606 &mut self,
607 channel_id: ChannelId,
608 user_id: UserId,
609 role: proto::ChannelRole,
610 cx: &mut ModelContext<Self>,
611 ) -> Task<Result<()>> {
612 if !self.outgoing_invites.insert((channel_id, user_id)) {
613 return Task::ready(Err(anyhow!("member request already in progress")));
614 }
615
616 cx.notify();
617 let client = self.client.clone();
618 cx.spawn(move |this, mut cx| async move {
619 let result = client
620 .request(proto::SetChannelMemberRole {
621 channel_id,
622 user_id,
623 role: role.into(),
624 })
625 .await;
626
627 this.update(&mut cx, |this, cx| {
628 this.outgoing_invites.remove(&(channel_id, user_id));
629 cx.notify();
630 })?;
631
632 result?;
633 Ok(())
634 })
635 }
636
637 pub fn rename(
638 &mut self,
639 channel_id: ChannelId,
640 new_name: &str,
641 cx: &mut ModelContext<Self>,
642 ) -> Task<Result<()>> {
643 let client = self.client.clone();
644 let name = new_name.to_string();
645 cx.spawn(move |this, mut cx| async move {
646 let channel = client
647 .request(proto::RenameChannel { channel_id, name })
648 .await?
649 .channel
650 .ok_or_else(|| anyhow!("missing channel in response"))?;
651 this.update(&mut cx, |this, cx| {
652 let task = this.update_channels(
653 proto::UpdateChannels {
654 channels: vec![channel],
655 ..Default::default()
656 },
657 cx,
658 );
659 assert!(task.is_none());
660
661 // This event is emitted because the collab panel wants to clear the pending edit state
662 // before this frame is rendered. But we can't guarantee that the collab panel's future
663 // will resolve before this flush_effects finishes. Synchronously emitting this event
664 // ensures that the collab panel will observe this creation before the frame complete
665 cx.emit(ChannelEvent::ChannelRenamed(channel_id))
666 })?;
667 Ok(())
668 })
669 }
670
671 pub fn respond_to_channel_invite(
672 &mut self,
673 channel_id: ChannelId,
674 accept: bool,
675 cx: &mut ModelContext<Self>,
676 ) -> Task<Result<()>> {
677 let client = self.client.clone();
678 cx.background_executor().spawn(async move {
679 client
680 .request(proto::RespondToChannelInvite { channel_id, accept })
681 .await?;
682 Ok(())
683 })
684 }
685
686 pub fn get_channel_member_details(
687 &self,
688 channel_id: ChannelId,
689 cx: &mut ModelContext<Self>,
690 ) -> Task<Result<Vec<ChannelMembership>>> {
691 let client = self.client.clone();
692 let user_store = self.user_store.downgrade();
693 cx.spawn(move |_, mut cx| async move {
694 let response = client
695 .request(proto::GetChannelMembers { channel_id })
696 .await?;
697
698 let user_ids = response.members.iter().map(|m| m.user_id).collect();
699 let user_store = user_store
700 .upgrade()
701 .ok_or_else(|| anyhow!("user store dropped"))?;
702 let users = user_store
703 .update(&mut cx, |user_store, cx| user_store.get_users(user_ids, cx))?
704 .await?;
705
706 Ok(users
707 .into_iter()
708 .zip(response.members)
709 .filter_map(|(user, member)| {
710 Some(ChannelMembership {
711 user,
712 role: member.role(),
713 kind: member.kind(),
714 })
715 })
716 .collect())
717 })
718 }
719
720 pub fn remove_channel(&self, channel_id: ChannelId) -> impl Future<Output = Result<()>> {
721 let client = self.client.clone();
722 async move {
723 client.request(proto::DeleteChannel { channel_id }).await?;
724 Ok(())
725 }
726 }
727
728 pub fn has_pending_channel_invite_response(&self, _: &Arc<Channel>) -> bool {
729 false
730 }
731
732 pub fn has_pending_channel_invite(&self, channel_id: ChannelId, user_id: UserId) -> bool {
733 self.outgoing_invites.contains(&(channel_id, user_id))
734 }
735
736 async fn handle_update_channels(
737 this: Model<Self>,
738 message: TypedEnvelope<proto::UpdateChannels>,
739 _: Arc<Client>,
740 mut cx: AsyncAppContext,
741 ) -> Result<()> {
742 this.update(&mut cx, |this, _| {
743 this.update_channels_tx
744 .unbounded_send(message.payload)
745 .unwrap();
746 })?;
747 Ok(())
748 }
749
750 fn handle_connect(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
751 self.channel_index.clear();
752 self.channel_invitations.clear();
753 self.channel_participants.clear();
754 self.channel_index.clear();
755 self.outgoing_invites.clear();
756 self.disconnect_channel_buffers_task.take();
757
758 for chat in self.opened_chats.values() {
759 if let OpenedModelHandle::Open(chat) = chat {
760 if let Some(chat) = chat.upgrade() {
761 chat.update(cx, |chat, cx| {
762 chat.rejoin(cx);
763 });
764 }
765 }
766 }
767
768 let mut buffer_versions = Vec::new();
769 for buffer in self.opened_buffers.values() {
770 if let OpenedModelHandle::Open(buffer) = buffer {
771 if let Some(buffer) = buffer.upgrade() {
772 let channel_buffer = buffer.read(cx);
773 let buffer = channel_buffer.buffer().read(cx);
774 buffer_versions.push(proto::ChannelBufferVersion {
775 channel_id: channel_buffer.channel_id,
776 epoch: channel_buffer.epoch(),
777 version: language::proto::serialize_version(&buffer.version()),
778 });
779 }
780 }
781 }
782
783 if buffer_versions.is_empty() {
784 return Task::ready(Ok(()));
785 }
786
787 let response = self.client.request(proto::RejoinChannelBuffers {
788 buffers: buffer_versions,
789 });
790
791 cx.spawn(|this, mut cx| async move {
792 let mut response = response.await?;
793
794 this.update(&mut cx, |this, cx| {
795 this.opened_buffers.retain(|_, buffer| match buffer {
796 OpenedModelHandle::Open(channel_buffer) => {
797 let Some(channel_buffer) = channel_buffer.upgrade() else {
798 return false;
799 };
800
801 channel_buffer.update(cx, |channel_buffer, cx| {
802 let channel_id = channel_buffer.channel_id;
803 if let Some(remote_buffer) = response
804 .buffers
805 .iter_mut()
806 .find(|buffer| buffer.channel_id == channel_id)
807 {
808 let channel_id = channel_buffer.channel_id;
809 let remote_version =
810 language::proto::deserialize_version(&remote_buffer.version);
811
812 channel_buffer.replace_collaborators(
813 mem::take(&mut remote_buffer.collaborators),
814 cx,
815 );
816
817 let operations = channel_buffer
818 .buffer()
819 .update(cx, |buffer, cx| {
820 let outgoing_operations =
821 buffer.serialize_ops(Some(remote_version), cx);
822 let incoming_operations =
823 mem::take(&mut remote_buffer.operations)
824 .into_iter()
825 .map(language::proto::deserialize_operation)
826 .collect::<Result<Vec<_>>>()?;
827 buffer.apply_ops(incoming_operations, cx)?;
828 anyhow::Ok(outgoing_operations)
829 })
830 .log_err();
831
832 if let Some(operations) = operations {
833 let client = this.client.clone();
834 cx.background_executor()
835 .spawn(async move {
836 let operations = operations.await;
837 for chunk in
838 language::proto::split_operations(operations)
839 {
840 client
841 .send(proto::UpdateChannelBuffer {
842 channel_id,
843 operations: chunk,
844 })
845 .ok();
846 }
847 })
848 .detach();
849 return true;
850 }
851 }
852
853 channel_buffer.disconnect(cx);
854 false
855 })
856 }
857 OpenedModelHandle::Loading(_) => true,
858 });
859 })
860 .ok();
861 anyhow::Ok(())
862 })
863 }
864
865 fn handle_disconnect(&mut self, wait_for_reconnect: bool, cx: &mut ModelContext<Self>) {
866 cx.notify();
867
868 self.disconnect_channel_buffers_task.get_or_insert_with(|| {
869 cx.spawn(move |this, mut cx| async move {
870 if wait_for_reconnect {
871 cx.background_executor().timer(RECONNECT_TIMEOUT).await;
872 }
873
874 if let Some(this) = this.upgrade() {
875 this.update(&mut cx, |this, cx| {
876 for (_, buffer) in this.opened_buffers.drain() {
877 if let OpenedModelHandle::Open(buffer) = buffer {
878 if let Some(buffer) = buffer.upgrade() {
879 buffer.update(cx, |buffer, cx| buffer.disconnect(cx));
880 }
881 }
882 }
883 })
884 .ok();
885 }
886 })
887 });
888 }
889
890 pub(crate) fn update_channels(
891 &mut self,
892 payload: proto::UpdateChannels,
893 cx: &mut ModelContext<ChannelStore>,
894 ) -> Option<Task<Result<()>>> {
895 if !payload.remove_channel_invitations.is_empty() {
896 self.channel_invitations
897 .retain(|channel| !payload.remove_channel_invitations.contains(&channel.id));
898 }
899 for channel in payload.channel_invitations {
900 match self
901 .channel_invitations
902 .binary_search_by_key(&channel.id, |c| c.id)
903 {
904 Ok(ix) => {
905 Arc::make_mut(&mut self.channel_invitations[ix]).name = channel.name.into()
906 }
907 Err(ix) => self.channel_invitations.insert(
908 ix,
909 Arc::new(Channel {
910 id: channel.id,
911 visibility: channel.visibility(),
912 role: channel.role(),
913 name: channel.name.into(),
914 unseen_note_version: None,
915 unseen_message_id: None,
916 parent_path: channel.parent_path,
917 }),
918 ),
919 }
920 }
921
922 let channels_changed = !payload.channels.is_empty()
923 || !payload.delete_channels.is_empty()
924 || !payload.unseen_channel_messages.is_empty()
925 || !payload.unseen_channel_buffer_changes.is_empty();
926
927 if channels_changed {
928 if !payload.delete_channels.is_empty() {
929 self.channel_index.delete_channels(&payload.delete_channels);
930 self.channel_participants
931 .retain(|channel_id, _| !&payload.delete_channels.contains(channel_id));
932
933 for channel_id in &payload.delete_channels {
934 let channel_id = *channel_id;
935 if payload
936 .channels
937 .iter()
938 .any(|channel| channel.id == channel_id)
939 {
940 continue;
941 }
942 if let Some(OpenedModelHandle::Open(buffer)) =
943 self.opened_buffers.remove(&channel_id)
944 {
945 if let Some(buffer) = buffer.upgrade() {
946 buffer.update(cx, ChannelBuffer::disconnect);
947 }
948 }
949 }
950 }
951
952 let mut index = self.channel_index.bulk_insert();
953 for channel in payload.channels {
954 let id = channel.id;
955 let channel_changed = index.insert(channel);
956
957 if channel_changed {
958 if let Some(OpenedModelHandle::Open(buffer)) = self.opened_buffers.get(&id) {
959 if let Some(buffer) = buffer.upgrade() {
960 buffer.update(cx, ChannelBuffer::channel_changed);
961 }
962 }
963 }
964 }
965
966 for unseen_buffer_change in payload.unseen_channel_buffer_changes {
967 let version = language::proto::deserialize_version(&unseen_buffer_change.version);
968 index.note_changed(
969 unseen_buffer_change.channel_id,
970 unseen_buffer_change.epoch,
971 &version,
972 );
973 }
974
975 for unseen_channel_message in payload.unseen_channel_messages {
976 index.new_messages(
977 unseen_channel_message.channel_id,
978 unseen_channel_message.message_id,
979 );
980 }
981 }
982
983 cx.notify();
984 if payload.channel_participants.is_empty() {
985 return None;
986 }
987
988 let mut all_user_ids = Vec::new();
989 let channel_participants = payload.channel_participants;
990 for entry in &channel_participants {
991 for user_id in entry.participant_user_ids.iter() {
992 if let Err(ix) = all_user_ids.binary_search(user_id) {
993 all_user_ids.insert(ix, *user_id);
994 }
995 }
996 }
997
998 let users = self
999 .user_store
1000 .update(cx, |user_store, cx| user_store.get_users(all_user_ids, cx));
1001 Some(cx.spawn(|this, mut cx| async move {
1002 let users = users.await?;
1003
1004 this.update(&mut cx, |this, cx| {
1005 for entry in &channel_participants {
1006 let mut participants: Vec<_> = entry
1007 .participant_user_ids
1008 .iter()
1009 .filter_map(|user_id| {
1010 users
1011 .binary_search_by_key(&user_id, |user| &user.id)
1012 .ok()
1013 .map(|ix| users[ix].clone())
1014 })
1015 .collect();
1016
1017 participants.sort_by_key(|u| u.id);
1018
1019 this.channel_participants
1020 .insert(entry.channel_id, participants);
1021 }
1022
1023 cx.notify();
1024 })
1025 }))
1026 }
1027}