1mod channel_index;
2
3use crate::{channel_buffer::ChannelBuffer, channel_chat::ChannelChat};
4use anyhow::{anyhow, Result};
5use client::{Client, Subscription, User, UserId, UserStore};
6use collections::{hash_map, HashMap, HashSet};
7use futures::{channel::mpsc, future::Shared, Future, FutureExt, StreamExt};
8use gpui::{AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Task, WeakModelHandle};
9use rpc::{
10 proto::{self, ChannelEdge, ChannelPermission},
11 TypedEnvelope,
12};
13use serde_derive::{Deserialize, Serialize};
14use std::{borrow::Cow, hash::Hash, mem, ops::Deref, sync::Arc, time::Duration};
15use util::ResultExt;
16
17use self::channel_index::ChannelIndex;
18
/// Delay that `ChannelStore::handle_disconnect` waits after the connection
/// is lost before it disconnects the channel buffers that are still open.
pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);

/// Numeric identifier of a channel.
pub type ChannelId = u64;
22
/// Model holding the channel state received through `proto::UpdateChannels`
/// messages, together with handles to the channel buffers and chats that
/// have been opened through it.
pub struct ChannelStore {
    /// Index of the known channels; queried by id and iterated as paths.
    channel_index: ChannelIndex,
    /// Pending channel invitations, kept sorted by channel id
    /// (`update_channels` inserts via binary search).
    channel_invitations: Vec<Arc<Channel>>,
    /// Participants per channel; each list is sorted by user id when stored.
    channel_participants: HashMap<ChannelId, Vec<Arc<User>>>,
    /// Channels for which an admin permission was reported.
    channels_with_admin_privileges: HashSet<ChannelId>,
    /// `(channel, user)` pairs that currently have a member request in
    /// flight (invite, removal or admin change).
    outgoing_invites: HashSet<(ChannelId, UserId)>,
    /// Sending half of the queue drained by the `_update_channels` task.
    update_channels_tx: mpsc::UnboundedSender<proto::UpdateChannels>,
    /// Channel buffers that are open or currently being opened.
    opened_buffers: HashMap<ChannelId, OpenedModelHandle<ChannelBuffer>>,
    /// Channel chats that are open or currently being opened.
    opened_chats: HashMap<ChannelId, OpenedModelHandle<ChannelChat>>,
    client: Arc<Client>,
    user_store: ModelHandle<UserStore>,
    /// Keeps the `UpdateChannels` message handler registered.
    _rpc_subscription: Subscription,
    /// Task that reacts to connection status changes.
    _watch_connection_status: Task<Option<()>>,
    /// Delayed buffer disconnect started by `handle_disconnect` and taken
    /// out again by `handle_connect`.
    disconnect_channel_buffers_task: Option<Task<()>>,
    /// Task that applies queued `UpdateChannels` payloads one at a time.
    _update_channels: Task<()>,
}
39
/// A channel paired with a `ChannelPath`.
// NOTE(review): presumably the path at which the channel appears in the
// channel DAG; confirm against the users of this alias.
pub type ChannelData = (Channel, ChannelPath);
41
/// A single channel as tracked by the store.
#[derive(Clone, Debug, PartialEq)]
pub struct Channel {
    pub id: ChannelId,
    pub name: String,
    /// `Some` while the channel notes have changes that were not seen yet.
    // NOTE(review): the tuple looks like `(epoch, version)`, matching the
    // arguments of `ChannelStore::notes_changed`; confirm.
    pub unseen_note_version: Option<(u64, clock::Global)>,
    /// `Some` while the channel has a chat message that was not seen yet.
    pub unseen_message_id: Option<u64>,
}
49
/// A sequence of channel ids. The last element is the id of the channel the
/// path points at (`channel_id`) and the second-to-last one, when present,
/// is its parent (`parent_id`).
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Serialize, Deserialize)]
pub struct ChannelPath(Arc<[ChannelId]>);
52
/// One entry of the member list produced by
/// `ChannelStore::get_channel_member_details`.
pub struct ChannelMembership {
    /// The user this membership belongs to.
    pub user: Arc<User>,
    /// Membership kind as decoded from the server response.
    pub kind: proto::channel_member::Kind,
    /// Admin flag as reported by the server for this member.
    pub admin: bool,
}
58
/// Events emitted by `ChannelStore`.
pub enum ChannelEvent {
    /// Emitted by `create_channel` once the new channel has been applied.
    ChannelCreated(ChannelId),
    /// Emitted by `rename` once the renamed channel has been applied.
    ChannelRenamed(ChannelId),
}
63
/// Registers `ChannelStore` as an entity whose event type is `ChannelEvent`.
impl Entity for ChannelStore {
    type Event = ChannelEvent;
}
67
/// State of a per-channel resource (buffer or chat) tracked by the store.
enum OpenedModelHandle<E: Entity> {
    /// The resource finished loading; only a weak handle is kept, so the
    /// entry may refer to a model that has since been dropped.
    Open(WeakModelHandle<E>),
    /// The resource is still loading; the shared task lets several callers
    /// wait for the same load.
    Loading(Shared<Task<Result<ModelHandle<E>, Arc<anyhow::Error>>>>),
}
72
73impl ChannelStore {
74 pub fn new(
75 client: Arc<Client>,
76 user_store: ModelHandle<UserStore>,
77 cx: &mut ModelContext<Self>,
78 ) -> Self {
79 let rpc_subscription =
80 client.add_message_handler(cx.handle(), Self::handle_update_channels);
81
82 let mut connection_status = client.status();
83 let (update_channels_tx, mut update_channels_rx) = mpsc::unbounded();
84 let watch_connection_status = cx.spawn_weak(|this, mut cx| async move {
85 while let Some(status) = connection_status.next().await {
86 let this = this.upgrade(&cx)?;
87 if status.is_connected() {
88 this.update(&mut cx, |this, cx| this.handle_connect(cx))
89 .await
90 .log_err()?;
91 } else {
92 this.update(&mut cx, |this, cx| this.handle_disconnect(cx));
93 }
94 }
95 Some(())
96 });
97
98 Self {
99 channel_invitations: Vec::default(),
100 channel_index: ChannelIndex::default(),
101 channel_participants: Default::default(),
102 channels_with_admin_privileges: Default::default(),
103 outgoing_invites: Default::default(),
104 opened_buffers: Default::default(),
105 opened_chats: Default::default(),
106 update_channels_tx,
107 client,
108 user_store,
109 _rpc_subscription: rpc_subscription,
110 _watch_connection_status: watch_connection_status,
111 disconnect_channel_buffers_task: None,
112 _update_channels: cx.spawn_weak(|this, mut cx| async move {
113 while let Some(update_channels) = update_channels_rx.next().await {
114 if let Some(this) = this.upgrade(&cx) {
115 let update_task = this.update(&mut cx, |this, cx| {
116 this.update_channels(update_channels, cx)
117 });
118 if let Some(update_task) = update_task {
119 update_task.await.log_err();
120 }
121 }
122 }
123 }),
124 }
125 }
126
127 pub fn client(&self) -> Arc<Client> {
128 self.client.clone()
129 }
130
131 pub fn has_children(&self, channel_id: ChannelId) -> bool {
132 self.channel_index.iter().any(|path| {
133 if let Some(ix) = path.iter().position(|id| *id == channel_id) {
134 path.len() > ix + 1
135 } else {
136 false
137 }
138 })
139 }
140
141 /// Returns the number of unique channels in the store
142 pub fn channel_count(&self) -> usize {
143 self.channel_index.by_id().len()
144 }
145
146 /// Returns the index of a channel ID in the list of unique channels
147 pub fn index_of_channel(&self, channel_id: ChannelId) -> Option<usize> {
148 self.channel_index
149 .by_id()
150 .keys()
151 .position(|id| *id == channel_id)
152 }
153
154 /// Returns an iterator over all unique channels
155 pub fn channels(&self) -> impl '_ + Iterator<Item = &Arc<Channel>> {
156 self.channel_index.by_id().values()
157 }
158
159 /// Iterate over all entries in the channel DAG
160 pub fn channel_dag_entries(&self) -> impl '_ + Iterator<Item = (usize, &Arc<Channel>)> {
161 self.channel_index.iter().map(move |path| {
162 let id = path.last().unwrap();
163 let channel = self.channel_for_id(*id).unwrap();
164 (path.len() - 1, channel)
165 })
166 }
167
168 pub fn channel_dag_entry_at(&self, ix: usize) -> Option<(&Arc<Channel>, &ChannelPath)> {
169 let path = self.channel_index.get(ix)?;
170 let id = path.last().unwrap();
171 let channel = self.channel_for_id(*id).unwrap();
172
173 Some((channel, path))
174 }
175
176 pub fn channel_at(&self, ix: usize) -> Option<&Arc<Channel>> {
177 self.channel_index.by_id().values().nth(ix)
178 }
179
180 pub fn channel_invitations(&self) -> &[Arc<Channel>] {
181 &self.channel_invitations
182 }
183
184 pub fn channel_for_id(&self, channel_id: ChannelId) -> Option<&Arc<Channel>> {
185 self.channel_index.by_id().get(&channel_id)
186 }
187
188 pub fn has_open_channel_buffer(&self, channel_id: ChannelId, cx: &AppContext) -> bool {
189 if let Some(buffer) = self.opened_buffers.get(&channel_id) {
190 if let OpenedModelHandle::Open(buffer) = buffer {
191 return buffer.upgrade(cx).is_some();
192 }
193 }
194 false
195 }
196
197 pub fn open_channel_buffer(
198 &mut self,
199 channel_id: ChannelId,
200 cx: &mut ModelContext<Self>,
201 ) -> Task<Result<ModelHandle<ChannelBuffer>>> {
202 let client = self.client.clone();
203 let user_store = self.user_store.clone();
204 self.open_channel_resource(
205 channel_id,
206 |this| &mut this.opened_buffers,
207 |channel, cx| ChannelBuffer::new(channel, client, user_store, cx),
208 cx,
209 )
210 }
211
212 pub fn has_channel_buffer_changed(&self, channel_id: ChannelId) -> Option<bool> {
213 self.channel_index
214 .by_id()
215 .get(&channel_id)
216 .map(|channel| channel.unseen_note_version.is_some())
217 }
218
219 pub fn has_new_messages(&self, channel_id: ChannelId) -> Option<bool> {
220 self.channel_index
221 .by_id()
222 .get(&channel_id)
223 .map(|channel| channel.unseen_message_id.is_some())
224 }
225
226 pub fn notes_changed(
227 &mut self,
228 channel_id: ChannelId,
229 epoch: u64,
230 version: &clock::Global,
231 cx: &mut ModelContext<Self>,
232 ) {
233 self.channel_index.note_changed(channel_id, epoch, version);
234 cx.notify();
235 }
236
237 pub fn new_message(
238 &mut self,
239 channel_id: ChannelId,
240 message_id: u64,
241 cx: &mut ModelContext<Self>,
242 ) {
243 self.channel_index.new_message(channel_id, message_id);
244 cx.notify();
245 }
246
247 pub fn acknowledge_message_id(
248 &mut self,
249 channel_id: ChannelId,
250 message_id: u64,
251 cx: &mut ModelContext<Self>,
252 ) {
253 self.channel_index
254 .acknowledge_message_id(channel_id, message_id);
255 cx.notify();
256 }
257
258 pub fn acknowledge_notes_version(
259 &mut self,
260 channel_id: ChannelId,
261 epoch: u64,
262 version: &clock::Global,
263 cx: &mut ModelContext<Self>,
264 ) {
265 self.channel_index
266 .acknowledge_note_version(channel_id, epoch, version);
267 cx.notify();
268 }
269
270 pub fn open_channel_chat(
271 &mut self,
272 channel_id: ChannelId,
273 cx: &mut ModelContext<Self>,
274 ) -> Task<Result<ModelHandle<ChannelChat>>> {
275 let client = self.client.clone();
276 let user_store = self.user_store.clone();
277 let this = cx.handle();
278 self.open_channel_resource(
279 channel_id,
280 |this| &mut this.opened_chats,
281 |channel, cx| ChannelChat::new(channel, this, user_store, client, cx),
282 cx,
283 )
284 }
285
/// Asynchronously open a given resource associated with a channel.
///
/// Make sure that the resource is only opened once, even if this method
/// is called multiple times with the same channel id while the first task
/// is still running.
///
/// `get_map` selects the map of this store that tracks the resource kind
/// and `load` performs the actual opening once the channel was looked up.
/// The returned task fails when the channel id is unknown or when `load`
/// fails; in both cases the map entry is removed again.
fn open_channel_resource<T: Entity, F, Fut>(
    &mut self,
    channel_id: ChannelId,
    get_map: fn(&mut Self) -> &mut HashMap<ChannelId, OpenedModelHandle<T>>,
    load: F,
    cx: &mut ModelContext<Self>,
) -> Task<Result<ModelHandle<T>>>
where
    F: 'static + FnOnce(Arc<Channel>, AsyncAppContext) -> Fut,
    Fut: Future<Output = Result<ModelHandle<T>>>,
{
    // Loops at most twice: a stale `Open` entry is removed and the lookup
    // is retried, which then takes the `Vacant` branch.
    let task = loop {
        match get_map(self).entry(channel_id) {
            hash_map::Entry::Occupied(e) => match e.get() {
                OpenedModelHandle::Open(model) => {
                    if let Some(model) = model.upgrade(cx) {
                        // Already open and still alive: hand it out directly.
                        break Task::ready(Ok(model)).shared();
                    } else {
                        // The model was dropped; forget the entry and reload.
                        get_map(self).remove(&channel_id);
                        continue;
                    }
                }
                OpenedModelHandle::Loading(task) => {
                    // Another caller is already loading; share its task.
                    break task.clone();
                }
            },
            hash_map::Entry::Vacant(e) => {
                let task = cx
                    .spawn(|this, cx| async move {
                        let channel = this.read_with(&cx, |this, _| {
                            this.channel_for_id(channel_id).cloned().ok_or_else(|| {
                                Arc::new(anyhow!("no channel for id: {}", channel_id))
                            })
                        })?;

                        // Errors are wrapped in `Arc` so that the shared
                        // task's output can be cloned.
                        load(channel, cx).await.map_err(Arc::new)
                    })
                    .shared();

                e.insert(OpenedModelHandle::Loading(task.clone()));
                // Detached follow-up: once loading finishes, replace the
                // `Loading` entry with a weak `Open` handle, or drop the
                // entry on failure.
                cx.spawn({
                    let task = task.clone();
                    |this, mut cx| async move {
                        let result = task.await;
                        this.update(&mut cx, |this, _| match result {
                            Ok(model) => {
                                get_map(this).insert(
                                    channel_id,
                                    OpenedModelHandle::Open(model.downgrade()),
                                );
                            }
                            Err(_) => {
                                get_map(this).remove(&channel_id);
                            }
                        });
                    }
                })
                .detach();
                break task;
            }
        }
    };
    // Convert the shared `Arc<anyhow::Error>` back into a plain error for
    // the caller.
    cx.foreground()
        .spawn(async move { task.await.map_err(|error| anyhow!("{}", error)) })
}
356
357 pub fn is_user_admin(&self, channel_id: ChannelId) -> bool {
358 self.channel_index.iter().any(|path| {
359 if let Some(ix) = path.iter().position(|id| *id == channel_id) {
360 path[..=ix]
361 .iter()
362 .any(|id| self.channels_with_admin_privileges.contains(id))
363 } else {
364 false
365 }
366 })
367 }
368
369 pub fn channel_participants(&self, channel_id: ChannelId) -> &[Arc<User>] {
370 self.channel_participants
371 .get(&channel_id)
372 .map_or(&[], |v| v.as_slice())
373 }
374
375 pub fn create_channel(
376 &self,
377 name: &str,
378 parent_id: Option<ChannelId>,
379 cx: &mut ModelContext<Self>,
380 ) -> Task<Result<ChannelId>> {
381 let client = self.client.clone();
382 let name = name.trim_start_matches("#").to_owned();
383 cx.spawn(|this, mut cx| async move {
384 let response = client
385 .request(proto::CreateChannel { name, parent_id })
386 .await?;
387
388 let channel = response
389 .channel
390 .ok_or_else(|| anyhow!("missing channel in response"))?;
391 let channel_id = channel.id;
392
393 let parent_edge = if let Some(parent_id) = parent_id {
394 vec![ChannelEdge {
395 channel_id: channel.id,
396 parent_id,
397 }]
398 } else {
399 vec![]
400 };
401
402 this.update(&mut cx, |this, cx| {
403 let task = this.update_channels(
404 proto::UpdateChannels {
405 channels: vec![channel],
406 insert_edge: parent_edge,
407 channel_permissions: vec![ChannelPermission {
408 channel_id,
409 is_admin: true,
410 }],
411 ..Default::default()
412 },
413 cx,
414 );
415 assert!(task.is_none());
416
417 // This event is emitted because the collab panel wants to clear the pending edit state
418 // before this frame is rendered. But we can't guarantee that the collab panel's future
419 // will resolve before this flush_effects finishes. Synchronously emitting this event
420 // ensures that the collab panel will observe this creation before the frame completes
421 cx.emit(ChannelEvent::ChannelCreated(channel_id));
422 });
423
424 Ok(channel_id)
425 })
426 }
427
428 pub fn link_channel(
429 &mut self,
430 channel_id: ChannelId,
431 to: ChannelId,
432 cx: &mut ModelContext<Self>,
433 ) -> Task<Result<()>> {
434 let client = self.client.clone();
435 cx.spawn(|_, _| async move {
436 let _ = client
437 .request(proto::LinkChannel { channel_id, to })
438 .await?;
439
440 Ok(())
441 })
442 }
443
444 pub fn unlink_channel(
445 &mut self,
446 channel_id: ChannelId,
447 from: ChannelId,
448 cx: &mut ModelContext<Self>,
449 ) -> Task<Result<()>> {
450 let client = self.client.clone();
451 cx.spawn(|_, _| async move {
452 let _ = client
453 .request(proto::UnlinkChannel { channel_id, from })
454 .await?;
455
456 Ok(())
457 })
458 }
459
460 pub fn move_channel(
461 &mut self,
462 channel_id: ChannelId,
463 from: ChannelId,
464 to: ChannelId,
465 cx: &mut ModelContext<Self>,
466 ) -> Task<Result<()>> {
467 let client = self.client.clone();
468 cx.spawn(|_, _| async move {
469 let _ = client
470 .request(proto::MoveChannel {
471 channel_id,
472 from,
473 to,
474 })
475 .await?;
476
477 Ok(())
478 })
479 }
480
481 pub fn invite_member(
482 &mut self,
483 channel_id: ChannelId,
484 user_id: UserId,
485 admin: bool,
486 cx: &mut ModelContext<Self>,
487 ) -> Task<Result<()>> {
488 if !self.outgoing_invites.insert((channel_id, user_id)) {
489 return Task::ready(Err(anyhow!("invite request already in progress")));
490 }
491
492 cx.notify();
493 let client = self.client.clone();
494 cx.spawn(|this, mut cx| async move {
495 let result = client
496 .request(proto::InviteChannelMember {
497 channel_id,
498 user_id,
499 admin,
500 })
501 .await;
502
503 this.update(&mut cx, |this, cx| {
504 this.outgoing_invites.remove(&(channel_id, user_id));
505 cx.notify();
506 });
507
508 result?;
509
510 Ok(())
511 })
512 }
513
514 pub fn remove_member(
515 &mut self,
516 channel_id: ChannelId,
517 user_id: u64,
518 cx: &mut ModelContext<Self>,
519 ) -> Task<Result<()>> {
520 if !self.outgoing_invites.insert((channel_id, user_id)) {
521 return Task::ready(Err(anyhow!("invite request already in progress")));
522 }
523
524 cx.notify();
525 let client = self.client.clone();
526 cx.spawn(|this, mut cx| async move {
527 let result = client
528 .request(proto::RemoveChannelMember {
529 channel_id,
530 user_id,
531 })
532 .await;
533
534 this.update(&mut cx, |this, cx| {
535 this.outgoing_invites.remove(&(channel_id, user_id));
536 cx.notify();
537 });
538 result?;
539 Ok(())
540 })
541 }
542
543 pub fn set_member_admin(
544 &mut self,
545 channel_id: ChannelId,
546 user_id: UserId,
547 admin: bool,
548 cx: &mut ModelContext<Self>,
549 ) -> Task<Result<()>> {
550 if !self.outgoing_invites.insert((channel_id, user_id)) {
551 return Task::ready(Err(anyhow!("member request already in progress")));
552 }
553
554 cx.notify();
555 let client = self.client.clone();
556 cx.spawn(|this, mut cx| async move {
557 let result = client
558 .request(proto::SetChannelMemberAdmin {
559 channel_id,
560 user_id,
561 admin,
562 })
563 .await;
564
565 this.update(&mut cx, |this, cx| {
566 this.outgoing_invites.remove(&(channel_id, user_id));
567 cx.notify();
568 });
569
570 result?;
571 Ok(())
572 })
573 }
574
575 pub fn rename(
576 &mut self,
577 channel_id: ChannelId,
578 new_name: &str,
579 cx: &mut ModelContext<Self>,
580 ) -> Task<Result<()>> {
581 let client = self.client.clone();
582 let name = new_name.to_string();
583 cx.spawn(|this, mut cx| async move {
584 let channel = client
585 .request(proto::RenameChannel { channel_id, name })
586 .await?
587 .channel
588 .ok_or_else(|| anyhow!("missing channel in response"))?;
589 this.update(&mut cx, |this, cx| {
590 let task = this.update_channels(
591 proto::UpdateChannels {
592 channels: vec![channel],
593 ..Default::default()
594 },
595 cx,
596 );
597 assert!(task.is_none());
598
599 // This event is emitted because the collab panel wants to clear the pending edit state
600 // before this frame is rendered. But we can't guarantee that the collab panel's future
601 // will resolve before this flush_effects finishes. Synchronously emitting this event
602 // ensures that the collab panel will observe this creation before the frame complete
603 cx.emit(ChannelEvent::ChannelRenamed(channel_id))
604 });
605 Ok(())
606 })
607 }
608
609 pub fn respond_to_channel_invite(
610 &mut self,
611 channel_id: ChannelId,
612 accept: bool,
613 ) -> impl Future<Output = Result<()>> {
614 let client = self.client.clone();
615 async move {
616 client
617 .request(proto::RespondToChannelInvite { channel_id, accept })
618 .await?;
619 Ok(())
620 }
621 }
622
623 pub fn get_channel_member_details(
624 &self,
625 channel_id: ChannelId,
626 cx: &mut ModelContext<Self>,
627 ) -> Task<Result<Vec<ChannelMembership>>> {
628 let client = self.client.clone();
629 let user_store = self.user_store.downgrade();
630 cx.spawn(|_, mut cx| async move {
631 let response = client
632 .request(proto::GetChannelMembers { channel_id })
633 .await?;
634
635 let user_ids = response.members.iter().map(|m| m.user_id).collect();
636 let user_store = user_store
637 .upgrade(&cx)
638 .ok_or_else(|| anyhow!("user store dropped"))?;
639 let users = user_store
640 .update(&mut cx, |user_store, cx| user_store.get_users(user_ids, cx))
641 .await?;
642
643 Ok(users
644 .into_iter()
645 .zip(response.members)
646 .filter_map(|(user, member)| {
647 Some(ChannelMembership {
648 user,
649 admin: member.admin,
650 kind: proto::channel_member::Kind::from_i32(member.kind)?,
651 })
652 })
653 .collect())
654 })
655 }
656
657 pub fn remove_channel(&self, channel_id: ChannelId) -> impl Future<Output = Result<()>> {
658 let client = self.client.clone();
659 async move {
660 client.request(proto::DeleteChannel { channel_id }).await?;
661 Ok(())
662 }
663 }
664
/// Always returns `false`; the channel argument is ignored.
// NOTE(review): looks like a placeholder, since responses to invitations
// are not tracked anywhere in this store.
pub fn has_pending_channel_invite_response(&self, _: &Arc<Channel>) -> bool {
    false
}
668
669 pub fn has_pending_channel_invite(&self, channel_id: ChannelId, user_id: UserId) -> bool {
670 self.outgoing_invites.contains(&(channel_id, user_id))
671 }
672
673 async fn handle_update_channels(
674 this: ModelHandle<Self>,
675 message: TypedEnvelope<proto::UpdateChannels>,
676 _: Arc<Client>,
677 mut cx: AsyncAppContext,
678 ) -> Result<()> {
679 this.update(&mut cx, |this, _| {
680 this.update_channels_tx
681 .unbounded_send(message.payload)
682 .unwrap();
683 });
684 Ok(())
685 }
686
/// Called when the client reports a connected status.
///
/// Takes out the pending delayed buffer disconnect, asks every open chat
/// to rejoin, and, if any channel buffer is open, sends a
/// `RejoinChannelBuffers` request with the local buffer versions. The
/// returned task applies the response: buffers the server answered for are
/// brought up to date and kept, all other open buffers are disconnected
/// and dropped from `opened_buffers`.
fn handle_connect(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
    // Takes the delayed disconnect started by `handle_disconnect` out of
    // the store.
    self.disconnect_channel_buffers_task.take();

    for chat in self.opened_chats.values() {
        if let OpenedModelHandle::Open(chat) = chat {
            if let Some(chat) = chat.upgrade(cx) {
                chat.update(cx, |chat, cx| {
                    chat.rejoin(cx);
                });
            }
        }
    }

    // Describe every buffer that is open and alive to the server.
    let mut buffer_versions = Vec::new();
    for buffer in self.opened_buffers.values() {
        if let OpenedModelHandle::Open(buffer) = buffer {
            if let Some(buffer) = buffer.upgrade(cx) {
                let channel_buffer = buffer.read(cx);
                let buffer = channel_buffer.buffer().read(cx);
                buffer_versions.push(proto::ChannelBufferVersion {
                    channel_id: channel_buffer.channel().id,
                    epoch: channel_buffer.epoch(),
                    version: language::proto::serialize_version(&buffer.version()),
                });
            }
        }
    }

    // Nothing to rejoin.
    if buffer_versions.is_empty() {
        return Task::ready(Ok(()));
    }

    let response = self.client.request(proto::RejoinChannelBuffers {
        buffers: buffer_versions,
    });

    cx.spawn(|this, mut cx| async move {
        let mut response = response.await?;

        this.update(&mut cx, |this, cx| {
            // The closure's return value decides whether the entry stays in
            // the map.
            this.opened_buffers.retain(|_, buffer| match buffer {
                OpenedModelHandle::Open(channel_buffer) => {
                    // Drop entries whose model no longer exists.
                    let Some(channel_buffer) = channel_buffer.upgrade(cx) else {
                        return false;
                    };

                    channel_buffer.update(cx, |channel_buffer, cx| {
                        let channel_id = channel_buffer.channel().id;
                        if let Some(remote_buffer) = response
                            .buffers
                            .iter_mut()
                            .find(|buffer| buffer.channel_id == channel_id)
                        {
                            let channel_id = channel_buffer.channel().id;
                            let remote_version =
                                language::proto::deserialize_version(&remote_buffer.version);

                            channel_buffer.replace_collaborators(
                                mem::take(&mut remote_buffer.collaborators),
                                cx,
                            );

                            // Serialize local operations relative to the
                            // remote version and apply the operations the
                            // server sent; any failure is logged and yields
                            // `None`.
                            let operations = channel_buffer
                                .buffer()
                                .update(cx, |buffer, cx| {
                                    let outgoing_operations =
                                        buffer.serialize_ops(Some(remote_version), cx);
                                    let incoming_operations =
                                        mem::take(&mut remote_buffer.operations)
                                            .into_iter()
                                            .map(language::proto::deserialize_operation)
                                            .collect::<Result<Vec<_>>>()?;
                                    buffer.apply_ops(incoming_operations, cx)?;
                                    anyhow::Ok(outgoing_operations)
                                })
                                .log_err();

                            if let Some(operations) = operations {
                                // Send the local operations in chunks from a
                                // background task; send errors are ignored.
                                let client = this.client.clone();
                                cx.background()
                                    .spawn(async move {
                                        let operations = operations.await;
                                        for chunk in
                                            language::proto::split_operations(operations)
                                        {
                                            client
                                                .send(proto::UpdateChannelBuffer {
                                                    channel_id,
                                                    operations: chunk,
                                                })
                                                .ok();
                                        }
                                    })
                                    .detach();
                                return true;
                            }
                        }

                        // Either the server did not answer for this buffer
                        // or applying its operations failed.
                        channel_buffer.disconnect(cx);
                        false
                    })
                }
                // Buffers that are still loading are left untouched.
                OpenedModelHandle::Loading(_) => true,
            });
        });
        anyhow::Ok(())
    })
}
795
796 fn handle_disconnect(&mut self, cx: &mut ModelContext<Self>) {
797 self.channel_index.clear();
798 self.channel_invitations.clear();
799 self.channel_participants.clear();
800 self.channels_with_admin_privileges.clear();
801 self.channel_index.clear();
802 self.outgoing_invites.clear();
803 cx.notify();
804
805 self.disconnect_channel_buffers_task.get_or_insert_with(|| {
806 cx.spawn_weak(|this, mut cx| async move {
807 cx.background().timer(RECONNECT_TIMEOUT).await;
808 if let Some(this) = this.upgrade(&cx) {
809 this.update(&mut cx, |this, cx| {
810 for (_, buffer) in this.opened_buffers.drain() {
811 if let OpenedModelHandle::Open(buffer) = buffer {
812 if let Some(buffer) = buffer.upgrade(cx) {
813 buffer.update(cx, |buffer, cx| buffer.disconnect(cx));
814 }
815 }
816 }
817 });
818 }
819 })
820 });
821 }
822
/// Applies an `UpdateChannels` payload to the store.
///
/// Invitations, channels, edges, unseen markers and permissions are
/// applied synchronously and observers are notified. When the payload also
/// carries channel participants, a task is returned that resolves the
/// participant user ids through the user store and records them; otherwise
/// `None` is returned.
pub(crate) fn update_channels(
    &mut self,
    payload: proto::UpdateChannels,
    cx: &mut ModelContext<ChannelStore>,
) -> Option<Task<Result<()>>> {
    if !payload.remove_channel_invitations.is_empty() {
        self.channel_invitations
            .retain(|channel| !payload.remove_channel_invitations.contains(&channel.id));
    }
    // Insert or rename invitations, keeping the list sorted by channel id.
    for channel in payload.channel_invitations {
        match self
            .channel_invitations
            .binary_search_by_key(&channel.id, |c| c.id)
        {
            Ok(ix) => Arc::make_mut(&mut self.channel_invitations[ix]).name = channel.name,
            Err(ix) => self.channel_invitations.insert(
                ix,
                Arc::new(Channel {
                    id: channel.id,
                    name: channel.name,
                    unseen_note_version: None,
                    unseen_message_id: None,
                }),
            ),
        }
    }

    let channels_changed = !payload.channels.is_empty()
        || !payload.delete_channels.is_empty()
        || !payload.insert_edge.is_empty()
        || !payload.delete_edge.is_empty()
        || !payload.unseen_channel_messages.is_empty()
        || !payload.unseen_channel_buffer_changes.is_empty();

    if channels_changed {
        if !payload.delete_channels.is_empty() {
            self.channel_index.delete_channels(&payload.delete_channels);
            self.channel_participants
                .retain(|channel_id, _| !payload.delete_channels.contains(channel_id));
            self.channels_with_admin_privileges
                .retain(|channel_id| !payload.delete_channels.contains(channel_id));

            // Buffers of deleted channels are forgotten; the ones that are
            // open and alive are disconnected as well.
            for channel_id in &payload.delete_channels {
                let channel_id = *channel_id;
                if let Some(OpenedModelHandle::Open(buffer)) =
                    self.opened_buffers.remove(&channel_id)
                {
                    if let Some(buffer) = buffer.upgrade(cx) {
                        buffer.update(cx, ChannelBuffer::disconnect);
                    }
                }
            }
        }

        // All remaining index changes go through one bulk-insert handle.
        let mut index = self.channel_index.bulk_insert();
        for channel in payload.channels {
            index.insert(channel)
        }

        for unseen_buffer_change in payload.unseen_channel_buffer_changes {
            let version = language::proto::deserialize_version(&unseen_buffer_change.version);
            index.note_changed(
                unseen_buffer_change.channel_id,
                unseen_buffer_change.epoch,
                &version,
            );
        }

        for unseen_channel_message in payload.unseen_channel_messages {
            index.new_messages(
                unseen_channel_message.channel_id,
                unseen_channel_message.message_id,
            );
        }

        // NOTE(review): `insert_edge` is called with (channel, parent) while
        // `delete_edge` is called with (parent, channel); confirm that this
        // matches the parameter order of both methods.
        for edge in payload.insert_edge {
            index.insert_edge(edge.channel_id, edge.parent_id);
        }

        for edge in payload.delete_edge {
            index.delete_edge(edge.parent_id, edge.channel_id);
        }
    }

    for permission in payload.channel_permissions {
        if permission.is_admin {
            self.channels_with_admin_privileges
                .insert(permission.channel_id);
        } else {
            self.channels_with_admin_privileges
                .remove(&permission.channel_id);
        }
    }

    cx.notify();
    if payload.channel_participants.is_empty() {
        return None;
    }

    // Collect the participant ids of all channels, sorted and without
    // duplicates.
    let mut all_user_ids = Vec::new();
    let channel_participants = payload.channel_participants;
    for entry in &channel_participants {
        for user_id in entry.participant_user_ids.iter() {
            if let Err(ix) = all_user_ids.binary_search(user_id) {
                all_user_ids.insert(ix, *user_id);
            }
        }
    }

    let users = self
        .user_store
        .update(cx, |user_store, cx| user_store.get_users(all_user_ids, cx));
    Some(cx.spawn(|this, mut cx| async move {
        let users = users.await?;

        this.update(&mut cx, |this, cx| {
            for entry in &channel_participants {
                // NOTE(review): the binary search assumes `users` is sorted
                // by id, presumably because the requested ids were sorted;
                // confirm against `UserStore::get_users`. Ids that are not
                // found are skipped.
                let mut participants: Vec<_> = entry
                    .participant_user_ids
                    .iter()
                    .filter_map(|user_id| {
                        users
                            .binary_search_by_key(&user_id, |user| &user.id)
                            .ok()
                            .map(|ix| users[ix].clone())
                    })
                    .collect();

                participants.sort_by_key(|u| u.id);

                this.channel_participants
                    .insert(entry.channel_id, participants);
            }

            cx.notify();
        });
        anyhow::Ok(())
    }))
}
962}
963
964impl Deref for ChannelPath {
965 type Target = [ChannelId];
966
967 fn deref(&self) -> &Self::Target {
968 &self.0
969 }
970}
971
972impl ChannelPath {
973 pub fn new(path: Arc<[ChannelId]>) -> Self {
974 debug_assert!(path.len() >= 1);
975 Self(path)
976 }
977
978 pub fn parent_id(&self) -> Option<ChannelId> {
979 self.0.len().checked_sub(2).map(|i| self.0[i])
980 }
981
982 pub fn channel_id(&self) -> ChannelId {
983 self.0[self.0.len() - 1]
984 }
985}
986
987impl From<ChannelPath> for Cow<'static, ChannelPath> {
988 fn from(value: ChannelPath) -> Self {
989 Cow::Owned(value)
990 }
991}
992
993impl<'a> From<&'a ChannelPath> for Cow<'a, ChannelPath> {
994 fn from(value: &'a ChannelPath) -> Self {
995 Cow::Borrowed(value)
996 }
997}
998
999impl Default for ChannelPath {
1000 fn default() -> Self {
1001 ChannelPath(Arc::from([]))
1002 }
1003}