1mod channel_index;
2
3use crate::{channel_buffer::ChannelBuffer, channel_chat::ChannelChat};
4use anyhow::{anyhow, Result};
5use client::{Client, Subscription, User, UserId, UserStore};
6use collections::{
7 hash_map::{self, DefaultHasher},
8 HashMap, HashSet,
9};
10use futures::{channel::mpsc, future::Shared, Future, FutureExt, StreamExt};
11use gpui::{AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Task, WeakModelHandle};
12use rpc::{
13 proto::{self, ChannelEdge, ChannelPermission},
14 TypedEnvelope,
15};
16use serde_derive::{Deserialize, Serialize};
17use std::{
18 borrow::Cow,
19 hash::{Hash, Hasher},
20 mem,
21 ops::Deref,
22 sync::Arc,
23 time::Duration,
24};
25use util::ResultExt;
26
27use self::channel_index::ChannelIndex;
28
29pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
30
31pub type ChannelId = u64;
32
33pub struct ChannelStore {
34 channel_index: ChannelIndex,
35 channel_invitations: Vec<Arc<Channel>>,
36 channel_participants: HashMap<ChannelId, Vec<Arc<User>>>,
37 channels_with_admin_privileges: HashSet<ChannelId>,
38 outgoing_invites: HashSet<(ChannelId, UserId)>,
39 update_channels_tx: mpsc::UnboundedSender<proto::UpdateChannels>,
40 opened_buffers: HashMap<ChannelId, OpenedModelHandle<ChannelBuffer>>,
41 opened_chats: HashMap<ChannelId, OpenedModelHandle<ChannelChat>>,
42 client: Arc<Client>,
43 user_store: ModelHandle<UserStore>,
44 _rpc_subscription: Subscription,
45 _watch_connection_status: Task<Option<()>>,
46 disconnect_channel_buffers_task: Option<Task<()>>,
47 _update_channels: Task<()>,
48}
49
50pub type ChannelData = (Channel, ChannelPath);
51
52#[derive(Clone, Debug, PartialEq)]
53pub struct Channel {
54 pub id: ChannelId,
55 pub name: String,
56}
57
58#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Serialize, Deserialize)]
59pub struct ChannelPath(Arc<[ChannelId]>);
60
61pub struct ChannelMembership {
62 pub user: Arc<User>,
63 pub kind: proto::channel_member::Kind,
64 pub admin: bool,
65}
66
67pub enum ChannelEvent {
68 ChannelCreated(ChannelId),
69 ChannelRenamed(ChannelId),
70}
71
72impl Entity for ChannelStore {
73 type Event = ChannelEvent;
74}
75
76enum OpenedModelHandle<E: Entity> {
77 Open(WeakModelHandle<E>),
78 Loading(Shared<Task<Result<ModelHandle<E>, Arc<anyhow::Error>>>>),
79}
80
81impl ChannelStore {
82 pub fn new(
83 client: Arc<Client>,
84 user_store: ModelHandle<UserStore>,
85 cx: &mut ModelContext<Self>,
86 ) -> Self {
87 let rpc_subscription =
88 client.add_message_handler(cx.handle(), Self::handle_update_channels);
89
90 let mut connection_status = client.status();
91 let (update_channels_tx, mut update_channels_rx) = mpsc::unbounded();
92 let watch_connection_status = cx.spawn_weak(|this, mut cx| async move {
93 while let Some(status) = connection_status.next().await {
94 let this = this.upgrade(&cx)?;
95 if status.is_connected() {
96 this.update(&mut cx, |this, cx| this.handle_connect(cx))
97 .await
98 .log_err()?;
99 } else {
100 this.update(&mut cx, |this, cx| this.handle_disconnect(cx));
101 }
102 }
103 Some(())
104 });
105
106 Self {
107 channel_invitations: Vec::default(),
108 channel_index: ChannelIndex::default(),
109 channel_participants: Default::default(),
110 channels_with_admin_privileges: Default::default(),
111 outgoing_invites: Default::default(),
112 opened_buffers: Default::default(),
113 opened_chats: Default::default(),
114 update_channels_tx,
115 client,
116 user_store,
117 _rpc_subscription: rpc_subscription,
118 _watch_connection_status: watch_connection_status,
119 disconnect_channel_buffers_task: None,
120 _update_channels: cx.spawn_weak(|this, mut cx| async move {
121 while let Some(update_channels) = update_channels_rx.next().await {
122 if let Some(this) = this.upgrade(&cx) {
123 let update_task = this.update(&mut cx, |this, cx| {
124 this.update_channels(update_channels, cx)
125 });
126 if let Some(update_task) = update_task {
127 update_task.await.log_err();
128 }
129 }
130 }
131 }),
132 }
133 }
134
135 pub fn client(&self) -> Arc<Client> {
136 self.client.clone()
137 }
138
139 pub fn has_children(&self, channel_id: ChannelId) -> bool {
140 self.channel_index.iter().any(|path| {
141 if let Some(ix) = path.iter().position(|id| *id == channel_id) {
142 path.len() > ix + 1
143 } else {
144 false
145 }
146 })
147 }
148
149 /// Returns the number of unique channels in the store
150 pub fn channel_count(&self) -> usize {
151 self.channel_index.by_id().len()
152 }
153
154 /// Returns the index of a channel ID in the list of unique channels
155 pub fn index_of_channel(&self, channel_id: ChannelId) -> Option<usize> {
156 self.channel_index
157 .by_id()
158 .keys()
159 .position(|id| *id == channel_id)
160 }
161
162 /// Returns an iterator over all unique channels
163 pub fn channels(&self) -> impl '_ + Iterator<Item = &Arc<Channel>> {
164 self.channel_index.by_id().values()
165 }
166
167 /// Iterate over all entries in the channel DAG
168 pub fn channel_dag_entries(&self) -> impl '_ + Iterator<Item = (usize, &Arc<Channel>)> {
169 self.channel_index.iter().map(move |path| {
170 let id = path.last().unwrap();
171 let channel = self.channel_for_id(*id).unwrap();
172 (path.len() - 1, channel)
173 })
174 }
175
176 pub fn channel_dag_entry_at(&self, ix: usize) -> Option<(&Arc<Channel>, &ChannelPath)> {
177 let path = self.channel_index.get(ix)?;
178 let id = path.last().unwrap();
179 let channel = self.channel_for_id(*id).unwrap();
180
181 Some((channel, path))
182 }
183
184 pub fn channel_at(&self, ix: usize) -> Option<&Arc<Channel>> {
185 self.channel_index.by_id().values().nth(ix)
186 }
187
188 pub fn channel_invitations(&self) -> &[Arc<Channel>] {
189 &self.channel_invitations
190 }
191
192 pub fn channel_for_id(&self, channel_id: ChannelId) -> Option<&Arc<Channel>> {
193 self.channel_index.by_id().get(&channel_id)
194 }
195
196 pub fn has_open_channel_buffer(&self, channel_id: ChannelId, cx: &AppContext) -> bool {
197 if let Some(buffer) = self.opened_buffers.get(&channel_id) {
198 if let OpenedModelHandle::Open(buffer) = buffer {
199 return buffer.upgrade(cx).is_some();
200 }
201 }
202 false
203 }
204
205 pub fn open_channel_buffer(
206 &mut self,
207 channel_id: ChannelId,
208 cx: &mut ModelContext<Self>,
209 ) -> Task<Result<ModelHandle<ChannelBuffer>>> {
210 let client = self.client.clone();
211 self.open_channel_resource(
212 channel_id,
213 |this| &mut this.opened_buffers,
214 |channel, cx| ChannelBuffer::new(channel, client, cx),
215 cx,
216 )
217 }
218
219 pub fn open_channel_chat(
220 &mut self,
221 channel_id: ChannelId,
222 cx: &mut ModelContext<Self>,
223 ) -> Task<Result<ModelHandle<ChannelChat>>> {
224 let client = self.client.clone();
225 let user_store = self.user_store.clone();
226 self.open_channel_resource(
227 channel_id,
228 |this| &mut this.opened_chats,
229 |channel, cx| ChannelChat::new(channel, user_store, client, cx),
230 cx,
231 )
232 }
233
234 /// Asynchronously open a given resource associated with a channel.
235 ///
236 /// Make sure that the resource is only opened once, even if this method
237 /// is called multiple times with the same channel id while the first task
238 /// is still running.
239 fn open_channel_resource<T: Entity, F, Fut>(
240 &mut self,
241 channel_id: ChannelId,
242 get_map: fn(&mut Self) -> &mut HashMap<ChannelId, OpenedModelHandle<T>>,
243 load: F,
244 cx: &mut ModelContext<Self>,
245 ) -> Task<Result<ModelHandle<T>>>
246 where
247 F: 'static + FnOnce(Arc<Channel>, AsyncAppContext) -> Fut,
248 Fut: Future<Output = Result<ModelHandle<T>>>,
249 {
250 let task = loop {
251 match get_map(self).entry(channel_id) {
252 hash_map::Entry::Occupied(e) => match e.get() {
253 OpenedModelHandle::Open(model) => {
254 if let Some(model) = model.upgrade(cx) {
255 break Task::ready(Ok(model)).shared();
256 } else {
257 get_map(self).remove(&channel_id);
258 continue;
259 }
260 }
261 OpenedModelHandle::Loading(task) => {
262 break task.clone();
263 }
264 },
265 hash_map::Entry::Vacant(e) => {
266 let task = cx
267 .spawn(|this, cx| async move {
268 let channel = this.read_with(&cx, |this, _| {
269 this.channel_for_id(channel_id).cloned().ok_or_else(|| {
270 Arc::new(anyhow!("no channel for id: {}", channel_id))
271 })
272 })?;
273
274 load(channel, cx).await.map_err(Arc::new)
275 })
276 .shared();
277
278 e.insert(OpenedModelHandle::Loading(task.clone()));
279 cx.spawn({
280 let task = task.clone();
281 |this, mut cx| async move {
282 let result = task.await;
283 this.update(&mut cx, |this, _| match result {
284 Ok(model) => {
285 get_map(this).insert(
286 channel_id,
287 OpenedModelHandle::Open(model.downgrade()),
288 );
289 }
290 Err(_) => {
291 get_map(this).remove(&channel_id);
292 }
293 });
294 }
295 })
296 .detach();
297 break task;
298 }
299 }
300 };
301 cx.foreground()
302 .spawn(async move { task.await.map_err(|error| anyhow!("{}", error)) })
303 }
304
305 pub fn is_user_admin(&self, channel_id: ChannelId) -> bool {
306 self.channel_index.iter().any(|path| {
307 if let Some(ix) = path.iter().position(|id| *id == channel_id) {
308 path[..=ix]
309 .iter()
310 .any(|id| self.channels_with_admin_privileges.contains(id))
311 } else {
312 false
313 }
314 })
315 }
316
317 pub fn channel_participants(&self, channel_id: ChannelId) -> &[Arc<User>] {
318 self.channel_participants
319 .get(&channel_id)
320 .map_or(&[], |v| v.as_slice())
321 }
322
323 pub fn create_channel(
324 &self,
325 name: &str,
326 parent_id: Option<ChannelId>,
327 cx: &mut ModelContext<Self>,
328 ) -> Task<Result<ChannelId>> {
329 let client = self.client.clone();
330 let name = name.trim_start_matches("#").to_owned();
331 cx.spawn(|this, mut cx| async move {
332 let response = client
333 .request(proto::CreateChannel { name, parent_id })
334 .await?;
335
336 let channel = response
337 .channel
338 .ok_or_else(|| anyhow!("missing channel in response"))?;
339 let channel_id = channel.id;
340
341 let parent_edge = if let Some(parent_id) = parent_id {
342 vec![ChannelEdge {
343 channel_id: channel.id,
344 parent_id,
345 }]
346 } else {
347 vec![]
348 };
349
350 this.update(&mut cx, |this, cx| {
351 let task = this.update_channels(
352 proto::UpdateChannels {
353 channels: vec![channel],
354 insert_edge: parent_edge,
355 channel_permissions: vec![ChannelPermission {
356 channel_id,
357 is_admin: true,
358 }],
359 ..Default::default()
360 },
361 cx,
362 );
363 assert!(task.is_none());
364
365 // This event is emitted because the collab panel wants to clear the pending edit state
366 // before this frame is rendered. But we can't guarantee that the collab panel's future
367 // will resolve before this flush_effects finishes. Synchronously emitting this event
368 // ensures that the collab panel will observe this creation before the frame completes
369 cx.emit(ChannelEvent::ChannelCreated(channel_id));
370 });
371
372 Ok(channel_id)
373 })
374 }
375
376 pub fn link_channel(
377 &mut self,
378 channel_id: ChannelId,
379 to: ChannelId,
380 cx: &mut ModelContext<Self>,
381 ) -> Task<Result<()>> {
382 let client = self.client.clone();
383 cx.spawn(|_, _| async move {
384 let _ = client
385 .request(proto::LinkChannel { channel_id, to })
386 .await?;
387
388 Ok(())
389 })
390 }
391
392 pub fn unlink_channel(
393 &mut self,
394 channel_id: ChannelId,
395 from: ChannelId,
396 cx: &mut ModelContext<Self>,
397 ) -> Task<Result<()>> {
398 let client = self.client.clone();
399 cx.spawn(|_, _| async move {
400 let _ = client
401 .request(proto::UnlinkChannel { channel_id, from })
402 .await?;
403
404 Ok(())
405 })
406 }
407
408 pub fn move_channel(
409 &mut self,
410 channel_id: ChannelId,
411 from: ChannelId,
412 to: ChannelId,
413 cx: &mut ModelContext<Self>,
414 ) -> Task<Result<()>> {
415 let client = self.client.clone();
416 cx.spawn(|_, _| async move {
417 let _ = client
418 .request(proto::MoveChannel {
419 channel_id,
420 from,
421 to,
422 })
423 .await?;
424
425 Ok(())
426 })
427 }
428
429 pub fn invite_member(
430 &mut self,
431 channel_id: ChannelId,
432 user_id: UserId,
433 admin: bool,
434 cx: &mut ModelContext<Self>,
435 ) -> Task<Result<()>> {
436 if !self.outgoing_invites.insert((channel_id, user_id)) {
437 return Task::ready(Err(anyhow!("invite request already in progress")));
438 }
439
440 cx.notify();
441 let client = self.client.clone();
442 cx.spawn(|this, mut cx| async move {
443 let result = client
444 .request(proto::InviteChannelMember {
445 channel_id,
446 user_id,
447 admin,
448 })
449 .await;
450
451 this.update(&mut cx, |this, cx| {
452 this.outgoing_invites.remove(&(channel_id, user_id));
453 cx.notify();
454 });
455
456 result?;
457
458 Ok(())
459 })
460 }
461
462 pub fn remove_member(
463 &mut self,
464 channel_id: ChannelId,
465 user_id: u64,
466 cx: &mut ModelContext<Self>,
467 ) -> Task<Result<()>> {
468 if !self.outgoing_invites.insert((channel_id, user_id)) {
469 return Task::ready(Err(anyhow!("invite request already in progress")));
470 }
471
472 cx.notify();
473 let client = self.client.clone();
474 cx.spawn(|this, mut cx| async move {
475 let result = client
476 .request(proto::RemoveChannelMember {
477 channel_id,
478 user_id,
479 })
480 .await;
481
482 this.update(&mut cx, |this, cx| {
483 this.outgoing_invites.remove(&(channel_id, user_id));
484 cx.notify();
485 });
486 result?;
487 Ok(())
488 })
489 }
490
491 pub fn set_member_admin(
492 &mut self,
493 channel_id: ChannelId,
494 user_id: UserId,
495 admin: bool,
496 cx: &mut ModelContext<Self>,
497 ) -> Task<Result<()>> {
498 if !self.outgoing_invites.insert((channel_id, user_id)) {
499 return Task::ready(Err(anyhow!("member request already in progress")));
500 }
501
502 cx.notify();
503 let client = self.client.clone();
504 cx.spawn(|this, mut cx| async move {
505 let result = client
506 .request(proto::SetChannelMemberAdmin {
507 channel_id,
508 user_id,
509 admin,
510 })
511 .await;
512
513 this.update(&mut cx, |this, cx| {
514 this.outgoing_invites.remove(&(channel_id, user_id));
515 cx.notify();
516 });
517
518 result?;
519 Ok(())
520 })
521 }
522
523 pub fn rename(
524 &mut self,
525 channel_id: ChannelId,
526 new_name: &str,
527 cx: &mut ModelContext<Self>,
528 ) -> Task<Result<()>> {
529 let client = self.client.clone();
530 let name = new_name.to_string();
531 cx.spawn(|this, mut cx| async move {
532 let channel = client
533 .request(proto::RenameChannel { channel_id, name })
534 .await?
535 .channel
536 .ok_or_else(|| anyhow!("missing channel in response"))?;
537 this.update(&mut cx, |this, cx| {
538 let task = this.update_channels(
539 proto::UpdateChannels {
540 channels: vec![channel],
541 ..Default::default()
542 },
543 cx,
544 );
545 assert!(task.is_none());
546
547 // This event is emitted because the collab panel wants to clear the pending edit state
548 // before this frame is rendered. But we can't guarantee that the collab panel's future
549 // will resolve before this flush_effects finishes. Synchronously emitting this event
550 // ensures that the collab panel will observe this creation before the frame complete
551 cx.emit(ChannelEvent::ChannelRenamed(channel_id))
552 });
553 Ok(())
554 })
555 }
556
557 pub fn respond_to_channel_invite(
558 &mut self,
559 channel_id: ChannelId,
560 accept: bool,
561 ) -> impl Future<Output = Result<()>> {
562 let client = self.client.clone();
563 async move {
564 client
565 .request(proto::RespondToChannelInvite { channel_id, accept })
566 .await?;
567 Ok(())
568 }
569 }
570
571 pub fn get_channel_member_details(
572 &self,
573 channel_id: ChannelId,
574 cx: &mut ModelContext<Self>,
575 ) -> Task<Result<Vec<ChannelMembership>>> {
576 let client = self.client.clone();
577 let user_store = self.user_store.downgrade();
578 cx.spawn(|_, mut cx| async move {
579 let response = client
580 .request(proto::GetChannelMembers { channel_id })
581 .await?;
582
583 let user_ids = response.members.iter().map(|m| m.user_id).collect();
584 let user_store = user_store
585 .upgrade(&cx)
586 .ok_or_else(|| anyhow!("user store dropped"))?;
587 let users = user_store
588 .update(&mut cx, |user_store, cx| user_store.get_users(user_ids, cx))
589 .await?;
590
591 Ok(users
592 .into_iter()
593 .zip(response.members)
594 .filter_map(|(user, member)| {
595 Some(ChannelMembership {
596 user,
597 admin: member.admin,
598 kind: proto::channel_member::Kind::from_i32(member.kind)?,
599 })
600 })
601 .collect())
602 })
603 }
604
605 pub fn remove_channel(&self, channel_id: ChannelId) -> impl Future<Output = Result<()>> {
606 let client = self.client.clone();
607 async move {
608 client.request(proto::DeleteChannel { channel_id }).await?;
609 Ok(())
610 }
611 }
612
613 pub fn has_pending_channel_invite_response(&self, _: &Arc<Channel>) -> bool {
614 false
615 }
616
617 pub fn has_pending_channel_invite(&self, channel_id: ChannelId, user_id: UserId) -> bool {
618 self.outgoing_invites.contains(&(channel_id, user_id))
619 }
620
621 async fn handle_update_channels(
622 this: ModelHandle<Self>,
623 message: TypedEnvelope<proto::UpdateChannels>,
624 _: Arc<Client>,
625 mut cx: AsyncAppContext,
626 ) -> Result<()> {
627 this.update(&mut cx, |this, _| {
628 this.update_channels_tx
629 .unbounded_send(message.payload)
630 .unwrap();
631 });
632 Ok(())
633 }
634
635 fn handle_connect(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
636 self.disconnect_channel_buffers_task.take();
637
638 for chat in self.opened_chats.values() {
639 if let OpenedModelHandle::Open(chat) = chat {
640 if let Some(chat) = chat.upgrade(cx) {
641 chat.update(cx, |chat, cx| {
642 chat.rejoin(cx);
643 });
644 }
645 }
646 }
647
648 let mut buffer_versions = Vec::new();
649 for buffer in self.opened_buffers.values() {
650 if let OpenedModelHandle::Open(buffer) = buffer {
651 if let Some(buffer) = buffer.upgrade(cx) {
652 let channel_buffer = buffer.read(cx);
653 let buffer = channel_buffer.buffer().read(cx);
654 buffer_versions.push(proto::ChannelBufferVersion {
655 channel_id: channel_buffer.channel().id,
656 epoch: channel_buffer.epoch(),
657 version: language::proto::serialize_version(&buffer.version()),
658 });
659 }
660 }
661 }
662
663 if buffer_versions.is_empty() {
664 return Task::ready(Ok(()));
665 }
666
667 let response = self.client.request(proto::RejoinChannelBuffers {
668 buffers: buffer_versions,
669 });
670
671 cx.spawn(|this, mut cx| async move {
672 let mut response = response.await?;
673
674 this.update(&mut cx, |this, cx| {
675 this.opened_buffers.retain(|_, buffer| match buffer {
676 OpenedModelHandle::Open(channel_buffer) => {
677 let Some(channel_buffer) = channel_buffer.upgrade(cx) else {
678 return false;
679 };
680
681 channel_buffer.update(cx, |channel_buffer, cx| {
682 let channel_id = channel_buffer.channel().id;
683 if let Some(remote_buffer) = response
684 .buffers
685 .iter_mut()
686 .find(|buffer| buffer.channel_id == channel_id)
687 {
688 let channel_id = channel_buffer.channel().id;
689 let remote_version =
690 language::proto::deserialize_version(&remote_buffer.version);
691
692 channel_buffer.replace_collaborators(
693 mem::take(&mut remote_buffer.collaborators),
694 cx,
695 );
696
697 let operations = channel_buffer
698 .buffer()
699 .update(cx, |buffer, cx| {
700 let outgoing_operations =
701 buffer.serialize_ops(Some(remote_version), cx);
702 let incoming_operations =
703 mem::take(&mut remote_buffer.operations)
704 .into_iter()
705 .map(language::proto::deserialize_operation)
706 .collect::<Result<Vec<_>>>()?;
707 buffer.apply_ops(incoming_operations, cx)?;
708 anyhow::Ok(outgoing_operations)
709 })
710 .log_err();
711
712 if let Some(operations) = operations {
713 let client = this.client.clone();
714 cx.background()
715 .spawn(async move {
716 let operations = operations.await;
717 for chunk in
718 language::proto::split_operations(operations)
719 {
720 client
721 .send(proto::UpdateChannelBuffer {
722 channel_id,
723 operations: chunk,
724 })
725 .ok();
726 }
727 })
728 .detach();
729 return true;
730 }
731 }
732
733 channel_buffer.disconnect(cx);
734 false
735 })
736 }
737 OpenedModelHandle::Loading(_) => true,
738 });
739 });
740 anyhow::Ok(())
741 })
742 }
743
744 fn handle_disconnect(&mut self, cx: &mut ModelContext<Self>) {
745 self.channel_index.clear();
746 self.channel_invitations.clear();
747 self.channel_participants.clear();
748 self.channels_with_admin_privileges.clear();
749 self.channel_index.clear();
750 self.outgoing_invites.clear();
751 cx.notify();
752
753 self.disconnect_channel_buffers_task.get_or_insert_with(|| {
754 cx.spawn_weak(|this, mut cx| async move {
755 cx.background().timer(RECONNECT_TIMEOUT).await;
756 if let Some(this) = this.upgrade(&cx) {
757 this.update(&mut cx, |this, cx| {
758 for (_, buffer) in this.opened_buffers.drain() {
759 if let OpenedModelHandle::Open(buffer) = buffer {
760 if let Some(buffer) = buffer.upgrade(cx) {
761 buffer.update(cx, |buffer, cx| buffer.disconnect(cx));
762 }
763 }
764 }
765 });
766 }
767 })
768 });
769 }
770
771 pub(crate) fn update_channels(
772 &mut self,
773 payload: proto::UpdateChannels,
774 cx: &mut ModelContext<ChannelStore>,
775 ) -> Option<Task<Result<()>>> {
776 if !payload.remove_channel_invitations.is_empty() {
777 self.channel_invitations
778 .retain(|channel| !payload.remove_channel_invitations.contains(&channel.id));
779 }
780 for channel in payload.channel_invitations {
781 match self
782 .channel_invitations
783 .binary_search_by_key(&channel.id, |c| c.id)
784 {
785 Ok(ix) => Arc::make_mut(&mut self.channel_invitations[ix]).name = channel.name,
786 Err(ix) => self.channel_invitations.insert(
787 ix,
788 Arc::new(Channel {
789 id: channel.id,
790 name: channel.name,
791 }),
792 ),
793 }
794 }
795
796 let channels_changed = !payload.channels.is_empty()
797 || !payload.delete_channels.is_empty()
798 || !payload.insert_edge.is_empty()
799 || !payload.delete_edge.is_empty();
800
801 if channels_changed {
802 if !payload.delete_channels.is_empty() {
803 self.channel_index.delete_channels(&payload.delete_channels);
804 self.channel_participants
805 .retain(|channel_id, _| !payload.delete_channels.contains(channel_id));
806 self.channels_with_admin_privileges
807 .retain(|channel_id| !payload.delete_channels.contains(channel_id));
808
809 for channel_id in &payload.delete_channels {
810 let channel_id = *channel_id;
811 if let Some(OpenedModelHandle::Open(buffer)) =
812 self.opened_buffers.remove(&channel_id)
813 {
814 if let Some(buffer) = buffer.upgrade(cx) {
815 buffer.update(cx, ChannelBuffer::disconnect);
816 }
817 }
818 }
819 }
820
821 let mut index = self.channel_index.bulk_insert();
822 for channel in payload.channels {
823 index.insert(channel)
824 }
825
826 for edge in payload.insert_edge {
827 index.insert_edge(edge.channel_id, edge.parent_id);
828 }
829
830 for edge in payload.delete_edge {
831 index.delete_edge(edge.parent_id, edge.channel_id);
832 }
833 }
834
835 for permission in payload.channel_permissions {
836 if permission.is_admin {
837 self.channels_with_admin_privileges
838 .insert(permission.channel_id);
839 } else {
840 self.channels_with_admin_privileges
841 .remove(&permission.channel_id);
842 }
843 }
844
845 cx.notify();
846 if payload.channel_participants.is_empty() {
847 return None;
848 }
849
850 let mut all_user_ids = Vec::new();
851 let channel_participants = payload.channel_participants;
852 for entry in &channel_participants {
853 for user_id in entry.participant_user_ids.iter() {
854 if let Err(ix) = all_user_ids.binary_search(user_id) {
855 all_user_ids.insert(ix, *user_id);
856 }
857 }
858 }
859
860 let users = self
861 .user_store
862 .update(cx, |user_store, cx| user_store.get_users(all_user_ids, cx));
863 Some(cx.spawn(|this, mut cx| async move {
864 let users = users.await?;
865
866 this.update(&mut cx, |this, cx| {
867 for entry in &channel_participants {
868 let mut participants: Vec<_> = entry
869 .participant_user_ids
870 .iter()
871 .filter_map(|user_id| {
872 users
873 .binary_search_by_key(&user_id, |user| &user.id)
874 .ok()
875 .map(|ix| users[ix].clone())
876 })
877 .collect();
878
879 participants.sort_by_key(|u| u.id);
880
881 this.channel_participants
882 .insert(entry.channel_id, participants);
883 }
884
885 cx.notify();
886 });
887 anyhow::Ok(())
888 }))
889 }
890}
891
892impl Deref for ChannelPath {
893 type Target = [ChannelId];
894
895 fn deref(&self) -> &Self::Target {
896 &self.0
897 }
898}
899
900impl ChannelPath {
901 pub fn new(path: Arc<[ChannelId]>) -> Self {
902 debug_assert!(path.len() >= 1);
903 Self(path)
904 }
905
906 pub fn parent_id(&self) -> Option<ChannelId> {
907 self.0.len().checked_sub(2).map(|i| self.0[i])
908 }
909
910 pub fn channel_id(&self) -> ChannelId {
911 self.0[self.0.len() - 1]
912 }
913
914 pub fn unique_id(&self) -> u64 {
915 let mut hasher = DefaultHasher::new();
916 self.0.deref().hash(&mut hasher);
917 hasher.finish()
918 }
919}
920
921impl From<ChannelPath> for Cow<'static, ChannelPath> {
922 fn from(value: ChannelPath) -> Self {
923 Cow::Owned(value)
924 }
925}
926
927impl<'a> From<&'a ChannelPath> for Cow<'a, ChannelPath> {
928 fn from(value: &'a ChannelPath) -> Self {
929 Cow::Borrowed(value)
930 }
931}
932
933impl Default for ChannelPath {
934 fn default() -> Self {
935 ChannelPath(Arc::from([]))
936 }
937}