channel_store.rs

  1mod channel_index;
  2
  3use crate::{channel_buffer::ChannelBuffer, channel_chat::ChannelChat};
  4use anyhow::{anyhow, Result};
  5use client::{Client, Subscription, User, UserId, UserStore};
  6use collections::{
  7    hash_map::{self, DefaultHasher},
  8    HashMap, HashSet,
  9};
 10use futures::{channel::mpsc, future::Shared, Future, FutureExt, StreamExt};
 11use gpui::{AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Task, WeakModelHandle};
 12use rpc::{
 13    proto::{self, ChannelEdge, ChannelPermission},
 14    TypedEnvelope,
 15};
 16use serde_derive::{Deserialize, Serialize};
 17use std::{
 18    borrow::Cow,
 19    hash::{Hash, Hasher},
 20    mem,
 21    ops::Deref,
 22    sync::Arc,
 23    time::Duration,
 24};
 25use util::ResultExt;
 26
 27use self::channel_index::ChannelIndex;
 28
/// How long `ChannelStore::handle_disconnect` waits after the connection is
/// lost before it disconnects every channel buffer that is still open.
pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
 30
/// Numeric identifier of a channel, as carried in the `proto` messages.
pub type ChannelId = u64;
 32
/// Client-side model of the channels visible to the current user, kept up to
/// date from `proto::UpdateChannels` messages.
pub struct ChannelStore {
    /// Index of known channels, both as DAG paths and by id.
    channel_index: ChannelIndex,
    /// Channels the user has been invited to; kept sorted by channel id
    /// (insertions in `update_channels` go through a binary search).
    channel_invitations: Vec<Arc<Channel>>,
    /// Participants per channel; each list is sorted by user id when stored.
    channel_participants: HashMap<ChannelId, Vec<Arc<User>>>,
    /// Channels for which the server reported `is_admin` for this user.
    channels_with_admin_privileges: HashSet<ChannelId>,
    /// `(channel, user)` pairs with a membership request currently in flight
    /// (invite, remove, or admin change).
    outgoing_invites: HashSet<(ChannelId, UserId)>,
    /// Sending half of the queue drained by the `_update_channels` task.
    update_channels_tx: mpsc::UnboundedSender<proto::UpdateChannels>,
    /// Channel buffers that are open or currently being opened.
    opened_buffers: HashMap<ChannelId, OpenedModelHandle<ChannelBuffer>>,
    /// Channel chats that are open or currently being opened.
    opened_chats: HashMap<ChannelId, OpenedModelHandle<ChannelChat>>,
    client: Arc<Client>,
    user_store: ModelHandle<UserStore>,
    /// Registration of `handle_update_channels` with the client.
    /// NOTE(review): presumably held only to keep the handler registered.
    _rpc_subscription: Subscription,
    /// Task that calls `handle_connect` / `handle_disconnect` as the client's
    /// connection status changes.
    _watch_connection_status: Task<Option<()>>,
    /// Pending delayed disconnect of open buffers, started on disconnect and
    /// taken (dropped) again in `handle_connect`.
    disconnect_channel_buffers_task: Option<Task<()>>,
    /// Task that applies queued `UpdateChannels` payloads one at a time.
    _update_channels: Task<()>,
}
 49
/// A channel paired with one of the paths that lead to it.
pub type ChannelData = (Channel, ChannelPath);
 51
/// A channel as known to the client: its id and display name.
#[derive(Clone, Debug, PartialEq)]
pub struct Channel {
    pub id: ChannelId,
    pub name: String,
}
 57
/// A sequence of channel ids whose last element is the channel itself and
/// whose second-to-last element (if any) is its parent on this path.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Serialize, Deserialize)]
pub struct ChannelPath(Arc<[ChannelId]>);
 60
/// One member of a channel, as returned by `get_channel_member_details`.
pub struct ChannelMembership {
    pub user: Arc<User>,
    /// Membership kind decoded from the server's `kind` integer.
    pub kind: proto::channel_member::Kind,
    pub admin: bool,
}
 66
/// Events emitted by `ChannelStore`.
pub enum ChannelEvent {
    /// Emitted by `create_channel` once the new channel has been applied locally.
    ChannelCreated(ChannelId),
    /// Emitted by `rename` once the renamed channel has been applied locally.
    ChannelRenamed(ChannelId),
}
 71
/// Makes `ChannelStore` a gpui model whose emitted events are `ChannelEvent`s.
impl Entity for ChannelStore {
    type Event = ChannelEvent;
}
 75
/// State of a per-channel resource tracked by `open_channel_resource`.
enum OpenedModelHandle<E: Entity> {
    /// The resource finished loading; only a weak handle is kept, so it may
    /// already have been dropped by its owners.
    Open(WeakModelHandle<E>),
    /// The resource is still loading; the shared task can be awaited by every
    /// caller that asks for it in the meantime.
    Loading(Shared<Task<Result<ModelHandle<E>, Arc<anyhow::Error>>>>),
}
 80
 81impl ChannelStore {
 82    pub fn new(
 83        client: Arc<Client>,
 84        user_store: ModelHandle<UserStore>,
 85        cx: &mut ModelContext<Self>,
 86    ) -> Self {
 87        let rpc_subscription =
 88            client.add_message_handler(cx.handle(), Self::handle_update_channels);
 89
 90        let mut connection_status = client.status();
 91        let (update_channels_tx, mut update_channels_rx) = mpsc::unbounded();
 92        let watch_connection_status = cx.spawn_weak(|this, mut cx| async move {
 93            while let Some(status) = connection_status.next().await {
 94                let this = this.upgrade(&cx)?;
 95                if status.is_connected() {
 96                    this.update(&mut cx, |this, cx| this.handle_connect(cx))
 97                        .await
 98                        .log_err()?;
 99                } else {
100                    this.update(&mut cx, |this, cx| this.handle_disconnect(cx));
101                }
102            }
103            Some(())
104        });
105
106        Self {
107            channel_invitations: Vec::default(),
108            channel_index: ChannelIndex::default(),
109            channel_participants: Default::default(),
110            channels_with_admin_privileges: Default::default(),
111            outgoing_invites: Default::default(),
112            opened_buffers: Default::default(),
113            opened_chats: Default::default(),
114            update_channels_tx,
115            client,
116            user_store,
117            _rpc_subscription: rpc_subscription,
118            _watch_connection_status: watch_connection_status,
119            disconnect_channel_buffers_task: None,
120            _update_channels: cx.spawn_weak(|this, mut cx| async move {
121                while let Some(update_channels) = update_channels_rx.next().await {
122                    if let Some(this) = this.upgrade(&cx) {
123                        let update_task = this.update(&mut cx, |this, cx| {
124                            this.update_channels(update_channels, cx)
125                        });
126                        if let Some(update_task) = update_task {
127                            update_task.await.log_err();
128                        }
129                    }
130                }
131            }),
132        }
133    }
134
135    pub fn client(&self) -> Arc<Client> {
136        self.client.clone()
137    }
138
139    pub fn has_children(&self, channel_id: ChannelId) -> bool {
140        self.channel_index.iter().any(|path| {
141            if let Some(ix) = path.iter().position(|id| *id == channel_id) {
142                path.len() > ix + 1
143            } else {
144                false
145            }
146        })
147    }
148
149    /// Returns the number of unique channels in the store
150    pub fn channel_count(&self) -> usize {
151        self.channel_index.by_id().len()
152    }
153
154    /// Returns the index of a channel ID in the list of unique channels
155    pub fn index_of_channel(&self, channel_id: ChannelId) -> Option<usize> {
156        self.channel_index
157            .by_id()
158            .keys()
159            .position(|id| *id == channel_id)
160    }
161
162    /// Returns an iterator over all unique channels
163    pub fn channels(&self) -> impl '_ + Iterator<Item = &Arc<Channel>> {
164        self.channel_index.by_id().values()
165    }
166
167    /// Iterate over all entries in the channel DAG
168    pub fn channel_dag_entries(&self) -> impl '_ + Iterator<Item = (usize, &Arc<Channel>)> {
169        self.channel_index.iter().map(move |path| {
170            let id = path.last().unwrap();
171            let channel = self.channel_for_id(*id).unwrap();
172            (path.len() - 1, channel)
173        })
174    }
175
176    pub fn channel_dag_entry_at(&self, ix: usize) -> Option<(&Arc<Channel>, &ChannelPath)> {
177        let path = self.channel_index.get(ix)?;
178        let id = path.last().unwrap();
179        let channel = self.channel_for_id(*id).unwrap();
180
181        Some((channel, path))
182    }
183
184    pub fn channel_at(&self, ix: usize) -> Option<&Arc<Channel>> {
185        self.channel_index.by_id().values().nth(ix)
186    }
187
188    pub fn channel_invitations(&self) -> &[Arc<Channel>] {
189        &self.channel_invitations
190    }
191
192    pub fn channel_for_id(&self, channel_id: ChannelId) -> Option<&Arc<Channel>> {
193        self.channel_index.by_id().get(&channel_id)
194    }
195
196    pub fn has_open_channel_buffer(&self, channel_id: ChannelId, cx: &AppContext) -> bool {
197        if let Some(buffer) = self.opened_buffers.get(&channel_id) {
198            if let OpenedModelHandle::Open(buffer) = buffer {
199                return buffer.upgrade(cx).is_some();
200            }
201        }
202        false
203    }
204
205    pub fn open_channel_buffer(
206        &mut self,
207        channel_id: ChannelId,
208        cx: &mut ModelContext<Self>,
209    ) -> Task<Result<ModelHandle<ChannelBuffer>>> {
210        let client = self.client.clone();
211        self.open_channel_resource(
212            channel_id,
213            |this| &mut this.opened_buffers,
214            |channel, cx| ChannelBuffer::new(channel, client, cx),
215            cx,
216        )
217    }
218
219    pub fn open_channel_chat(
220        &mut self,
221        channel_id: ChannelId,
222        cx: &mut ModelContext<Self>,
223    ) -> Task<Result<ModelHandle<ChannelChat>>> {
224        let client = self.client.clone();
225        let user_store = self.user_store.clone();
226        self.open_channel_resource(
227            channel_id,
228            |this| &mut this.opened_chats,
229            |channel, cx| ChannelChat::new(channel, user_store, client, cx),
230            cx,
231        )
232    }
233
    /// Asynchronously open a given resource associated with a channel.
    ///
    /// Make sure that the resource is only opened once, even if this method
    /// is called multiple times with the same channel id while the first task
    /// is still running.
    ///
    /// * `get_map` selects which map on the store tracks this resource kind.
    /// * `load` builds the resource for the resolved channel.
    ///
    /// The returned task fails if no channel with `channel_id` is known or if
    /// `load` fails; shared errors are re-wrapped through their `Display`
    /// output.
    fn open_channel_resource<T: Entity, F, Fut>(
        &mut self,
        channel_id: ChannelId,
        get_map: fn(&mut Self) -> &mut HashMap<ChannelId, OpenedModelHandle<T>>,
        load: F,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<ModelHandle<T>>>
    where
        F: 'static + FnOnce(Arc<Channel>, AsyncAppContext) -> Fut,
        Fut: Future<Output = Result<ModelHandle<T>>>,
    {
        // Loop so that a stale `Open` entry can be removed and the lookup retried.
        let task = loop {
            match get_map(self).entry(channel_id) {
                hash_map::Entry::Occupied(e) => match e.get() {
                    OpenedModelHandle::Open(model) => {
                        if let Some(model) = model.upgrade(cx) {
                            // Already open and alive: hand it back immediately.
                            break Task::ready(Ok(model)).shared();
                        } else {
                            // The weak handle is dead; forget it and start over.
                            get_map(self).remove(&channel_id);
                            continue;
                        }
                    }
                    OpenedModelHandle::Loading(task) => {
                        // Someone else is already loading it: share their task.
                        break task.clone();
                    }
                },
                hash_map::Entry::Vacant(e) => {
                    let task = cx
                        .spawn(|this, cx| async move {
                            let channel = this.read_with(&cx, |this, _| {
                                this.channel_for_id(channel_id).cloned().ok_or_else(|| {
                                    Arc::new(anyhow!("no channel for id: {}", channel_id))
                                })
                            })?;

                            // Errors are wrapped in `Arc` so the shared result is cloneable.
                            load(channel, cx).await.map_err(Arc::new)
                        })
                        .shared();

                    e.insert(OpenedModelHandle::Loading(task.clone()));
                    // Detached follow-up: once loading settles, replace the
                    // `Loading` entry with `Open`, or drop it on failure.
                    cx.spawn({
                        let task = task.clone();
                        |this, mut cx| async move {
                            let result = task.await;
                            this.update(&mut cx, |this, _| match result {
                                Ok(model) => {
                                    get_map(this).insert(
                                        channel_id,
                                        OpenedModelHandle::Open(model.downgrade()),
                                    );
                                }
                                Err(_) => {
                                    get_map(this).remove(&channel_id);
                                }
                            });
                        }
                    })
                    .detach();
                    break task;
                }
            }
        };
        // Convert the shared `Arc<anyhow::Error>` back into a plain `anyhow::Error`.
        cx.foreground()
            .spawn(async move { task.await.map_err(|error| anyhow!("{}", error)) })
    }
304
305    pub fn is_user_admin(&self, channel_id: ChannelId) -> bool {
306        self.channel_index.iter().any(|path| {
307            if let Some(ix) = path.iter().position(|id| *id == channel_id) {
308                path[..=ix]
309                    .iter()
310                    .any(|id| self.channels_with_admin_privileges.contains(id))
311            } else {
312                false
313            }
314        })
315    }
316
317    pub fn channel_participants(&self, channel_id: ChannelId) -> &[Arc<User>] {
318        self.channel_participants
319            .get(&channel_id)
320            .map_or(&[], |v| v.as_slice())
321    }
322
323    pub fn create_channel(
324        &self,
325        name: &str,
326        parent_id: Option<ChannelId>,
327        cx: &mut ModelContext<Self>,
328    ) -> Task<Result<ChannelId>> {
329        let client = self.client.clone();
330        let name = name.trim_start_matches("#").to_owned();
331        cx.spawn(|this, mut cx| async move {
332            let response = client
333                .request(proto::CreateChannel { name, parent_id })
334                .await?;
335
336            let channel = response
337                .channel
338                .ok_or_else(|| anyhow!("missing channel in response"))?;
339            let channel_id = channel.id;
340
341            let parent_edge = if let Some(parent_id) = parent_id {
342                vec![ChannelEdge {
343                    channel_id: channel.id,
344                    parent_id,
345                }]
346            } else {
347                vec![]
348            };
349
350            this.update(&mut cx, |this, cx| {
351                let task = this.update_channels(
352                    proto::UpdateChannels {
353                        channels: vec![channel],
354                        insert_edge: parent_edge,
355                        channel_permissions: vec![ChannelPermission {
356                            channel_id,
357                            is_admin: true,
358                        }],
359                        ..Default::default()
360                    },
361                    cx,
362                );
363                assert!(task.is_none());
364
365                // This event is emitted because the collab panel wants to clear the pending edit state
366                // before this frame is rendered. But we can't guarantee that the collab panel's future
367                // will resolve before this flush_effects finishes. Synchronously emitting this event
368                // ensures that the collab panel will observe this creation before the frame completes
369                cx.emit(ChannelEvent::ChannelCreated(channel_id));
370            });
371
372            Ok(channel_id)
373        })
374    }
375
376    pub fn link_channel(
377        &mut self,
378        channel_id: ChannelId,
379        to: ChannelId,
380        cx: &mut ModelContext<Self>,
381    ) -> Task<Result<()>> {
382        let client = self.client.clone();
383        cx.spawn(|_, _| async move {
384            let _ = client
385                .request(proto::LinkChannel { channel_id, to })
386                .await?;
387
388            Ok(())
389        })
390    }
391
392    pub fn unlink_channel(
393        &mut self,
394        channel_id: ChannelId,
395        from: ChannelId,
396        cx: &mut ModelContext<Self>,
397    ) -> Task<Result<()>> {
398        let client = self.client.clone();
399        cx.spawn(|_, _| async move {
400            let _ = client
401                .request(proto::UnlinkChannel { channel_id, from })
402                .await?;
403
404            Ok(())
405        })
406    }
407
408    pub fn move_channel(
409        &mut self,
410        channel_id: ChannelId,
411        from: ChannelId,
412        to: ChannelId,
413        cx: &mut ModelContext<Self>,
414    ) -> Task<Result<()>> {
415        let client = self.client.clone();
416        cx.spawn(|_, _| async move {
417            let _ = client
418                .request(proto::MoveChannel {
419                    channel_id,
420                    from,
421                    to,
422                })
423                .await?;
424
425            Ok(())
426        })
427    }
428
429    pub fn invite_member(
430        &mut self,
431        channel_id: ChannelId,
432        user_id: UserId,
433        admin: bool,
434        cx: &mut ModelContext<Self>,
435    ) -> Task<Result<()>> {
436        if !self.outgoing_invites.insert((channel_id, user_id)) {
437            return Task::ready(Err(anyhow!("invite request already in progress")));
438        }
439
440        cx.notify();
441        let client = self.client.clone();
442        cx.spawn(|this, mut cx| async move {
443            let result = client
444                .request(proto::InviteChannelMember {
445                    channel_id,
446                    user_id,
447                    admin,
448                })
449                .await;
450
451            this.update(&mut cx, |this, cx| {
452                this.outgoing_invites.remove(&(channel_id, user_id));
453                cx.notify();
454            });
455
456            result?;
457
458            Ok(())
459        })
460    }
461
462    pub fn remove_member(
463        &mut self,
464        channel_id: ChannelId,
465        user_id: u64,
466        cx: &mut ModelContext<Self>,
467    ) -> Task<Result<()>> {
468        if !self.outgoing_invites.insert((channel_id, user_id)) {
469            return Task::ready(Err(anyhow!("invite request already in progress")));
470        }
471
472        cx.notify();
473        let client = self.client.clone();
474        cx.spawn(|this, mut cx| async move {
475            let result = client
476                .request(proto::RemoveChannelMember {
477                    channel_id,
478                    user_id,
479                })
480                .await;
481
482            this.update(&mut cx, |this, cx| {
483                this.outgoing_invites.remove(&(channel_id, user_id));
484                cx.notify();
485            });
486            result?;
487            Ok(())
488        })
489    }
490
491    pub fn set_member_admin(
492        &mut self,
493        channel_id: ChannelId,
494        user_id: UserId,
495        admin: bool,
496        cx: &mut ModelContext<Self>,
497    ) -> Task<Result<()>> {
498        if !self.outgoing_invites.insert((channel_id, user_id)) {
499            return Task::ready(Err(anyhow!("member request already in progress")));
500        }
501
502        cx.notify();
503        let client = self.client.clone();
504        cx.spawn(|this, mut cx| async move {
505            let result = client
506                .request(proto::SetChannelMemberAdmin {
507                    channel_id,
508                    user_id,
509                    admin,
510                })
511                .await;
512
513            this.update(&mut cx, |this, cx| {
514                this.outgoing_invites.remove(&(channel_id, user_id));
515                cx.notify();
516            });
517
518            result?;
519            Ok(())
520        })
521    }
522
523    pub fn rename(
524        &mut self,
525        channel_id: ChannelId,
526        new_name: &str,
527        cx: &mut ModelContext<Self>,
528    ) -> Task<Result<()>> {
529        let client = self.client.clone();
530        let name = new_name.to_string();
531        cx.spawn(|this, mut cx| async move {
532            let channel = client
533                .request(proto::RenameChannel { channel_id, name })
534                .await?
535                .channel
536                .ok_or_else(|| anyhow!("missing channel in response"))?;
537            this.update(&mut cx, |this, cx| {
538                let task = this.update_channels(
539                    proto::UpdateChannels {
540                        channels: vec![channel],
541                        ..Default::default()
542                    },
543                    cx,
544                );
545                assert!(task.is_none());
546
547                // This event is emitted because the collab panel wants to clear the pending edit state
548                // before this frame is rendered. But we can't guarantee that the collab panel's future
549                // will resolve before this flush_effects finishes. Synchronously emitting this event
550                // ensures that the collab panel will observe this creation before the frame complete
551                cx.emit(ChannelEvent::ChannelRenamed(channel_id))
552            });
553            Ok(())
554        })
555    }
556
557    pub fn respond_to_channel_invite(
558        &mut self,
559        channel_id: ChannelId,
560        accept: bool,
561    ) -> impl Future<Output = Result<()>> {
562        let client = self.client.clone();
563        async move {
564            client
565                .request(proto::RespondToChannelInvite { channel_id, accept })
566                .await?;
567            Ok(())
568        }
569    }
570
571    pub fn get_channel_member_details(
572        &self,
573        channel_id: ChannelId,
574        cx: &mut ModelContext<Self>,
575    ) -> Task<Result<Vec<ChannelMembership>>> {
576        let client = self.client.clone();
577        let user_store = self.user_store.downgrade();
578        cx.spawn(|_, mut cx| async move {
579            let response = client
580                .request(proto::GetChannelMembers { channel_id })
581                .await?;
582
583            let user_ids = response.members.iter().map(|m| m.user_id).collect();
584            let user_store = user_store
585                .upgrade(&cx)
586                .ok_or_else(|| anyhow!("user store dropped"))?;
587            let users = user_store
588                .update(&mut cx, |user_store, cx| user_store.get_users(user_ids, cx))
589                .await?;
590
591            Ok(users
592                .into_iter()
593                .zip(response.members)
594                .filter_map(|(user, member)| {
595                    Some(ChannelMembership {
596                        user,
597                        admin: member.admin,
598                        kind: proto::channel_member::Kind::from_i32(member.kind)?,
599                    })
600                })
601                .collect())
602        })
603    }
604
605    pub fn remove_channel(&self, channel_id: ChannelId) -> impl Future<Output = Result<()>> {
606        let client = self.client.clone();
607        async move {
608            client.request(proto::DeleteChannel { channel_id }).await?;
609            Ok(())
610        }
611    }
612
    /// Stub: always returns `false`; the channel argument is ignored, since
    /// responses to invitations are not tracked by the store.
    pub fn has_pending_channel_invite_response(&self, _: &Arc<Channel>) -> bool {
        false
    }
616
617    pub fn has_pending_channel_invite(&self, channel_id: ChannelId, user_id: UserId) -> bool {
618        self.outgoing_invites.contains(&(channel_id, user_id))
619    }
620
621    async fn handle_update_channels(
622        this: ModelHandle<Self>,
623        message: TypedEnvelope<proto::UpdateChannels>,
624        _: Arc<Client>,
625        mut cx: AsyncAppContext,
626    ) -> Result<()> {
627        this.update(&mut cx, |this, _| {
628            this.update_channels_tx
629                .unbounded_send(message.payload)
630                .unwrap();
631        });
632        Ok(())
633    }
634
    /// Called when the client (re)connects.
    ///
    /// Drops any pending delayed-disconnect task, asks every open chat to
    /// rejoin, and — if any channel buffers are open — sends their versions
    /// to the server in a `RejoinChannelBuffers` request. When the response
    /// arrives, each open buffer is either resynchronized with the server or
    /// disconnected and forgotten.
    fn handle_connect(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
        // NOTE(review): dropping the task presumably cancels the delayed disconnect.
        self.disconnect_channel_buffers_task.take();

        for chat in self.opened_chats.values() {
            if let OpenedModelHandle::Open(chat) = chat {
                if let Some(chat) = chat.upgrade(cx) {
                    chat.update(cx, |chat, cx| {
                        chat.rejoin(cx);
                    });
                }
            }
        }

        // Describe the local state of every live, fully opened buffer.
        let mut buffer_versions = Vec::new();
        for buffer in self.opened_buffers.values() {
            if let OpenedModelHandle::Open(buffer) = buffer {
                if let Some(buffer) = buffer.upgrade(cx) {
                    let channel_buffer = buffer.read(cx);
                    let buffer = channel_buffer.buffer().read(cx);
                    buffer_versions.push(proto::ChannelBufferVersion {
                        channel_id: channel_buffer.channel().id,
                        epoch: channel_buffer.epoch(),
                        version: language::proto::serialize_version(&buffer.version()),
                    });
                }
            }
        }

        // Nothing to rejoin.
        if buffer_versions.is_empty() {
            return Task::ready(Ok(()));
        }

        let response = self.client.request(proto::RejoinChannelBuffers {
            buffers: buffer_versions,
        });

        cx.spawn(|this, mut cx| async move {
            let mut response = response.await?;

            this.update(&mut cx, |this, cx| {
                // The closure's return value decides whether the entry is kept.
                this.opened_buffers.retain(|_, buffer| match buffer {
                    OpenedModelHandle::Open(channel_buffer) => {
                        // Buffers dropped in the meantime are simply forgotten.
                        let Some(channel_buffer) = channel_buffer.upgrade(cx) else {
                            return false;
                        };

                        channel_buffer.update(cx, |channel_buffer, cx| {
                            let channel_id = channel_buffer.channel().id;
                            if let Some(remote_buffer) = response
                                .buffers
                                .iter_mut()
                                .find(|buffer| buffer.channel_id == channel_id)
                            {
                                let channel_id = channel_buffer.channel().id;
                                let remote_version =
                                    language::proto::deserialize_version(&remote_buffer.version);

                                channel_buffer.replace_collaborators(
                                    mem::take(&mut remote_buffer.collaborators),
                                    cx,
                                );

                                // Serialize ops relative to the remote version and
                                // apply the operations the server sent back.
                                let operations = channel_buffer
                                    .buffer()
                                    .update(cx, |buffer, cx| {
                                        let outgoing_operations =
                                            buffer.serialize_ops(Some(remote_version), cx);
                                        let incoming_operations =
                                            mem::take(&mut remote_buffer.operations)
                                                .into_iter()
                                                .map(language::proto::deserialize_operation)
                                                .collect::<Result<Vec<_>>>()?;
                                        buffer.apply_ops(incoming_operations, cx)?;
                                        anyhow::Ok(outgoing_operations)
                                    })
                                    .log_err();

                                if let Some(operations) = operations {
                                    let client = this.client.clone();
                                    // Send the outgoing operations in chunks from a
                                    // detached background task; send errors are ignored.
                                    cx.background()
                                        .spawn(async move {
                                            let operations = operations.await;
                                            for chunk in
                                                language::proto::split_operations(operations)
                                            {
                                                client
                                                    .send(proto::UpdateChannelBuffer {
                                                        channel_id,
                                                        operations: chunk,
                                                    })
                                                    .ok();
                                            }
                                        })
                                        .detach();
                                    return true;
                                }
                            }

                            // The server did not return this buffer, or applying
                            // its operations failed: disconnect and drop the entry.
                            channel_buffer.disconnect(cx);
                            false
                        })
                    }
                    // Buffers that are still loading are left alone.
                    OpenedModelHandle::Loading(_) => true,
                });
            });
            anyhow::Ok(())
        })
    }
743
744    fn handle_disconnect(&mut self, cx: &mut ModelContext<Self>) {
745        self.channel_index.clear();
746        self.channel_invitations.clear();
747        self.channel_participants.clear();
748        self.channels_with_admin_privileges.clear();
749        self.channel_index.clear();
750        self.outgoing_invites.clear();
751        cx.notify();
752
753        self.disconnect_channel_buffers_task.get_or_insert_with(|| {
754            cx.spawn_weak(|this, mut cx| async move {
755                cx.background().timer(RECONNECT_TIMEOUT).await;
756                if let Some(this) = this.upgrade(&cx) {
757                    this.update(&mut cx, |this, cx| {
758                        for (_, buffer) in this.opened_buffers.drain() {
759                            if let OpenedModelHandle::Open(buffer) = buffer {
760                                if let Some(buffer) = buffer.upgrade(cx) {
761                                    buffer.update(cx, |buffer, cx| buffer.disconnect(cx));
762                                }
763                            }
764                        }
765                    });
766                }
767            })
768        });
769    }
770
    /// Applies an `UpdateChannels` payload to the store.
    ///
    /// Invitations, channels, edges and permissions are applied
    /// synchronously and observers are notified. Returns `None` when the
    /// payload carries no participant lists; otherwise returns a task that
    /// resolves the participants' users through the user store and then
    /// records them per channel.
    pub(crate) fn update_channels(
        &mut self,
        payload: proto::UpdateChannels,
        cx: &mut ModelContext<ChannelStore>,
    ) -> Option<Task<Result<()>>> {
        if !payload.remove_channel_invitations.is_empty() {
            self.channel_invitations
                .retain(|channel| !payload.remove_channel_invitations.contains(&channel.id));
        }
        // Invitations are kept sorted by id: update in place or insert at the
        // position reported by the binary search.
        for channel in payload.channel_invitations {
            match self
                .channel_invitations
                .binary_search_by_key(&channel.id, |c| c.id)
            {
                Ok(ix) => Arc::make_mut(&mut self.channel_invitations[ix]).name = channel.name,
                Err(ix) => self.channel_invitations.insert(
                    ix,
                    Arc::new(Channel {
                        id: channel.id,
                        name: channel.name,
                    }),
                ),
            }
        }

        let channels_changed = !payload.channels.is_empty()
            || !payload.delete_channels.is_empty()
            || !payload.insert_edge.is_empty()
            || !payload.delete_edge.is_empty();

        if channels_changed {
            if !payload.delete_channels.is_empty() {
                self.channel_index.delete_channels(&payload.delete_channels);
                self.channel_participants
                    .retain(|channel_id, _| !payload.delete_channels.contains(channel_id));
                self.channels_with_admin_privileges
                    .retain(|channel_id| !payload.delete_channels.contains(channel_id));

                // Disconnect any fully opened buffer of a deleted channel.
                for channel_id in &payload.delete_channels {
                    let channel_id = *channel_id;
                    if let Some(OpenedModelHandle::Open(buffer)) =
                        self.opened_buffers.remove(&channel_id)
                    {
                        if let Some(buffer) = buffer.upgrade(cx) {
                            buffer.update(cx, ChannelBuffer::disconnect);
                        }
                    }
                }
            }

            // NOTE(review): `bulk_insert` presumably returns a guard that
            // finalizes the index when dropped — confirm in `channel_index`.
            let mut index = self.channel_index.bulk_insert();
            for channel in payload.channels {
                index.insert(channel)
            }

            for edge in payload.insert_edge {
                index.insert_edge(edge.channel_id, edge.parent_id);
            }

            for edge in payload.delete_edge {
                index.delete_edge(edge.parent_id, edge.channel_id);
            }
        }

        for permission in payload.channel_permissions {
            if permission.is_admin {
                self.channels_with_admin_privileges
                    .insert(permission.channel_id);
            } else {
                self.channels_with_admin_privileges
                    .remove(&permission.channel_id);
            }
        }

        cx.notify();
        if payload.channel_participants.is_empty() {
            return None;
        }

        // Collect the distinct participant ids, kept sorted via binary search.
        let mut all_user_ids = Vec::new();
        let channel_participants = payload.channel_participants;
        for entry in &channel_participants {
            for user_id in entry.participant_user_ids.iter() {
                if let Err(ix) = all_user_ids.binary_search(user_id) {
                    all_user_ids.insert(ix, *user_id);
                }
            }
        }

        let users = self
            .user_store
            .update(cx, |user_store, cx| user_store.get_users(all_user_ids, cx));
        Some(cx.spawn(|this, mut cx| async move {
            let users = users.await?;

            this.update(&mut cx, |this, cx| {
                for entry in &channel_participants {
                    // NOTE(review): the binary search assumes `users` is sorted
                    // by id, i.e. that `get_users` preserves the (sorted)
                    // request order — confirm against `UserStore`. Ids that
                    // are not found are skipped.
                    let mut participants: Vec<_> = entry
                        .participant_user_ids
                        .iter()
                        .filter_map(|user_id| {
                            users
                                .binary_search_by_key(&user_id, |user| &user.id)
                                .ok()
                                .map(|ix| users[ix].clone())
                        })
                        .collect();

                    participants.sort_by_key(|u| u.id);

                    this.channel_participants
                        .insert(entry.channel_id, participants);
                }

                cx.notify();
            });
            anyhow::Ok(())
        }))
    }
890}
891
892impl Deref for ChannelPath {
893    type Target = [ChannelId];
894
895    fn deref(&self) -> &Self::Target {
896        &self.0
897    }
898}
899
900impl ChannelPath {
901    pub fn new(path: Arc<[ChannelId]>) -> Self {
902        debug_assert!(path.len() >= 1);
903        Self(path)
904    }
905
906    pub fn parent_id(&self) -> Option<ChannelId> {
907        self.0.len().checked_sub(2).map(|i| self.0[i])
908    }
909
910    pub fn channel_id(&self) -> ChannelId {
911        self.0[self.0.len() - 1]
912    }
913
914    pub fn unique_id(&self) -> u64 {
915        let mut hasher = DefaultHasher::new();
916        self.0.deref().hash(&mut hasher);
917        hasher.finish()
918    }
919}
920
921impl From<ChannelPath> for Cow<'static, ChannelPath> {
922    fn from(value: ChannelPath) -> Self {
923        Cow::Owned(value)
924    }
925}
926
927impl<'a> From<&'a ChannelPath> for Cow<'a, ChannelPath> {
928    fn from(value: &'a ChannelPath) -> Self {
929        Cow::Borrowed(value)
930    }
931}
932
933impl Default for ChannelPath {
934    fn default() -> Self {
935        ChannelPath(Arc::from([]))
936    }
937}