1mod channel_index;
2
3use crate::{channel_buffer::ChannelBuffer, channel_chat::ChannelChat, ChannelMessage};
4use anyhow::{anyhow, Result};
5use channel_index::ChannelIndex;
6use client::{Client, Subscription, User, UserId, UserStore};
7use collections::{hash_map, HashMap, HashSet};
8use db::RELEASE_CHANNEL;
9use futures::{channel::mpsc, future::Shared, Future, FutureExt, StreamExt};
10use gpui::{
11 AppContext, AsyncAppContext, Context, EventEmitter, Model, ModelContext, SharedString, Task,
12 WeakModel,
13};
14use rpc::{
15 proto::{self, ChannelVisibility},
16 TypedEnvelope,
17};
18use std::{mem, sync::Arc, time::Duration};
19use util::{async_maybe, ResultExt};
20
21pub fn init(client: &Arc<Client>, user_store: Model<UserStore>, cx: &mut AppContext) {
22 let channel_store =
23 cx.build_model(|cx| ChannelStore::new(client.clone(), user_store.clone(), cx));
24 cx.set_global(channel_store);
25}
26
/// How long `ChannelStore::handle_disconnect` waits before disconnecting the
/// opened channel buffers when it is asked to wait for a reconnection.
pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);

/// Numeric identifier of a channel, as carried by the `proto` channel messages.
pub type ChannelId = u64;
30
/// Client-side store of the channels known to the current user, together with
/// the channel buffers and chats that are currently opened.
pub struct ChannelStore {
    /// Index of all known channels (lookup by id plus an ordered list of ids).
    pub channel_index: ChannelIndex,
    /// Channels the user has been invited to; `update_channels` keeps this
    /// sorted by channel id (it inserts at the `binary_search_by_key` position).
    channel_invitations: Vec<Arc<Channel>>,
    /// Participants per channel, as last resolved by `update_channels`.
    channel_participants: HashMap<ChannelId, Vec<Arc<User>>>,
    /// `(channel, user)` pairs that have a membership request in flight
    /// (invite, removal or role change).
    outgoing_invites: HashSet<(ChannelId, UserId)>,
    /// Sending half of the queue drained by the `_update_channels` task.
    update_channels_tx: mpsc::UnboundedSender<proto::UpdateChannels>,
    /// Channel buffers that are open or currently being opened, by channel id.
    opened_buffers: HashMap<ChannelId, OpenedModelHandle<ChannelBuffer>>,
    /// Channel chats that are open or currently being opened, by channel id.
    opened_chats: HashMap<ChannelId, OpenedModelHandle<ChannelChat>>,
    client: Arc<Client>,
    user_store: Model<UserStore>,
    /// Handle returned when registering `handle_update_channels` with the client;
    /// presumably dropping it unregisters the handler (verify against `Client`).
    _rpc_subscription: Subscription,
    /// Task that reacts to connection status changes of the client.
    _watch_connection_status: Task<Option<()>>,
    /// Pending task created by `handle_disconnect`; taken (dropped) again by
    /// `handle_connect`.
    disconnect_channel_buffers_task: Option<Task<()>>,
    /// Task that applies queued `UpdateChannels` payloads one at a time.
    _update_channels: Task<()>,
}
46
/// A single channel as known to the client.
#[derive(Clone, Debug, PartialEq)]
pub struct Channel {
    pub id: ChannelId,
    pub name: SharedString,
    pub visibility: proto::ChannelVisibility,
    /// Role attached to this channel entry (used by `can_edit_notes` and
    /// `ChannelStore::is_channel_admin`).
    pub role: proto::ChannelRole,
    /// Set while the channel notes have changes the user has not seen yet.
    /// NOTE(review): the tuple looks like `(epoch, version)` — confirm in `ChannelIndex`.
    pub unseen_note_version: Option<(u64, clock::Global)>,
    /// Set while the channel has a message the user has not seen yet.
    pub unseen_message_id: Option<u64>,
    /// Presumably the ids of this channel's ancestors; its length is reported
    /// as the first tuple element by `ChannelStore::ordered_channels`.
    pub parent_path: Vec<u64>,
}
57
58impl Channel {
59 pub fn link(&self) -> String {
60 RELEASE_CHANNEL.link_prefix().to_owned()
61 + "channel/"
62 + &self.slug()
63 + "-"
64 + &self.id.to_string()
65 }
66
67 pub fn slug(&self) -> String {
68 let slug: String = self
69 .name
70 .chars()
71 .map(|c| if c.is_alphanumeric() { c } else { '-' })
72 .collect();
73
74 slug.trim_matches(|c| c == '-').to_string()
75 }
76
77 pub fn can_edit_notes(&self) -> bool {
78 self.role == proto::ChannelRole::Member || self.role == proto::ChannelRole::Admin
79 }
80}
81
/// One member entry of a channel, as returned by
/// `ChannelStore::get_channel_member_details`.
pub struct ChannelMembership {
    pub user: Arc<User>,
    /// How the user relates to the channel (member, ancestor member or invitee).
    pub kind: proto::channel_member::Kind,
    pub role: proto::ChannelRole,
}
87impl ChannelMembership {
88 pub fn sort_key(&self) -> MembershipSortKey {
89 MembershipSortKey {
90 role_order: match self.role {
91 proto::ChannelRole::Admin => 0,
92 proto::ChannelRole::Member => 1,
93 proto::ChannelRole::Banned => 2,
94 proto::ChannelRole::Guest => 3,
95 },
96 kind_order: match self.kind {
97 proto::channel_member::Kind::Member => 0,
98 proto::channel_member::Kind::AncestorMember => 1,
99 proto::channel_member::Kind::Invitee => 2,
100 },
101 username_order: self.user.github_login.as_str(),
102 }
103 }
104}
105
/// Sort key produced by `ChannelMembership::sort_key`.
///
/// The ordering is derived, so fields are compared in declaration order:
/// role first, then kind, then username.
#[derive(PartialOrd, Ord, PartialEq, Eq)]
pub struct MembershipSortKey<'a> {
    role_order: u8,
    kind_order: u8,
    username_order: &'a str,
}
112
/// Events emitted by [`ChannelStore`].
pub enum ChannelEvent {
    /// Emitted by `ChannelStore::create_channel` once the new channel has been
    /// inserted into the store.
    ChannelCreated(ChannelId),
    /// Emitted by `ChannelStore::rename` once the renamed channel has been
    /// applied to the store.
    ChannelRenamed(ChannelId),
}

impl EventEmitter<ChannelEvent> for ChannelStore {}
119
/// State of a per-channel resource (buffer or chat) tracked by the store.
enum OpenedModelHandle<E> {
    /// The resource finished loading; only a weak handle is kept, so the
    /// model may already have been dropped.
    Open(WeakModel<E>),
    /// The resource is still loading; the shared task can be cloned and
    /// awaited by every caller that asks for the same channel.
    Loading(Shared<Task<Result<Model<E>, Arc<anyhow::Error>>>>),
}
124
125impl ChannelStore {
126 pub fn global(cx: &AppContext) -> Model<Self> {
127 cx.global::<Model<Self>>().clone()
128 }
129
/// Creates the store, registers `handle_update_channels` as a message
/// handler on `client`, and spawns two tasks owned by the store: one that
/// watches the client's connection status, and one that applies queued
/// `UpdateChannels` payloads.
pub fn new(
    client: Arc<Client>,
    user_store: Model<UserStore>,
    cx: &mut ModelContext<Self>,
) -> Self {
    let rpc_subscription =
        client.add_message_handler(cx.weak_model(), Self::handle_update_channels);

    let mut connection_status = client.status();
    let (update_channels_tx, mut update_channels_rx) = mpsc::unbounded();
    let watch_connection_status = cx.spawn(|this, mut cx| async move {
        while let Some(status) = connection_status.next().await {
            // Stop watching once the store itself has been dropped.
            let this = this.upgrade()?;
            match status {
                client::Status::Connected { .. } => {
                    // Each `?` below ends the watcher task: on a failed model
                    // update, or (after logging) on a failed `handle_connect`.
                    this.update(&mut cx, |this, cx| this.handle_connect(cx))
                        .ok()?
                        .await
                        .log_err()?;
                }
                client::Status::SignedOut | client::Status::UpgradeRequired => {
                    // Disconnect without waiting for a reconnection.
                    this.update(&mut cx, |this, cx| this.handle_disconnect(false, cx))
                        .ok();
                }
                _ => {
                    // Any other status: disconnect, but wait for a reconnection first.
                    this.update(&mut cx, |this, cx| this.handle_disconnect(true, cx))
                        .ok();
                }
            }
        }
        Some(())
    });

    Self {
        channel_invitations: Vec::default(),
        channel_index: ChannelIndex::default(),
        channel_participants: Default::default(),
        outgoing_invites: Default::default(),
        opened_buffers: Default::default(),
        opened_chats: Default::default(),
        update_channels_tx,
        client,
        user_store,
        _rpc_subscription: rpc_subscription,
        _watch_connection_status: watch_connection_status,
        disconnect_channel_buffers_task: None,
        _update_channels: cx.spawn(|this, mut cx| async move {
            async_maybe!({
                // Payloads are applied strictly one after another: the task
                // returned by `update_channels` is awaited before the next
                // payload is taken from the queue.
                while let Some(update_channels) = update_channels_rx.next().await {
                    if let Some(this) = this.upgrade() {
                        let update_task = this.update(&mut cx, |this, cx| {
                            this.update_channels(update_channels, cx)
                        })?;
                        if let Some(update_task) = update_task {
                            update_task.await.log_err();
                        }
                    }
                }
                anyhow::Ok(())
            })
            .await
            .log_err();
        }),
    }
}
195
196 pub fn client(&self) -> Arc<Client> {
197 self.client.clone()
198 }
199
200 /// Returns the number of unique channels in the store
201 pub fn channel_count(&self) -> usize {
202 self.channel_index.by_id().len()
203 }
204
205 /// Returns the index of a channel ID in the list of unique channels
206 pub fn index_of_channel(&self, channel_id: ChannelId) -> Option<usize> {
207 self.channel_index
208 .by_id()
209 .keys()
210 .position(|id| *id == channel_id)
211 }
212
213 /// Returns an iterator over all unique channels
214 pub fn channels(&self) -> impl '_ + Iterator<Item = &Arc<Channel>> {
215 self.channel_index.by_id().values()
216 }
217
218 /// Iterate over all entries in the channel DAG
219 pub fn ordered_channels(&self) -> impl '_ + Iterator<Item = (usize, &Arc<Channel>)> {
220 self.channel_index
221 .ordered_channels()
222 .iter()
223 .filter_map(move |id| {
224 let channel = self.channel_index.by_id().get(id)?;
225 Some((channel.parent_path.len(), channel))
226 })
227 }
228
229 pub fn channel_at_index(&self, ix: usize) -> Option<&Arc<Channel>> {
230 let channel_id = self.channel_index.ordered_channels().get(ix)?;
231 self.channel_index.by_id().get(channel_id)
232 }
233
234 pub fn channel_at(&self, ix: usize) -> Option<&Arc<Channel>> {
235 self.channel_index.by_id().values().nth(ix)
236 }
237
238 pub fn has_channel_invitation(&self, channel_id: ChannelId) -> bool {
239 self.channel_invitations
240 .iter()
241 .any(|channel| channel.id == channel_id)
242 }
243
244 pub fn channel_invitations(&self) -> &[Arc<Channel>] {
245 &self.channel_invitations
246 }
247
248 pub fn channel_for_id(&self, channel_id: ChannelId) -> Option<&Arc<Channel>> {
249 self.channel_index.by_id().get(&channel_id)
250 }
251
252 pub fn has_open_channel_buffer(&self, channel_id: ChannelId, _cx: &AppContext) -> bool {
253 if let Some(buffer) = self.opened_buffers.get(&channel_id) {
254 if let OpenedModelHandle::Open(buffer) = buffer {
255 return buffer.upgrade().is_some();
256 }
257 }
258 false
259 }
260
261 pub fn open_channel_buffer(
262 &mut self,
263 channel_id: ChannelId,
264 cx: &mut ModelContext<Self>,
265 ) -> Task<Result<Model<ChannelBuffer>>> {
266 let client = self.client.clone();
267 let user_store = self.user_store.clone();
268 let channel_store = cx.handle();
269 self.open_channel_resource(
270 channel_id,
271 |this| &mut this.opened_buffers,
272 |channel, cx| ChannelBuffer::new(channel, client, user_store, channel_store, cx),
273 cx,
274 )
275 }
276
277 pub fn fetch_channel_messages(
278 &self,
279 message_ids: Vec<u64>,
280 cx: &mut ModelContext<Self>,
281 ) -> Task<Result<Vec<ChannelMessage>>> {
282 let request = if message_ids.is_empty() {
283 None
284 } else {
285 Some(
286 self.client
287 .request(proto::GetChannelMessagesById { message_ids }),
288 )
289 };
290 cx.spawn(|this, mut cx| async move {
291 if let Some(request) = request {
292 let response = request.await?;
293 let this = this
294 .upgrade()
295 .ok_or_else(|| anyhow!("channel store dropped"))?;
296 let user_store = this.update(&mut cx, |this, _| this.user_store.clone())?;
297 ChannelMessage::from_proto_vec(response.messages, &user_store, &mut cx).await
298 } else {
299 Ok(Vec::new())
300 }
301 })
302 }
303
304 pub fn has_channel_buffer_changed(&self, channel_id: ChannelId) -> Option<bool> {
305 self.channel_index
306 .by_id()
307 .get(&channel_id)
308 .map(|channel| channel.unseen_note_version.is_some())
309 }
310
311 pub fn has_new_messages(&self, channel_id: ChannelId) -> Option<bool> {
312 self.channel_index
313 .by_id()
314 .get(&channel_id)
315 .map(|channel| channel.unseen_message_id.is_some())
316 }
317
318 pub fn notes_changed(
319 &mut self,
320 channel_id: ChannelId,
321 epoch: u64,
322 version: &clock::Global,
323 cx: &mut ModelContext<Self>,
324 ) {
325 self.channel_index.note_changed(channel_id, epoch, version);
326 cx.notify();
327 }
328
329 pub fn new_message(
330 &mut self,
331 channel_id: ChannelId,
332 message_id: u64,
333 cx: &mut ModelContext<Self>,
334 ) {
335 self.channel_index.new_message(channel_id, message_id);
336 cx.notify();
337 }
338
339 pub fn acknowledge_message_id(
340 &mut self,
341 channel_id: ChannelId,
342 message_id: u64,
343 cx: &mut ModelContext<Self>,
344 ) {
345 self.channel_index
346 .acknowledge_message_id(channel_id, message_id);
347 cx.notify();
348 }
349
350 pub fn acknowledge_notes_version(
351 &mut self,
352 channel_id: ChannelId,
353 epoch: u64,
354 version: &clock::Global,
355 cx: &mut ModelContext<Self>,
356 ) {
357 self.channel_index
358 .acknowledge_note_version(channel_id, epoch, version);
359 cx.notify();
360 }
361
362 pub fn open_channel_chat(
363 &mut self,
364 channel_id: ChannelId,
365 cx: &mut ModelContext<Self>,
366 ) -> Task<Result<Model<ChannelChat>>> {
367 let client = self.client.clone();
368 let user_store = self.user_store.clone();
369 let this = cx.handle();
370 self.open_channel_resource(
371 channel_id,
372 |this| &mut this.opened_chats,
373 |channel, cx| ChannelChat::new(channel, this, user_store, client, cx),
374 cx,
375 )
376 }
377
/// Asynchronously open a given resource associated with a channel.
///
/// Make sure that the resource is only opened once, even if this method
/// is called multiple times with the same channel id while the first task
/// is still running.
///
/// `get_map` selects the map that tracks this kind of resource, and `load`
/// performs the actual opening once the channel has been looked up. The
/// returned task fails if the channel is unknown or if `load` fails.
fn open_channel_resource<T, F, Fut>(
    &mut self,
    channel_id: ChannelId,
    get_map: fn(&mut Self) -> &mut HashMap<ChannelId, OpenedModelHandle<T>>,
    load: F,
    cx: &mut ModelContext<Self>,
) -> Task<Result<Model<T>>>
where
    F: 'static + FnOnce(Arc<Channel>, AsyncAppContext) -> Fut,
    Fut: Future<Output = Result<Model<T>>>,
    T: 'static,
{
    let task = loop {
        match get_map(self).entry(channel_id) {
            hash_map::Entry::Occupied(e) => match e.get() {
                OpenedModelHandle::Open(model) => {
                    if let Some(model) = model.upgrade() {
                        // Already open and still alive: resolve immediately.
                        break Task::ready(Ok(model)).shared();
                    } else {
                        // The model was dropped; forget the stale entry and
                        // retry, which takes the vacant branch below.
                        get_map(self).remove(&channel_id);
                        continue;
                    }
                }
                OpenedModelHandle::Loading(task) => {
                    // Someone else is already opening it: share their task.
                    break task.clone();
                }
            },
            hash_map::Entry::Vacant(e) => {
                let task = cx
                    .spawn(move |this, mut cx| async move {
                        let channel = this.update(&mut cx, |this, _| {
                            this.channel_for_id(channel_id).cloned().ok_or_else(|| {
                                Arc::new(anyhow!("no channel for id: {}", channel_id))
                            })
                        })??;

                        load(channel, cx).await.map_err(Arc::new)
                    })
                    .shared();

                e.insert(OpenedModelHandle::Loading(task.clone()));
                // Detached bookkeeping task: once loading finishes, replace
                // the `Loading` entry with a weak `Open` handle, or drop the
                // entry on failure so that a later call can retry.
                cx.spawn({
                    let task = task.clone();
                    move |this, mut cx| async move {
                        let result = task.await;
                        this.update(&mut cx, |this, _| match result {
                            Ok(model) => {
                                get_map(this).insert(
                                    channel_id,
                                    OpenedModelHandle::Open(model.downgrade()),
                                );
                            }
                            Err(_) => {
                                get_map(this).remove(&channel_id);
                            }
                        })
                        .ok();
                    }
                })
                .detach();
                break task;
            }
        }
    };
    // The shared task yields `Arc<anyhow::Error>`; convert it back into a
    // plain `anyhow::Error` carrying the same display text.
    cx.background_executor()
        .spawn(async move { task.await.map_err(|error| anyhow!("{}", error)) })
}
450
451 pub fn is_channel_admin(&self, channel_id: ChannelId) -> bool {
452 let Some(channel) = self.channel_for_id(channel_id) else {
453 return false;
454 };
455 channel.role == proto::ChannelRole::Admin
456 }
457
458 pub fn channel_participants(&self, channel_id: ChannelId) -> &[Arc<User>] {
459 self.channel_participants
460 .get(&channel_id)
461 .map_or(&[], |v| v.as_slice())
462 }
463
464 pub fn create_channel(
465 &self,
466 name: &str,
467 parent_id: Option<ChannelId>,
468 cx: &mut ModelContext<Self>,
469 ) -> Task<Result<ChannelId>> {
470 let client = self.client.clone();
471 let name = name.trim_start_matches("#").to_owned();
472 cx.spawn(move |this, mut cx| async move {
473 let response = client
474 .request(proto::CreateChannel { name, parent_id })
475 .await?;
476
477 let channel = response
478 .channel
479 .ok_or_else(|| anyhow!("missing channel in response"))?;
480 let channel_id = channel.id;
481
482 this.update(&mut cx, |this, cx| {
483 let task = this.update_channels(
484 proto::UpdateChannels {
485 channels: vec![channel],
486 ..Default::default()
487 },
488 cx,
489 );
490 assert!(task.is_none());
491
492 // This event is emitted because the collab panel wants to clear the pending edit state
493 // before this frame is rendered. But we can't guarantee that the collab panel's future
494 // will resolve before this flush_effects finishes. Synchronously emitting this event
495 // ensures that the collab panel will observe this creation before the frame completes
496 cx.emit(ChannelEvent::ChannelCreated(channel_id));
497 })?;
498
499 Ok(channel_id)
500 })
501 }
502
503 pub fn move_channel(
504 &mut self,
505 channel_id: ChannelId,
506 to: Option<ChannelId>,
507 cx: &mut ModelContext<Self>,
508 ) -> Task<Result<()>> {
509 let client = self.client.clone();
510 cx.spawn(move |_, _| async move {
511 let _ = client
512 .request(proto::MoveChannel { channel_id, to })
513 .await?;
514
515 Ok(())
516 })
517 }
518
519 pub fn set_channel_visibility(
520 &mut self,
521 channel_id: ChannelId,
522 visibility: ChannelVisibility,
523 cx: &mut ModelContext<Self>,
524 ) -> Task<Result<()>> {
525 let client = self.client.clone();
526 cx.spawn(move |_, _| async move {
527 let _ = client
528 .request(proto::SetChannelVisibility {
529 channel_id,
530 visibility: visibility.into(),
531 })
532 .await?;
533
534 Ok(())
535 })
536 }
537
538 pub fn invite_member(
539 &mut self,
540 channel_id: ChannelId,
541 user_id: UserId,
542 role: proto::ChannelRole,
543 cx: &mut ModelContext<Self>,
544 ) -> Task<Result<()>> {
545 if !self.outgoing_invites.insert((channel_id, user_id)) {
546 return Task::ready(Err(anyhow!("invite request already in progress")));
547 }
548
549 cx.notify();
550 let client = self.client.clone();
551 cx.spawn(move |this, mut cx| async move {
552 let result = client
553 .request(proto::InviteChannelMember {
554 channel_id,
555 user_id,
556 role: role.into(),
557 })
558 .await;
559
560 this.update(&mut cx, |this, cx| {
561 this.outgoing_invites.remove(&(channel_id, user_id));
562 cx.notify();
563 })?;
564
565 result?;
566
567 Ok(())
568 })
569 }
570
571 pub fn remove_member(
572 &mut self,
573 channel_id: ChannelId,
574 user_id: u64,
575 cx: &mut ModelContext<Self>,
576 ) -> Task<Result<()>> {
577 if !self.outgoing_invites.insert((channel_id, user_id)) {
578 return Task::ready(Err(anyhow!("invite request already in progress")));
579 }
580
581 cx.notify();
582 let client = self.client.clone();
583 cx.spawn(move |this, mut cx| async move {
584 let result = client
585 .request(proto::RemoveChannelMember {
586 channel_id,
587 user_id,
588 })
589 .await;
590
591 this.update(&mut cx, |this, cx| {
592 this.outgoing_invites.remove(&(channel_id, user_id));
593 cx.notify();
594 })?;
595 result?;
596 Ok(())
597 })
598 }
599
600 pub fn set_member_role(
601 &mut self,
602 channel_id: ChannelId,
603 user_id: UserId,
604 role: proto::ChannelRole,
605 cx: &mut ModelContext<Self>,
606 ) -> Task<Result<()>> {
607 if !self.outgoing_invites.insert((channel_id, user_id)) {
608 return Task::ready(Err(anyhow!("member request already in progress")));
609 }
610
611 cx.notify();
612 let client = self.client.clone();
613 cx.spawn(move |this, mut cx| async move {
614 let result = client
615 .request(proto::SetChannelMemberRole {
616 channel_id,
617 user_id,
618 role: role.into(),
619 })
620 .await;
621
622 this.update(&mut cx, |this, cx| {
623 this.outgoing_invites.remove(&(channel_id, user_id));
624 cx.notify();
625 })?;
626
627 result?;
628 Ok(())
629 })
630 }
631
632 pub fn rename(
633 &mut self,
634 channel_id: ChannelId,
635 new_name: &str,
636 cx: &mut ModelContext<Self>,
637 ) -> Task<Result<()>> {
638 let client = self.client.clone();
639 let name = new_name.to_string();
640 cx.spawn(move |this, mut cx| async move {
641 let channel = client
642 .request(proto::RenameChannel { channel_id, name })
643 .await?
644 .channel
645 .ok_or_else(|| anyhow!("missing channel in response"))?;
646 this.update(&mut cx, |this, cx| {
647 let task = this.update_channels(
648 proto::UpdateChannels {
649 channels: vec![channel],
650 ..Default::default()
651 },
652 cx,
653 );
654 assert!(task.is_none());
655
656 // This event is emitted because the collab panel wants to clear the pending edit state
657 // before this frame is rendered. But we can't guarantee that the collab panel's future
658 // will resolve before this flush_effects finishes. Synchronously emitting this event
659 // ensures that the collab panel will observe this creation before the frame complete
660 cx.emit(ChannelEvent::ChannelRenamed(channel_id))
661 })?;
662 Ok(())
663 })
664 }
665
666 pub fn respond_to_channel_invite(
667 &mut self,
668 channel_id: ChannelId,
669 accept: bool,
670 cx: &mut ModelContext<Self>,
671 ) -> Task<Result<()>> {
672 let client = self.client.clone();
673 cx.background_executor().spawn(async move {
674 client
675 .request(proto::RespondToChannelInvite { channel_id, accept })
676 .await?;
677 Ok(())
678 })
679 }
680
681 pub fn get_channel_member_details(
682 &self,
683 channel_id: ChannelId,
684 cx: &mut ModelContext<Self>,
685 ) -> Task<Result<Vec<ChannelMembership>>> {
686 let client = self.client.clone();
687 let user_store = self.user_store.downgrade();
688 cx.spawn(move |_, mut cx| async move {
689 let response = client
690 .request(proto::GetChannelMembers { channel_id })
691 .await?;
692
693 let user_ids = response.members.iter().map(|m| m.user_id).collect();
694 let user_store = user_store
695 .upgrade()
696 .ok_or_else(|| anyhow!("user store dropped"))?;
697 let users = user_store
698 .update(&mut cx, |user_store, cx| user_store.get_users(user_ids, cx))?
699 .await?;
700
701 Ok(users
702 .into_iter()
703 .zip(response.members)
704 .filter_map(|(user, member)| {
705 Some(ChannelMembership {
706 user,
707 role: member.role(),
708 kind: member.kind(),
709 })
710 })
711 .collect())
712 })
713 }
714
715 pub fn remove_channel(&self, channel_id: ChannelId) -> impl Future<Output = Result<()>> {
716 let client = self.client.clone();
717 async move {
718 client.request(proto::DeleteChannel { channel_id }).await?;
719 Ok(())
720 }
721 }
722
/// Always returns `false`: the channel argument is ignored and responses to
/// channel invites are not tracked as pending by this store.
pub fn has_pending_channel_invite_response(&self, _: &Arc<Channel>) -> bool {
    false
}
726
727 pub fn has_pending_channel_invite(&self, channel_id: ChannelId, user_id: UserId) -> bool {
728 self.outgoing_invites.contains(&(channel_id, user_id))
729 }
730
731 async fn handle_update_channels(
732 this: Model<Self>,
733 message: TypedEnvelope<proto::UpdateChannels>,
734 _: Arc<Client>,
735 mut cx: AsyncAppContext,
736 ) -> Result<()> {
737 this.update(&mut cx, |this, _| {
738 this.update_channels_tx
739 .unbounded_send(message.payload)
740 .unwrap();
741 })?;
742 Ok(())
743 }
744
745 fn handle_connect(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
746 self.channel_index.clear();
747 self.channel_invitations.clear();
748 self.channel_participants.clear();
749 self.channel_index.clear();
750 self.outgoing_invites.clear();
751 self.disconnect_channel_buffers_task.take();
752
753 for chat in self.opened_chats.values() {
754 if let OpenedModelHandle::Open(chat) = chat {
755 if let Some(chat) = chat.upgrade() {
756 chat.update(cx, |chat, cx| {
757 chat.rejoin(cx);
758 });
759 }
760 }
761 }
762
763 let mut buffer_versions = Vec::new();
764 for buffer in self.opened_buffers.values() {
765 if let OpenedModelHandle::Open(buffer) = buffer {
766 if let Some(buffer) = buffer.upgrade() {
767 let channel_buffer = buffer.read(cx);
768 let buffer = channel_buffer.buffer().read(cx);
769 buffer_versions.push(proto::ChannelBufferVersion {
770 channel_id: channel_buffer.channel_id,
771 epoch: channel_buffer.epoch(),
772 version: language::proto::serialize_version(&buffer.version()),
773 });
774 }
775 }
776 }
777
778 if buffer_versions.is_empty() {
779 return Task::ready(Ok(()));
780 }
781
782 let response = self.client.request(proto::RejoinChannelBuffers {
783 buffers: buffer_versions,
784 });
785
786 cx.spawn(|this, mut cx| async move {
787 let mut response = response.await?;
788
789 this.update(&mut cx, |this, cx| {
790 this.opened_buffers.retain(|_, buffer| match buffer {
791 OpenedModelHandle::Open(channel_buffer) => {
792 let Some(channel_buffer) = channel_buffer.upgrade() else {
793 return false;
794 };
795
796 channel_buffer.update(cx, |channel_buffer, cx| {
797 let channel_id = channel_buffer.channel_id;
798 if let Some(remote_buffer) = response
799 .buffers
800 .iter_mut()
801 .find(|buffer| buffer.channel_id == channel_id)
802 {
803 let channel_id = channel_buffer.channel_id;
804 let remote_version =
805 language::proto::deserialize_version(&remote_buffer.version);
806
807 channel_buffer.replace_collaborators(
808 mem::take(&mut remote_buffer.collaborators),
809 cx,
810 );
811
812 let operations = channel_buffer
813 .buffer()
814 .update(cx, |buffer, cx| {
815 let outgoing_operations =
816 buffer.serialize_ops(Some(remote_version), cx);
817 let incoming_operations =
818 mem::take(&mut remote_buffer.operations)
819 .into_iter()
820 .map(language::proto::deserialize_operation)
821 .collect::<Result<Vec<_>>>()?;
822 buffer.apply_ops(incoming_operations, cx)?;
823 anyhow::Ok(outgoing_operations)
824 })
825 .log_err();
826
827 if let Some(operations) = operations {
828 let client = this.client.clone();
829 cx.background_executor()
830 .spawn(async move {
831 let operations = operations.await;
832 for chunk in
833 language::proto::split_operations(operations)
834 {
835 client
836 .send(proto::UpdateChannelBuffer {
837 channel_id,
838 operations: chunk,
839 })
840 .ok();
841 }
842 })
843 .detach();
844 return true;
845 }
846 }
847
848 channel_buffer.disconnect(cx);
849 false
850 })
851 }
852 OpenedModelHandle::Loading(_) => true,
853 });
854 })
855 .ok();
856 anyhow::Ok(())
857 })
858 }
859
860 fn handle_disconnect(&mut self, wait_for_reconnect: bool, cx: &mut ModelContext<Self>) {
861 cx.notify();
862
863 self.disconnect_channel_buffers_task.get_or_insert_with(|| {
864 cx.spawn(move |this, mut cx| async move {
865 if wait_for_reconnect {
866 cx.background_executor().timer(RECONNECT_TIMEOUT).await;
867 }
868
869 if let Some(this) = this.upgrade() {
870 this.update(&mut cx, |this, cx| {
871 for (_, buffer) in this.opened_buffers.drain() {
872 if let OpenedModelHandle::Open(buffer) = buffer {
873 if let Some(buffer) = buffer.upgrade() {
874 buffer.update(cx, |buffer, cx| buffer.disconnect(cx));
875 }
876 }
877 }
878 })
879 .ok();
880 }
881 })
882 });
883 }
884
885 pub(crate) fn update_channels(
886 &mut self,
887 payload: proto::UpdateChannels,
888 cx: &mut ModelContext<ChannelStore>,
889 ) -> Option<Task<Result<()>>> {
890 if !payload.remove_channel_invitations.is_empty() {
891 self.channel_invitations
892 .retain(|channel| !payload.remove_channel_invitations.contains(&channel.id));
893 }
894 for channel in payload.channel_invitations {
895 match self
896 .channel_invitations
897 .binary_search_by_key(&channel.id, |c| c.id)
898 {
899 Ok(ix) => {
900 Arc::make_mut(&mut self.channel_invitations[ix]).name = channel.name.into()
901 }
902 Err(ix) => self.channel_invitations.insert(
903 ix,
904 Arc::new(Channel {
905 id: channel.id,
906 visibility: channel.visibility(),
907 role: channel.role(),
908 name: channel.name.into(),
909 unseen_note_version: None,
910 unseen_message_id: None,
911 parent_path: channel.parent_path,
912 }),
913 ),
914 }
915 }
916
917 let channels_changed = !payload.channels.is_empty()
918 || !payload.delete_channels.is_empty()
919 || !payload.unseen_channel_messages.is_empty()
920 || !payload.unseen_channel_buffer_changes.is_empty();
921
922 if channels_changed {
923 if !payload.delete_channels.is_empty() {
924 self.channel_index.delete_channels(&payload.delete_channels);
925 self.channel_participants
926 .retain(|channel_id, _| !&payload.delete_channels.contains(channel_id));
927
928 for channel_id in &payload.delete_channels {
929 let channel_id = *channel_id;
930 if payload
931 .channels
932 .iter()
933 .any(|channel| channel.id == channel_id)
934 {
935 continue;
936 }
937 if let Some(OpenedModelHandle::Open(buffer)) =
938 self.opened_buffers.remove(&channel_id)
939 {
940 if let Some(buffer) = buffer.upgrade() {
941 buffer.update(cx, ChannelBuffer::disconnect);
942 }
943 }
944 }
945 }
946
947 let mut index = self.channel_index.bulk_insert();
948 for channel in payload.channels {
949 let id = channel.id;
950 let channel_changed = index.insert(channel);
951
952 if channel_changed {
953 if let Some(OpenedModelHandle::Open(buffer)) = self.opened_buffers.get(&id) {
954 if let Some(buffer) = buffer.upgrade() {
955 buffer.update(cx, ChannelBuffer::channel_changed);
956 }
957 }
958 }
959 }
960
961 for unseen_buffer_change in payload.unseen_channel_buffer_changes {
962 let version = language::proto::deserialize_version(&unseen_buffer_change.version);
963 index.note_changed(
964 unseen_buffer_change.channel_id,
965 unseen_buffer_change.epoch,
966 &version,
967 );
968 }
969
970 for unseen_channel_message in payload.unseen_channel_messages {
971 index.new_messages(
972 unseen_channel_message.channel_id,
973 unseen_channel_message.message_id,
974 );
975 }
976 }
977
978 cx.notify();
979 if payload.channel_participants.is_empty() {
980 return None;
981 }
982
983 let mut all_user_ids = Vec::new();
984 let channel_participants = payload.channel_participants;
985 for entry in &channel_participants {
986 for user_id in entry.participant_user_ids.iter() {
987 if let Err(ix) = all_user_ids.binary_search(user_id) {
988 all_user_ids.insert(ix, *user_id);
989 }
990 }
991 }
992
993 let users = self
994 .user_store
995 .update(cx, |user_store, cx| user_store.get_users(all_user_ids, cx));
996 Some(cx.spawn(|this, mut cx| async move {
997 let users = users.await?;
998
999 this.update(&mut cx, |this, cx| {
1000 for entry in &channel_participants {
1001 let mut participants: Vec<_> = entry
1002 .participant_user_ids
1003 .iter()
1004 .filter_map(|user_id| {
1005 users
1006 .binary_search_by_key(&user_id, |user| &user.id)
1007 .ok()
1008 .map(|ix| users[ix].clone())
1009 })
1010 .collect();
1011
1012 participants.sort_by_key(|u| u.id);
1013
1014 this.channel_participants
1015 .insert(entry.channel_id, participants);
1016 }
1017
1018 cx.notify();
1019 })
1020 }))
1021 }
1022}