1mod channel_index;
2
3use crate::{channel_buffer::ChannelBuffer, channel_chat::ChannelChat};
4use anyhow::{anyhow, Result};
5use client::{Client, Subscription, User, UserId, UserStore};
6use collections::{hash_map, HashMap, HashSet};
7use futures::{channel::mpsc, future::Shared, Future, FutureExt, StreamExt};
8use gpui::{AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Task, WeakModelHandle};
9use rpc::{
10 proto::{self, ChannelEdge, ChannelPermission},
11 TypedEnvelope,
12};
13use serde_derive::{Deserialize, Serialize};
14use std::{borrow::Cow, hash::Hash, mem, ops::Deref, sync::Arc, time::Duration};
15use util::ResultExt;
16
17use self::channel_index::ChannelIndex;
18
/// How long `ChannelStore::handle_disconnect` waits after a disconnect before
/// it disconnects the channel buffers that are still open.
pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);

/// Numeric identifier of a channel.
pub type ChannelId = u64;
22
/// Model holding the channel state that arrives over RPC as
/// `proto::UpdateChannels` messages, plus the channel buffers and chats that
/// have been opened through it.
pub struct ChannelStore {
    /// Known channels, addressable by id (`by_id`) and iterable as paths.
    channel_index: ChannelIndex,
    /// Pending channel invitations; `update_channels` inserts by binary search
    /// on the channel id, so the list stays sorted by id.
    channel_invitations: Vec<Arc<Channel>>,
    /// Participants per channel, stored sorted by user id.
    channel_participants: HashMap<ChannelId, Vec<Arc<User>>>,
    /// Channels for which a permission with `is_admin == true` was received.
    channels_with_admin_privileges: HashSet<ChannelId>,
    /// `(channel, user)` pairs that currently have an invite, remove-member or
    /// set-admin request in flight.
    outgoing_invites: HashSet<(ChannelId, UserId)>,
    /// Sending half of the queue drained by the `_update_channels` task.
    update_channels_tx: mpsc::UnboundedSender<proto::UpdateChannels>,
    /// Channel buffers that are open or currently being opened.
    opened_buffers: HashMap<ChannelId, OpenedModelHandle<ChannelBuffer>>,
    /// Channel chats that are open or currently being opened.
    opened_chats: HashMap<ChannelId, OpenedModelHandle<ChannelChat>>,
    /// RPC client used for every request issued by the store.
    client: Arc<Client>,
    /// Store used to resolve user ids into `User` values.
    user_store: ModelHandle<UserStore>,
    /// Keeps the `handle_update_channels` message handler registered.
    _rpc_subscription: Subscription,
    /// Task that calls `handle_connect` / `handle_disconnect` on status changes.
    _watch_connection_status: Task<Option<()>>,
    /// Set by `handle_disconnect`; after `RECONNECT_TIMEOUT` it disconnects all
    /// open buffers. Cleared again by `handle_connect`.
    disconnect_channel_buffers_task: Option<Task<()>>,
    /// Task that applies queued `UpdateChannels` payloads one after another.
    _update_channels: Task<()>,
}
39
/// A channel paired with a [`ChannelPath`].
// NOTE(review): presumably the path is one route to that channel in the DAG — confirm against callers.
pub type ChannelData = (Channel, ChannelPath);
41
/// A channel as tracked by the store: its id and its display name.
#[derive(Clone, Debug, PartialEq)]
pub struct Channel {
    /// Identifier of the channel.
    pub id: ChannelId,
    /// Current name of the channel.
    pub name: String,
}
47
/// A sequence of channel ids. The last id is the channel the path points at
/// (`channel_id`), and the id before it is that channel's parent (`parent_id`).
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Serialize, Deserialize)]
pub struct ChannelPath(Arc<[ChannelId]>);
50
/// One member of a channel, as returned by
/// `ChannelStore::get_channel_member_details`.
pub struct ChannelMembership {
    /// The resolved user.
    pub user: Arc<User>,
    /// Membership kind decoded from the protobuf `kind` field.
    pub kind: proto::channel_member::Kind,
    /// Whether the member is flagged as admin in the response.
    pub admin: bool,
}
56
/// Events emitted by [`ChannelStore`].
pub enum ChannelEvent {
    /// Emitted by `create_channel` once the new channel has been applied locally.
    ChannelCreated(ChannelId),
    /// Emitted by `rename` once the renamed channel has been applied locally.
    ChannelRenamed(ChannelId),
}
61
impl Entity for ChannelStore {
    /// The store reports channel creation and renames through `ChannelEvent`.
    type Event = ChannelEvent;
}
65
/// State of a per-channel resource (buffer or chat) tracked by the store.
enum OpenedModelHandle<E: Entity> {
    /// The resource finished loading; only a weak handle is kept, so the entry
    /// may refer to a model that has since been dropped.
    Open(WeakModelHandle<E>),
    /// The resource is still loading; the shared task lets several callers
    /// await the same load.
    Loading(Shared<Task<Result<ModelHandle<E>, Arc<anyhow::Error>>>>),
}
70
71impl ChannelStore {
72 pub fn new(
73 client: Arc<Client>,
74 user_store: ModelHandle<UserStore>,
75 cx: &mut ModelContext<Self>,
76 ) -> Self {
77 let rpc_subscription =
78 client.add_message_handler(cx.handle(), Self::handle_update_channels);
79
80 let mut connection_status = client.status();
81 let (update_channels_tx, mut update_channels_rx) = mpsc::unbounded();
82 let watch_connection_status = cx.spawn_weak(|this, mut cx| async move {
83 while let Some(status) = connection_status.next().await {
84 let this = this.upgrade(&cx)?;
85 if status.is_connected() {
86 this.update(&mut cx, |this, cx| this.handle_connect(cx))
87 .await
88 .log_err()?;
89 } else {
90 this.update(&mut cx, |this, cx| this.handle_disconnect(cx));
91 }
92 }
93 Some(())
94 });
95
96 Self {
97 channel_invitations: Vec::default(),
98 channel_index: ChannelIndex::default(),
99 channel_participants: Default::default(),
100 channels_with_admin_privileges: Default::default(),
101 outgoing_invites: Default::default(),
102 opened_buffers: Default::default(),
103 opened_chats: Default::default(),
104 update_channels_tx,
105 client,
106 user_store,
107 _rpc_subscription: rpc_subscription,
108 _watch_connection_status: watch_connection_status,
109 disconnect_channel_buffers_task: None,
110 _update_channels: cx.spawn_weak(|this, mut cx| async move {
111 while let Some(update_channels) = update_channels_rx.next().await {
112 if let Some(this) = this.upgrade(&cx) {
113 let update_task = this.update(&mut cx, |this, cx| {
114 this.update_channels(update_channels, cx)
115 });
116 if let Some(update_task) = update_task {
117 update_task.await.log_err();
118 }
119 }
120 }
121 }),
122 }
123 }
124
125 pub fn client(&self) -> Arc<Client> {
126 self.client.clone()
127 }
128
129 pub fn has_children(&self, channel_id: ChannelId) -> bool {
130 self.channel_index.iter().any(|path| {
131 if let Some(ix) = path.iter().position(|id| *id == channel_id) {
132 path.len() > ix + 1
133 } else {
134 false
135 }
136 })
137 }
138
139 /// Returns the number of unique channels in the store
140 pub fn channel_count(&self) -> usize {
141 self.channel_index.by_id().len()
142 }
143
144 /// Returns the index of a channel ID in the list of unique channels
145 pub fn index_of_channel(&self, channel_id: ChannelId) -> Option<usize> {
146 self.channel_index
147 .by_id()
148 .keys()
149 .position(|id| *id == channel_id)
150 }
151
152 /// Returns an iterator over all unique channels
153 pub fn channels(&self) -> impl '_ + Iterator<Item = &Arc<Channel>> {
154 self.channel_index.by_id().values()
155 }
156
157 /// Iterate over all entries in the channel DAG
158 pub fn channel_dag_entries(&self) -> impl '_ + Iterator<Item = (usize, &Arc<Channel>)> {
159 self.channel_index.iter().map(move |path| {
160 let id = path.last().unwrap();
161 let channel = self.channel_for_id(*id).unwrap();
162 (path.len() - 1, channel)
163 })
164 }
165
166 pub fn channel_dag_entry_at(&self, ix: usize) -> Option<(&Arc<Channel>, &ChannelPath)> {
167 let path = self.channel_index.get(ix)?;
168 let id = path.last().unwrap();
169 let channel = self.channel_for_id(*id).unwrap();
170
171 Some((channel, path))
172 }
173
174 pub fn channel_at(&self, ix: usize) -> Option<&Arc<Channel>> {
175 self.channel_index.by_id().values().nth(ix)
176 }
177
178 pub fn channel_invitations(&self) -> &[Arc<Channel>] {
179 &self.channel_invitations
180 }
181
182 pub fn channel_for_id(&self, channel_id: ChannelId) -> Option<&Arc<Channel>> {
183 self.channel_index.by_id().get(&channel_id)
184 }
185
186 pub fn has_open_channel_buffer(&self, channel_id: ChannelId, cx: &AppContext) -> bool {
187 if let Some(buffer) = self.opened_buffers.get(&channel_id) {
188 if let OpenedModelHandle::Open(buffer) = buffer {
189 return buffer.upgrade(cx).is_some();
190 }
191 }
192 false
193 }
194
195 pub fn open_channel_buffer(
196 &mut self,
197 channel_id: ChannelId,
198 cx: &mut ModelContext<Self>,
199 ) -> Task<Result<ModelHandle<ChannelBuffer>>> {
200 let client = self.client.clone();
201 self.open_channel_resource(
202 channel_id,
203 |this| &mut this.opened_buffers,
204 |channel, cx| ChannelBuffer::new(channel, client, cx),
205 cx,
206 )
207 }
208
209 pub fn open_channel_chat(
210 &mut self,
211 channel_id: ChannelId,
212 cx: &mut ModelContext<Self>,
213 ) -> Task<Result<ModelHandle<ChannelChat>>> {
214 let client = self.client.clone();
215 let user_store = self.user_store.clone();
216 self.open_channel_resource(
217 channel_id,
218 |this| &mut this.opened_chats,
219 |channel, cx| ChannelChat::new(channel, user_store, client, cx),
220 cx,
221 )
222 }
223
    /// Asynchronously open a given resource associated with a channel.
    ///
    /// Make sure that the resource is only opened once, even if this method
    /// is called multiple times with the same channel id while the first task
    /// is still running.
    ///
    /// * `get_map` selects the map on `Self` that tracks this kind of resource.
    /// * `load` performs the actual open; it is invoked at most once, and only
    ///   when no live or loading entry exists for `channel_id`.
    ///
    /// The returned task fails if the channel id is unknown or if `load` fails.
    fn open_channel_resource<T: Entity, F, Fut>(
        &mut self,
        channel_id: ChannelId,
        get_map: fn(&mut Self) -> &mut HashMap<ChannelId, OpenedModelHandle<T>>,
        load: F,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<ModelHandle<T>>>
    where
        F: 'static + FnOnce(Arc<Channel>, AsyncAppContext) -> Fut,
        Fut: Future<Output = Result<ModelHandle<T>>>,
    {
        // Loop so that a stale `Open` entry can be removed and the lookup retried.
        let task = loop {
            match get_map(self).entry(channel_id) {
                hash_map::Entry::Occupied(e) => match e.get() {
                    OpenedModelHandle::Open(model) => {
                        if let Some(model) = model.upgrade(cx) {
                            // Already open and still alive: hand it out immediately.
                            break Task::ready(Ok(model)).shared();
                        } else {
                            // The model was dropped; forget the entry and take the
                            // vacant path on the next iteration.
                            get_map(self).remove(&channel_id);
                            continue;
                        }
                    }
                    OpenedModelHandle::Loading(task) => {
                        // A load is in flight; share its result.
                        break task.clone();
                    }
                },
                hash_map::Entry::Vacant(e) => {
                    let task = cx
                        .spawn(|this, cx| async move {
                            let channel = this.read_with(&cx, |this, _| {
                                this.channel_for_id(channel_id).cloned().ok_or_else(|| {
                                    Arc::new(anyhow!("no channel for id: {}", channel_id))
                                })
                            })?;

                            // Errors are wrapped in `Arc` so the shared task's output is cloneable.
                            load(channel, cx).await.map_err(Arc::new)
                        })
                        .shared();

                    e.insert(OpenedModelHandle::Loading(task.clone()));
                    // Detached follow-up: once loading finishes, replace the `Loading`
                    // entry with a weak `Open` handle, or drop the entry on failure.
                    cx.spawn({
                        let task = task.clone();
                        |this, mut cx| async move {
                            let result = task.await;
                            this.update(&mut cx, |this, _| match result {
                                Ok(model) => {
                                    get_map(this).insert(
                                        channel_id,
                                        OpenedModelHandle::Open(model.downgrade()),
                                    );
                                }
                                Err(_) => {
                                    get_map(this).remove(&channel_id);
                                }
                            });
                        }
                    })
                    .detach();
                    break task;
                }
            }
        };
        // Convert the shared `Arc<anyhow::Error>` back into a plain `anyhow::Error`.
        cx.foreground()
            .spawn(async move { task.await.map_err(|error| anyhow!("{}", error)) })
    }
294
295 pub fn is_user_admin(&self, channel_id: ChannelId) -> bool {
296 self.channel_index.iter().any(|path| {
297 if let Some(ix) = path.iter().position(|id| *id == channel_id) {
298 path[..=ix]
299 .iter()
300 .any(|id| self.channels_with_admin_privileges.contains(id))
301 } else {
302 false
303 }
304 })
305 }
306
307 pub fn channel_participants(&self, channel_id: ChannelId) -> &[Arc<User>] {
308 self.channel_participants
309 .get(&channel_id)
310 .map_or(&[], |v| v.as_slice())
311 }
312
313 pub fn create_channel(
314 &self,
315 name: &str,
316 parent_id: Option<ChannelId>,
317 cx: &mut ModelContext<Self>,
318 ) -> Task<Result<ChannelId>> {
319 let client = self.client.clone();
320 let name = name.trim_start_matches("#").to_owned();
321 cx.spawn(|this, mut cx| async move {
322 let response = client
323 .request(proto::CreateChannel { name, parent_id })
324 .await?;
325
326 let channel = response
327 .channel
328 .ok_or_else(|| anyhow!("missing channel in response"))?;
329 let channel_id = channel.id;
330
331 let parent_edge = if let Some(parent_id) = parent_id {
332 vec![ChannelEdge {
333 channel_id: channel.id,
334 parent_id,
335 }]
336 } else {
337 vec![]
338 };
339
340 this.update(&mut cx, |this, cx| {
341 let task = this.update_channels(
342 proto::UpdateChannels {
343 channels: vec![channel],
344 insert_edge: parent_edge,
345 channel_permissions: vec![ChannelPermission {
346 channel_id,
347 is_admin: true,
348 }],
349 ..Default::default()
350 },
351 cx,
352 );
353 assert!(task.is_none());
354
355 // This event is emitted because the collab panel wants to clear the pending edit state
356 // before this frame is rendered. But we can't guarantee that the collab panel's future
357 // will resolve before this flush_effects finishes. Synchronously emitting this event
358 // ensures that the collab panel will observe this creation before the frame completes
359 cx.emit(ChannelEvent::ChannelCreated(channel_id));
360 });
361
362 Ok(channel_id)
363 })
364 }
365
366 pub fn link_channel(
367 &mut self,
368 channel_id: ChannelId,
369 to: ChannelId,
370 cx: &mut ModelContext<Self>,
371 ) -> Task<Result<()>> {
372 let client = self.client.clone();
373 cx.spawn(|_, _| async move {
374 let _ = client
375 .request(proto::LinkChannel { channel_id, to })
376 .await?;
377
378 Ok(())
379 })
380 }
381
382 pub fn unlink_channel(
383 &mut self,
384 channel_id: ChannelId,
385 from: ChannelId,
386 cx: &mut ModelContext<Self>,
387 ) -> Task<Result<()>> {
388 let client = self.client.clone();
389 cx.spawn(|_, _| async move {
390 let _ = client
391 .request(proto::UnlinkChannel { channel_id, from })
392 .await?;
393
394 Ok(())
395 })
396 }
397
398 pub fn move_channel(
399 &mut self,
400 channel_id: ChannelId,
401 from: ChannelId,
402 to: ChannelId,
403 cx: &mut ModelContext<Self>,
404 ) -> Task<Result<()>> {
405 let client = self.client.clone();
406 cx.spawn(|_, _| async move {
407 let _ = client
408 .request(proto::MoveChannel {
409 channel_id,
410 from,
411 to,
412 })
413 .await?;
414
415 Ok(())
416 })
417 }
418
419 pub fn invite_member(
420 &mut self,
421 channel_id: ChannelId,
422 user_id: UserId,
423 admin: bool,
424 cx: &mut ModelContext<Self>,
425 ) -> Task<Result<()>> {
426 if !self.outgoing_invites.insert((channel_id, user_id)) {
427 return Task::ready(Err(anyhow!("invite request already in progress")));
428 }
429
430 cx.notify();
431 let client = self.client.clone();
432 cx.spawn(|this, mut cx| async move {
433 let result = client
434 .request(proto::InviteChannelMember {
435 channel_id,
436 user_id,
437 admin,
438 })
439 .await;
440
441 this.update(&mut cx, |this, cx| {
442 this.outgoing_invites.remove(&(channel_id, user_id));
443 cx.notify();
444 });
445
446 result?;
447
448 Ok(())
449 })
450 }
451
452 pub fn remove_member(
453 &mut self,
454 channel_id: ChannelId,
455 user_id: u64,
456 cx: &mut ModelContext<Self>,
457 ) -> Task<Result<()>> {
458 if !self.outgoing_invites.insert((channel_id, user_id)) {
459 return Task::ready(Err(anyhow!("invite request already in progress")));
460 }
461
462 cx.notify();
463 let client = self.client.clone();
464 cx.spawn(|this, mut cx| async move {
465 let result = client
466 .request(proto::RemoveChannelMember {
467 channel_id,
468 user_id,
469 })
470 .await;
471
472 this.update(&mut cx, |this, cx| {
473 this.outgoing_invites.remove(&(channel_id, user_id));
474 cx.notify();
475 });
476 result?;
477 Ok(())
478 })
479 }
480
481 pub fn set_member_admin(
482 &mut self,
483 channel_id: ChannelId,
484 user_id: UserId,
485 admin: bool,
486 cx: &mut ModelContext<Self>,
487 ) -> Task<Result<()>> {
488 if !self.outgoing_invites.insert((channel_id, user_id)) {
489 return Task::ready(Err(anyhow!("member request already in progress")));
490 }
491
492 cx.notify();
493 let client = self.client.clone();
494 cx.spawn(|this, mut cx| async move {
495 let result = client
496 .request(proto::SetChannelMemberAdmin {
497 channel_id,
498 user_id,
499 admin,
500 })
501 .await;
502
503 this.update(&mut cx, |this, cx| {
504 this.outgoing_invites.remove(&(channel_id, user_id));
505 cx.notify();
506 });
507
508 result?;
509 Ok(())
510 })
511 }
512
513 pub fn rename(
514 &mut self,
515 channel_id: ChannelId,
516 new_name: &str,
517 cx: &mut ModelContext<Self>,
518 ) -> Task<Result<()>> {
519 let client = self.client.clone();
520 let name = new_name.to_string();
521 cx.spawn(|this, mut cx| async move {
522 let channel = client
523 .request(proto::RenameChannel { channel_id, name })
524 .await?
525 .channel
526 .ok_or_else(|| anyhow!("missing channel in response"))?;
527 this.update(&mut cx, |this, cx| {
528 let task = this.update_channels(
529 proto::UpdateChannels {
530 channels: vec![channel],
531 ..Default::default()
532 },
533 cx,
534 );
535 assert!(task.is_none());
536
537 // This event is emitted because the collab panel wants to clear the pending edit state
538 // before this frame is rendered. But we can't guarantee that the collab panel's future
539 // will resolve before this flush_effects finishes. Synchronously emitting this event
540 // ensures that the collab panel will observe this creation before the frame complete
541 cx.emit(ChannelEvent::ChannelRenamed(channel_id))
542 });
543 Ok(())
544 })
545 }
546
547 pub fn respond_to_channel_invite(
548 &mut self,
549 channel_id: ChannelId,
550 accept: bool,
551 ) -> impl Future<Output = Result<()>> {
552 let client = self.client.clone();
553 async move {
554 client
555 .request(proto::RespondToChannelInvite { channel_id, accept })
556 .await?;
557 Ok(())
558 }
559 }
560
561 pub fn get_channel_member_details(
562 &self,
563 channel_id: ChannelId,
564 cx: &mut ModelContext<Self>,
565 ) -> Task<Result<Vec<ChannelMembership>>> {
566 let client = self.client.clone();
567 let user_store = self.user_store.downgrade();
568 cx.spawn(|_, mut cx| async move {
569 let response = client
570 .request(proto::GetChannelMembers { channel_id })
571 .await?;
572
573 let user_ids = response.members.iter().map(|m| m.user_id).collect();
574 let user_store = user_store
575 .upgrade(&cx)
576 .ok_or_else(|| anyhow!("user store dropped"))?;
577 let users = user_store
578 .update(&mut cx, |user_store, cx| user_store.get_users(user_ids, cx))
579 .await?;
580
581 Ok(users
582 .into_iter()
583 .zip(response.members)
584 .filter_map(|(user, member)| {
585 Some(ChannelMembership {
586 user,
587 admin: member.admin,
588 kind: proto::channel_member::Kind::from_i32(member.kind)?,
589 })
590 })
591 .collect())
592 })
593 }
594
595 pub fn remove_channel(&self, channel_id: ChannelId) -> impl Future<Output = Result<()>> {
596 let client = self.client.clone();
597 async move {
598 client.request(proto::DeleteChannel { channel_id }).await?;
599 Ok(())
600 }
601 }
602
603 pub fn has_pending_channel_invite_response(&self, _: &Arc<Channel>) -> bool {
604 false
605 }
606
607 pub fn has_pending_channel_invite(&self, channel_id: ChannelId, user_id: UserId) -> bool {
608 self.outgoing_invites.contains(&(channel_id, user_id))
609 }
610
611 async fn handle_update_channels(
612 this: ModelHandle<Self>,
613 message: TypedEnvelope<proto::UpdateChannels>,
614 _: Arc<Client>,
615 mut cx: AsyncAppContext,
616 ) -> Result<()> {
617 this.update(&mut cx, |this, _| {
618 this.update_channels_tx
619 .unbounded_send(message.payload)
620 .unwrap();
621 });
622 Ok(())
623 }
624
    /// Called when the client reports a connected status.
    ///
    /// Rejoins every open chat, then asks the server to rejoin every open
    /// channel buffer and reconciles each buffer with the server's reply.
    /// Buffers that are missing from the reply, or whose operations fail to
    /// apply, are disconnected and dropped from `opened_buffers`.
    fn handle_connect(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
        // Forget the delayed disconnect scheduled by `handle_disconnect`
        // (presumably dropping the task cancels it — TODO confirm).
        self.disconnect_channel_buffers_task.take();

        for chat in self.opened_chats.values() {
            if let OpenedModelHandle::Open(chat) = chat {
                if let Some(chat) = chat.upgrade(cx) {
                    chat.update(cx, |chat, cx| {
                        chat.rejoin(cx);
                    });
                }
            }
        }

        // Collect the local version of every buffer that is open and still alive.
        let mut buffer_versions = Vec::new();
        for buffer in self.opened_buffers.values() {
            if let OpenedModelHandle::Open(buffer) = buffer {
                if let Some(buffer) = buffer.upgrade(cx) {
                    let channel_buffer = buffer.read(cx);
                    let buffer = channel_buffer.buffer().read(cx);
                    buffer_versions.push(proto::ChannelBufferVersion {
                        channel_id: channel_buffer.channel().id,
                        epoch: channel_buffer.epoch(),
                        version: language::proto::serialize_version(&buffer.version()),
                    });
                }
            }
        }

        // Nothing to rejoin.
        if buffer_versions.is_empty() {
            return Task::ready(Ok(()));
        }

        let response = self.client.request(proto::RejoinChannelBuffers {
            buffers: buffer_versions,
        });

        cx.spawn(|this, mut cx| async move {
            let mut response = response.await?;

            this.update(&mut cx, |this, cx| {
                // The closure's return value decides whether the entry is kept.
                this.opened_buffers.retain(|_, buffer| match buffer {
                    OpenedModelHandle::Open(channel_buffer) => {
                        let Some(channel_buffer) = channel_buffer.upgrade(cx) else {
                            return false;
                        };

                        channel_buffer.update(cx, |channel_buffer, cx| {
                            let channel_id = channel_buffer.channel().id;
                            if let Some(remote_buffer) = response
                                .buffers
                                .iter_mut()
                                .find(|buffer| buffer.channel_id == channel_id)
                            {
                                let channel_id = channel_buffer.channel().id;
                                let remote_version =
                                    language::proto::deserialize_version(&remote_buffer.version);

                                channel_buffer.replace_collaborators(
                                    mem::take(&mut remote_buffer.collaborators),
                                    cx,
                                );

                                // Serialize the local operations relative to the remote
                                // version, then apply the operations sent by the server.
                                let operations = channel_buffer
                                    .buffer()
                                    .update(cx, |buffer, cx| {
                                        let outgoing_operations =
                                            buffer.serialize_ops(Some(remote_version), cx);
                                        let incoming_operations =
                                            mem::take(&mut remote_buffer.operations)
                                                .into_iter()
                                                .map(language::proto::deserialize_operation)
                                                .collect::<Result<Vec<_>>>()?;
                                        buffer.apply_ops(incoming_operations, cx)?;
                                        anyhow::Ok(outgoing_operations)
                                    })
                                    .log_err();

                                if let Some(operations) = operations {
                                    // Send the outgoing operations in chunks from a detached
                                    // background task; send errors are ignored.
                                    let client = this.client.clone();
                                    cx.background()
                                        .spawn(async move {
                                            let operations = operations.await;
                                            for chunk in
                                                language::proto::split_operations(operations)
                                            {
                                                client
                                                    .send(proto::UpdateChannelBuffer {
                                                        channel_id,
                                                        operations: chunk,
                                                    })
                                                    .ok();
                                            }
                                        })
                                        .detach();
                                    return true;
                                }
                            }

                            // Not part of the response, or applying operations failed.
                            channel_buffer.disconnect(cx);
                            false
                        })
                    }
                    // Buffers that are still loading are left untouched.
                    OpenedModelHandle::Loading(_) => true,
                });
            });
            anyhow::Ok(())
        })
    }
733
734 fn handle_disconnect(&mut self, cx: &mut ModelContext<Self>) {
735 self.channel_index.clear();
736 self.channel_invitations.clear();
737 self.channel_participants.clear();
738 self.channels_with_admin_privileges.clear();
739 self.channel_index.clear();
740 self.outgoing_invites.clear();
741 cx.notify();
742
743 self.disconnect_channel_buffers_task.get_or_insert_with(|| {
744 cx.spawn_weak(|this, mut cx| async move {
745 cx.background().timer(RECONNECT_TIMEOUT).await;
746 if let Some(this) = this.upgrade(&cx) {
747 this.update(&mut cx, |this, cx| {
748 for (_, buffer) in this.opened_buffers.drain() {
749 if let OpenedModelHandle::Open(buffer) = buffer {
750 if let Some(buffer) = buffer.upgrade(cx) {
751 buffer.update(cx, |buffer, cx| buffer.disconnect(cx));
752 }
753 }
754 }
755 });
756 }
757 })
758 });
759 }
760
    /// Applies an `UpdateChannels` payload to the local state.
    ///
    /// Invitations, channels, edges and permissions are applied synchronously
    /// and observers are notified. Returns `None` when the payload contains no
    /// participant entries; otherwise returns a task that resolves the
    /// participant user ids through the user store and then records the
    /// participants per channel.
    pub(crate) fn update_channels(
        &mut self,
        payload: proto::UpdateChannels,
        cx: &mut ModelContext<ChannelStore>,
    ) -> Option<Task<Result<()>>> {
        if !payload.remove_channel_invitations.is_empty() {
            self.channel_invitations
                .retain(|channel| !payload.remove_channel_invitations.contains(&channel.id));
        }
        // Insert or rename invitations, keeping the list sorted by channel id.
        for channel in payload.channel_invitations {
            match self
                .channel_invitations
                .binary_search_by_key(&channel.id, |c| c.id)
            {
                Ok(ix) => Arc::make_mut(&mut self.channel_invitations[ix]).name = channel.name,
                Err(ix) => self.channel_invitations.insert(
                    ix,
                    Arc::new(Channel {
                        id: channel.id,
                        name: channel.name,
                    }),
                ),
            }
        }

        let channels_changed = !payload.channels.is_empty()
            || !payload.delete_channels.is_empty()
            || !payload.insert_edge.is_empty()
            || !payload.delete_edge.is_empty();

        if channels_changed {
            if !payload.delete_channels.is_empty() {
                // Drop everything that refers to the deleted channels.
                self.channel_index.delete_channels(&payload.delete_channels);
                self.channel_participants
                    .retain(|channel_id, _| !payload.delete_channels.contains(channel_id));
                self.channels_with_admin_privileges
                    .retain(|channel_id| !payload.delete_channels.contains(channel_id));

                // Disconnect open buffers of deleted channels; entries that are
                // still loading are removed without a disconnect call.
                for channel_id in &payload.delete_channels {
                    let channel_id = *channel_id;
                    if let Some(OpenedModelHandle::Open(buffer)) =
                        self.opened_buffers.remove(&channel_id)
                    {
                        if let Some(buffer) = buffer.upgrade(cx) {
                            buffer.update(cx, ChannelBuffer::disconnect);
                        }
                    }
                }
            }

            // NOTE(review): `bulk_insert` presumably returns a guard that finalizes
            // the index when it goes out of scope — confirm in `channel_index`.
            let mut index = self.channel_index.bulk_insert();
            for channel in payload.channels {
                index.insert(channel)
            }

            for edge in payload.insert_edge {
                index.insert_edge(edge.channel_id, edge.parent_id);
            }

            for edge in payload.delete_edge {
                index.delete_edge(edge.parent_id, edge.channel_id);
            }
        }

        for permission in payload.channel_permissions {
            if permission.is_admin {
                self.channels_with_admin_privileges
                    .insert(permission.channel_id);
            } else {
                self.channels_with_admin_privileges
                    .remove(&permission.channel_id);
            }
        }

        cx.notify();
        if payload.channel_participants.is_empty() {
            return None;
        }

        // Build a sorted, de-duplicated list of every participant id in the payload.
        let mut all_user_ids = Vec::new();
        let channel_participants = payload.channel_participants;
        for entry in &channel_participants {
            for user_id in entry.participant_user_ids.iter() {
                if let Err(ix) = all_user_ids.binary_search(user_id) {
                    all_user_ids.insert(ix, *user_id);
                }
            }
        }

        let users = self
            .user_store
            .update(cx, |user_store, cx| user_store.get_users(all_user_ids, cx));
        Some(cx.spawn(|this, mut cx| async move {
            let users = users.await?;

            this.update(&mut cx, |this, cx| {
                for entry in &channel_participants {
                    // NOTE(review): the binary search assumes `users` comes back sorted
                    // by id, i.e. in the order of the sorted ids requested above — confirm
                    // against `UserStore::get_users`. Ids that are not found are skipped.
                    let mut participants: Vec<_> = entry
                        .participant_user_ids
                        .iter()
                        .filter_map(|user_id| {
                            users
                                .binary_search_by_key(&user_id, |user| &user.id)
                                .ok()
                                .map(|ix| users[ix].clone())
                        })
                        .collect();

                    participants.sort_by_key(|u| u.id);

                    // Replaces any participants previously stored for this channel.
                    this.channel_participants
                        .insert(entry.channel_id, participants);
                }

                cx.notify();
            });
            anyhow::Ok(())
        }))
    }
880}
881
882impl Deref for ChannelPath {
883 type Target = [ChannelId];
884
885 fn deref(&self) -> &Self::Target {
886 &self.0
887 }
888}
889
890impl ChannelPath {
891 pub fn new(path: Arc<[ChannelId]>) -> Self {
892 debug_assert!(path.len() >= 1);
893 Self(path)
894 }
895
896 pub fn parent_id(&self) -> Option<ChannelId> {
897 self.0.len().checked_sub(2).map(|i| self.0[i])
898 }
899
900 pub fn channel_id(&self) -> ChannelId {
901 self.0[self.0.len() - 1]
902 }
903}
904
905impl From<ChannelPath> for Cow<'static, ChannelPath> {
906 fn from(value: ChannelPath) -> Self {
907 Cow::Owned(value)
908 }
909}
910
911impl<'a> From<&'a ChannelPath> for Cow<'a, ChannelPath> {
912 fn from(value: &'a ChannelPath) -> Self {
913 Cow::Borrowed(value)
914 }
915}
916
917impl Default for ChannelPath {
918 fn default() -> Self {
919 ChannelPath(Arc::from([]))
920 }
921}