channel_store.rs

  1mod channel_index;
  2
  3use crate::{channel_buffer::ChannelBuffer, channel_chat::ChannelChat};
  4use anyhow::{anyhow, Result};
  5use client::{Client, Subscription, User, UserId, UserStore};
  6use collections::{hash_map, HashMap, HashSet};
  7use futures::{channel::mpsc, future::Shared, Future, FutureExt, StreamExt};
  8use gpui::{AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Task, WeakModelHandle};
  9use rpc::{
 10    proto::{self, ChannelEdge, ChannelPermission},
 11    TypedEnvelope,
 12};
 13use serde_derive::{Deserialize, Serialize};
 14use std::{borrow::Cow, hash::Hash, mem, ops::Deref, sync::Arc, time::Duration};
 15use util::ResultExt;
 16
 17use self::channel_index::ChannelIndex;
 18
 19pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
 20
 21pub type ChannelId = u64;
 22
 23pub struct ChannelStore {
 24    channel_index: ChannelIndex,
 25    channel_invitations: Vec<Arc<Channel>>,
 26    channel_participants: HashMap<ChannelId, Vec<Arc<User>>>,
 27    channels_with_admin_privileges: HashSet<ChannelId>,
 28    outgoing_invites: HashSet<(ChannelId, UserId)>,
 29    update_channels_tx: mpsc::UnboundedSender<proto::UpdateChannels>,
 30    opened_buffers: HashMap<ChannelId, OpenedModelHandle<ChannelBuffer>>,
 31    opened_chats: HashMap<ChannelId, OpenedModelHandle<ChannelChat>>,
 32    client: Arc<Client>,
 33    user_store: ModelHandle<UserStore>,
 34    _rpc_subscription: Subscription,
 35    _watch_connection_status: Task<Option<()>>,
 36    disconnect_channel_buffers_task: Option<Task<()>>,
 37    _update_channels: Task<()>,
 38}
 39
 40pub type ChannelData = (Channel, ChannelPath);
 41
 42#[derive(Clone, Debug, PartialEq)]
 43pub struct Channel {
 44    pub id: ChannelId,
 45    pub name: String,
 46}
 47
 48#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Serialize, Deserialize)]
 49pub struct ChannelPath(Arc<[ChannelId]>);
 50
 51pub struct ChannelMembership {
 52    pub user: Arc<User>,
 53    pub kind: proto::channel_member::Kind,
 54    pub admin: bool,
 55}
 56
 57pub enum ChannelEvent {
 58    ChannelCreated(ChannelId),
 59    ChannelRenamed(ChannelId),
 60}
 61
 62impl Entity for ChannelStore {
 63    type Event = ChannelEvent;
 64}
 65
 66enum OpenedModelHandle<E: Entity> {
 67    Open(WeakModelHandle<E>),
 68    Loading(Shared<Task<Result<ModelHandle<E>, Arc<anyhow::Error>>>>),
 69}
 70
 71impl ChannelStore {
 72    pub fn new(
 73        client: Arc<Client>,
 74        user_store: ModelHandle<UserStore>,
 75        cx: &mut ModelContext<Self>,
 76    ) -> Self {
 77        let rpc_subscription =
 78            client.add_message_handler(cx.handle(), Self::handle_update_channels);
 79
 80        let mut connection_status = client.status();
 81        let (update_channels_tx, mut update_channels_rx) = mpsc::unbounded();
 82        let watch_connection_status = cx.spawn_weak(|this, mut cx| async move {
 83            while let Some(status) = connection_status.next().await {
 84                let this = this.upgrade(&cx)?;
 85                if status.is_connected() {
 86                    this.update(&mut cx, |this, cx| this.handle_connect(cx))
 87                        .await
 88                        .log_err()?;
 89                } else {
 90                    this.update(&mut cx, |this, cx| this.handle_disconnect(cx));
 91                }
 92            }
 93            Some(())
 94        });
 95
 96        Self {
 97            channel_invitations: Vec::default(),
 98            channel_index: ChannelIndex::default(),
 99            channel_participants: Default::default(),
100            channels_with_admin_privileges: Default::default(),
101            outgoing_invites: Default::default(),
102            opened_buffers: Default::default(),
103            opened_chats: Default::default(),
104            update_channels_tx,
105            client,
106            user_store,
107            _rpc_subscription: rpc_subscription,
108            _watch_connection_status: watch_connection_status,
109            disconnect_channel_buffers_task: None,
110            _update_channels: cx.spawn_weak(|this, mut cx| async move {
111                while let Some(update_channels) = update_channels_rx.next().await {
112                    if let Some(this) = this.upgrade(&cx) {
113                        let update_task = this.update(&mut cx, |this, cx| {
114                            this.update_channels(update_channels, cx)
115                        });
116                        if let Some(update_task) = update_task {
117                            update_task.await.log_err();
118                        }
119                    }
120                }
121            }),
122        }
123    }
124
125    pub fn client(&self) -> Arc<Client> {
126        self.client.clone()
127    }
128
129    pub fn has_children(&self, channel_id: ChannelId) -> bool {
130        self.channel_index.iter().any(|path| {
131            if let Some(ix) = path.iter().position(|id| *id == channel_id) {
132                path.len() > ix + 1
133            } else {
134                false
135            }
136        })
137    }
138
139    /// Returns the number of unique channels in the store
140    pub fn channel_count(&self) -> usize {
141        self.channel_index.by_id().len()
142    }
143
144    /// Returns the index of a channel ID in the list of unique channels
145    pub fn index_of_channel(&self, channel_id: ChannelId) -> Option<usize> {
146        self.channel_index
147            .by_id()
148            .keys()
149            .position(|id| *id == channel_id)
150    }
151
152    /// Returns an iterator over all unique channels
153    pub fn channels(&self) -> impl '_ + Iterator<Item = &Arc<Channel>> {
154        self.channel_index.by_id().values()
155    }
156
157    /// Iterate over all entries in the channel DAG
158    pub fn channel_dag_entries(&self) -> impl '_ + Iterator<Item = (usize, &Arc<Channel>)> {
159        self.channel_index.iter().map(move |path| {
160            let id = path.last().unwrap();
161            let channel = self.channel_for_id(*id).unwrap();
162            (path.len() - 1, channel)
163        })
164    }
165
166    pub fn channel_dag_entry_at(&self, ix: usize) -> Option<(&Arc<Channel>, &ChannelPath)> {
167        let path = self.channel_index.get(ix)?;
168        let id = path.last().unwrap();
169        let channel = self.channel_for_id(*id).unwrap();
170
171        Some((channel, path))
172    }
173
174    pub fn channel_at(&self, ix: usize) -> Option<&Arc<Channel>> {
175        self.channel_index.by_id().values().nth(ix)
176    }
177
178    pub fn channel_invitations(&self) -> &[Arc<Channel>] {
179        &self.channel_invitations
180    }
181
182    pub fn channel_for_id(&self, channel_id: ChannelId) -> Option<&Arc<Channel>> {
183        self.channel_index.by_id().get(&channel_id)
184    }
185
186    pub fn has_open_channel_buffer(&self, channel_id: ChannelId, cx: &AppContext) -> bool {
187        if let Some(buffer) = self.opened_buffers.get(&channel_id) {
188            if let OpenedModelHandle::Open(buffer) = buffer {
189                return buffer.upgrade(cx).is_some();
190            }
191        }
192        false
193    }
194
195    pub fn open_channel_buffer(
196        &mut self,
197        channel_id: ChannelId,
198        cx: &mut ModelContext<Self>,
199    ) -> Task<Result<ModelHandle<ChannelBuffer>>> {
200        let client = self.client.clone();
201        self.open_channel_resource(
202            channel_id,
203            |this| &mut this.opened_buffers,
204            |channel, cx| ChannelBuffer::new(channel, client, cx),
205            cx,
206        )
207    }
208
209    pub fn open_channel_chat(
210        &mut self,
211        channel_id: ChannelId,
212        cx: &mut ModelContext<Self>,
213    ) -> Task<Result<ModelHandle<ChannelChat>>> {
214        let client = self.client.clone();
215        let user_store = self.user_store.clone();
216        self.open_channel_resource(
217            channel_id,
218            |this| &mut this.opened_chats,
219            |channel, cx| ChannelChat::new(channel, user_store, client, cx),
220            cx,
221        )
222    }
223
224    /// Asynchronously open a given resource associated with a channel.
225    ///
226    /// Make sure that the resource is only opened once, even if this method
227    /// is called multiple times with the same channel id while the first task
228    /// is still running.
229    fn open_channel_resource<T: Entity, F, Fut>(
230        &mut self,
231        channel_id: ChannelId,
232        get_map: fn(&mut Self) -> &mut HashMap<ChannelId, OpenedModelHandle<T>>,
233        load: F,
234        cx: &mut ModelContext<Self>,
235    ) -> Task<Result<ModelHandle<T>>>
236    where
237        F: 'static + FnOnce(Arc<Channel>, AsyncAppContext) -> Fut,
238        Fut: Future<Output = Result<ModelHandle<T>>>,
239    {
240        let task = loop {
241            match get_map(self).entry(channel_id) {
242                hash_map::Entry::Occupied(e) => match e.get() {
243                    OpenedModelHandle::Open(model) => {
244                        if let Some(model) = model.upgrade(cx) {
245                            break Task::ready(Ok(model)).shared();
246                        } else {
247                            get_map(self).remove(&channel_id);
248                            continue;
249                        }
250                    }
251                    OpenedModelHandle::Loading(task) => {
252                        break task.clone();
253                    }
254                },
255                hash_map::Entry::Vacant(e) => {
256                    let task = cx
257                        .spawn(|this, cx| async move {
258                            let channel = this.read_with(&cx, |this, _| {
259                                this.channel_for_id(channel_id).cloned().ok_or_else(|| {
260                                    Arc::new(anyhow!("no channel for id: {}", channel_id))
261                                })
262                            })?;
263
264                            load(channel, cx).await.map_err(Arc::new)
265                        })
266                        .shared();
267
268                    e.insert(OpenedModelHandle::Loading(task.clone()));
269                    cx.spawn({
270                        let task = task.clone();
271                        |this, mut cx| async move {
272                            let result = task.await;
273                            this.update(&mut cx, |this, _| match result {
274                                Ok(model) => {
275                                    get_map(this).insert(
276                                        channel_id,
277                                        OpenedModelHandle::Open(model.downgrade()),
278                                    );
279                                }
280                                Err(_) => {
281                                    get_map(this).remove(&channel_id);
282                                }
283                            });
284                        }
285                    })
286                    .detach();
287                    break task;
288                }
289            }
290        };
291        cx.foreground()
292            .spawn(async move { task.await.map_err(|error| anyhow!("{}", error)) })
293    }
294
295    pub fn is_user_admin(&self, channel_id: ChannelId) -> bool {
296        self.channel_index.iter().any(|path| {
297            if let Some(ix) = path.iter().position(|id| *id == channel_id) {
298                path[..=ix]
299                    .iter()
300                    .any(|id| self.channels_with_admin_privileges.contains(id))
301            } else {
302                false
303            }
304        })
305    }
306
307    pub fn channel_participants(&self, channel_id: ChannelId) -> &[Arc<User>] {
308        self.channel_participants
309            .get(&channel_id)
310            .map_or(&[], |v| v.as_slice())
311    }
312
313    pub fn create_channel(
314        &self,
315        name: &str,
316        parent_id: Option<ChannelId>,
317        cx: &mut ModelContext<Self>,
318    ) -> Task<Result<ChannelId>> {
319        let client = self.client.clone();
320        let name = name.trim_start_matches("#").to_owned();
321        cx.spawn(|this, mut cx| async move {
322            let response = client
323                .request(proto::CreateChannel { name, parent_id })
324                .await?;
325
326            let channel = response
327                .channel
328                .ok_or_else(|| anyhow!("missing channel in response"))?;
329            let channel_id = channel.id;
330
331            let parent_edge = if let Some(parent_id) = parent_id {
332                vec![ChannelEdge {
333                    channel_id: channel.id,
334                    parent_id,
335                }]
336            } else {
337                vec![]
338            };
339
340            this.update(&mut cx, |this, cx| {
341                let task = this.update_channels(
342                    proto::UpdateChannels {
343                        channels: vec![channel],
344                        insert_edge: parent_edge,
345                        channel_permissions: vec![ChannelPermission {
346                            channel_id,
347                            is_admin: true,
348                        }],
349                        ..Default::default()
350                    },
351                    cx,
352                );
353                assert!(task.is_none());
354
355                // This event is emitted because the collab panel wants to clear the pending edit state
356                // before this frame is rendered. But we can't guarantee that the collab panel's future
357                // will resolve before this flush_effects finishes. Synchronously emitting this event
358                // ensures that the collab panel will observe this creation before the frame completes
359                cx.emit(ChannelEvent::ChannelCreated(channel_id));
360            });
361
362            Ok(channel_id)
363        })
364    }
365
366    pub fn link_channel(
367        &mut self,
368        channel_id: ChannelId,
369        to: ChannelId,
370        cx: &mut ModelContext<Self>,
371    ) -> Task<Result<()>> {
372        let client = self.client.clone();
373        cx.spawn(|_, _| async move {
374            let _ = client
375                .request(proto::LinkChannel { channel_id, to })
376                .await?;
377
378            Ok(())
379        })
380    }
381
382    pub fn unlink_channel(
383        &mut self,
384        channel_id: ChannelId,
385        from: ChannelId,
386        cx: &mut ModelContext<Self>,
387    ) -> Task<Result<()>> {
388        let client = self.client.clone();
389        cx.spawn(|_, _| async move {
390            let _ = client
391                .request(proto::UnlinkChannel { channel_id, from })
392                .await?;
393
394            Ok(())
395        })
396    }
397
398    pub fn move_channel(
399        &mut self,
400        channel_id: ChannelId,
401        from: ChannelId,
402        to: ChannelId,
403        cx: &mut ModelContext<Self>,
404    ) -> Task<Result<()>> {
405        let client = self.client.clone();
406        cx.spawn(|_, _| async move {
407            let _ = client
408                .request(proto::MoveChannel {
409                    channel_id,
410                    from,
411                    to,
412                })
413                .await?;
414
415            Ok(())
416        })
417    }
418
419    pub fn invite_member(
420        &mut self,
421        channel_id: ChannelId,
422        user_id: UserId,
423        admin: bool,
424        cx: &mut ModelContext<Self>,
425    ) -> Task<Result<()>> {
426        if !self.outgoing_invites.insert((channel_id, user_id)) {
427            return Task::ready(Err(anyhow!("invite request already in progress")));
428        }
429
430        cx.notify();
431        let client = self.client.clone();
432        cx.spawn(|this, mut cx| async move {
433            let result = client
434                .request(proto::InviteChannelMember {
435                    channel_id,
436                    user_id,
437                    admin,
438                })
439                .await;
440
441            this.update(&mut cx, |this, cx| {
442                this.outgoing_invites.remove(&(channel_id, user_id));
443                cx.notify();
444            });
445
446            result?;
447
448            Ok(())
449        })
450    }
451
452    pub fn remove_member(
453        &mut self,
454        channel_id: ChannelId,
455        user_id: u64,
456        cx: &mut ModelContext<Self>,
457    ) -> Task<Result<()>> {
458        if !self.outgoing_invites.insert((channel_id, user_id)) {
459            return Task::ready(Err(anyhow!("invite request already in progress")));
460        }
461
462        cx.notify();
463        let client = self.client.clone();
464        cx.spawn(|this, mut cx| async move {
465            let result = client
466                .request(proto::RemoveChannelMember {
467                    channel_id,
468                    user_id,
469                })
470                .await;
471
472            this.update(&mut cx, |this, cx| {
473                this.outgoing_invites.remove(&(channel_id, user_id));
474                cx.notify();
475            });
476            result?;
477            Ok(())
478        })
479    }
480
481    pub fn set_member_admin(
482        &mut self,
483        channel_id: ChannelId,
484        user_id: UserId,
485        admin: bool,
486        cx: &mut ModelContext<Self>,
487    ) -> Task<Result<()>> {
488        if !self.outgoing_invites.insert((channel_id, user_id)) {
489            return Task::ready(Err(anyhow!("member request already in progress")));
490        }
491
492        cx.notify();
493        let client = self.client.clone();
494        cx.spawn(|this, mut cx| async move {
495            let result = client
496                .request(proto::SetChannelMemberAdmin {
497                    channel_id,
498                    user_id,
499                    admin,
500                })
501                .await;
502
503            this.update(&mut cx, |this, cx| {
504                this.outgoing_invites.remove(&(channel_id, user_id));
505                cx.notify();
506            });
507
508            result?;
509            Ok(())
510        })
511    }
512
513    pub fn rename(
514        &mut self,
515        channel_id: ChannelId,
516        new_name: &str,
517        cx: &mut ModelContext<Self>,
518    ) -> Task<Result<()>> {
519        let client = self.client.clone();
520        let name = new_name.to_string();
521        cx.spawn(|this, mut cx| async move {
522            let channel = client
523                .request(proto::RenameChannel { channel_id, name })
524                .await?
525                .channel
526                .ok_or_else(|| anyhow!("missing channel in response"))?;
527            this.update(&mut cx, |this, cx| {
528                let task = this.update_channels(
529                    proto::UpdateChannels {
530                        channels: vec![channel],
531                        ..Default::default()
532                    },
533                    cx,
534                );
535                assert!(task.is_none());
536
537                // This event is emitted because the collab panel wants to clear the pending edit state
538                // before this frame is rendered. But we can't guarantee that the collab panel's future
539                // will resolve before this flush_effects finishes. Synchronously emitting this event
540                // ensures that the collab panel will observe this creation before the frame complete
541                cx.emit(ChannelEvent::ChannelRenamed(channel_id))
542            });
543            Ok(())
544        })
545    }
546
547    pub fn respond_to_channel_invite(
548        &mut self,
549        channel_id: ChannelId,
550        accept: bool,
551    ) -> impl Future<Output = Result<()>> {
552        let client = self.client.clone();
553        async move {
554            client
555                .request(proto::RespondToChannelInvite { channel_id, accept })
556                .await?;
557            Ok(())
558        }
559    }
560
561    pub fn get_channel_member_details(
562        &self,
563        channel_id: ChannelId,
564        cx: &mut ModelContext<Self>,
565    ) -> Task<Result<Vec<ChannelMembership>>> {
566        let client = self.client.clone();
567        let user_store = self.user_store.downgrade();
568        cx.spawn(|_, mut cx| async move {
569            let response = client
570                .request(proto::GetChannelMembers { channel_id })
571                .await?;
572
573            let user_ids = response.members.iter().map(|m| m.user_id).collect();
574            let user_store = user_store
575                .upgrade(&cx)
576                .ok_or_else(|| anyhow!("user store dropped"))?;
577            let users = user_store
578                .update(&mut cx, |user_store, cx| user_store.get_users(user_ids, cx))
579                .await?;
580
581            Ok(users
582                .into_iter()
583                .zip(response.members)
584                .filter_map(|(user, member)| {
585                    Some(ChannelMembership {
586                        user,
587                        admin: member.admin,
588                        kind: proto::channel_member::Kind::from_i32(member.kind)?,
589                    })
590                })
591                .collect())
592        })
593    }
594
595    pub fn remove_channel(&self, channel_id: ChannelId) -> impl Future<Output = Result<()>> {
596        let client = self.client.clone();
597        async move {
598            client.request(proto::DeleteChannel { channel_id }).await?;
599            Ok(())
600        }
601    }
602
603    pub fn has_pending_channel_invite_response(&self, _: &Arc<Channel>) -> bool {
604        false
605    }
606
607    pub fn has_pending_channel_invite(&self, channel_id: ChannelId, user_id: UserId) -> bool {
608        self.outgoing_invites.contains(&(channel_id, user_id))
609    }
610
611    async fn handle_update_channels(
612        this: ModelHandle<Self>,
613        message: TypedEnvelope<proto::UpdateChannels>,
614        _: Arc<Client>,
615        mut cx: AsyncAppContext,
616    ) -> Result<()> {
617        this.update(&mut cx, |this, _| {
618            this.update_channels_tx
619                .unbounded_send(message.payload)
620                .unwrap();
621        });
622        Ok(())
623    }
624
625    fn handle_connect(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
626        self.disconnect_channel_buffers_task.take();
627
628        for chat in self.opened_chats.values() {
629            if let OpenedModelHandle::Open(chat) = chat {
630                if let Some(chat) = chat.upgrade(cx) {
631                    chat.update(cx, |chat, cx| {
632                        chat.rejoin(cx);
633                    });
634                }
635            }
636        }
637
638        let mut buffer_versions = Vec::new();
639        for buffer in self.opened_buffers.values() {
640            if let OpenedModelHandle::Open(buffer) = buffer {
641                if let Some(buffer) = buffer.upgrade(cx) {
642                    let channel_buffer = buffer.read(cx);
643                    let buffer = channel_buffer.buffer().read(cx);
644                    buffer_versions.push(proto::ChannelBufferVersion {
645                        channel_id: channel_buffer.channel().id,
646                        epoch: channel_buffer.epoch(),
647                        version: language::proto::serialize_version(&buffer.version()),
648                    });
649                }
650            }
651        }
652
653        if buffer_versions.is_empty() {
654            return Task::ready(Ok(()));
655        }
656
657        let response = self.client.request(proto::RejoinChannelBuffers {
658            buffers: buffer_versions,
659        });
660
661        cx.spawn(|this, mut cx| async move {
662            let mut response = response.await?;
663
664            this.update(&mut cx, |this, cx| {
665                this.opened_buffers.retain(|_, buffer| match buffer {
666                    OpenedModelHandle::Open(channel_buffer) => {
667                        let Some(channel_buffer) = channel_buffer.upgrade(cx) else {
668                            return false;
669                        };
670
671                        channel_buffer.update(cx, |channel_buffer, cx| {
672                            let channel_id = channel_buffer.channel().id;
673                            if let Some(remote_buffer) = response
674                                .buffers
675                                .iter_mut()
676                                .find(|buffer| buffer.channel_id == channel_id)
677                            {
678                                let channel_id = channel_buffer.channel().id;
679                                let remote_version =
680                                    language::proto::deserialize_version(&remote_buffer.version);
681
682                                channel_buffer.replace_collaborators(
683                                    mem::take(&mut remote_buffer.collaborators),
684                                    cx,
685                                );
686
687                                let operations = channel_buffer
688                                    .buffer()
689                                    .update(cx, |buffer, cx| {
690                                        let outgoing_operations =
691                                            buffer.serialize_ops(Some(remote_version), cx);
692                                        let incoming_operations =
693                                            mem::take(&mut remote_buffer.operations)
694                                                .into_iter()
695                                                .map(language::proto::deserialize_operation)
696                                                .collect::<Result<Vec<_>>>()?;
697                                        buffer.apply_ops(incoming_operations, cx)?;
698                                        anyhow::Ok(outgoing_operations)
699                                    })
700                                    .log_err();
701
702                                if let Some(operations) = operations {
703                                    let client = this.client.clone();
704                                    cx.background()
705                                        .spawn(async move {
706                                            let operations = operations.await;
707                                            for chunk in
708                                                language::proto::split_operations(operations)
709                                            {
710                                                client
711                                                    .send(proto::UpdateChannelBuffer {
712                                                        channel_id,
713                                                        operations: chunk,
714                                                    })
715                                                    .ok();
716                                            }
717                                        })
718                                        .detach();
719                                    return true;
720                                }
721                            }
722
723                            channel_buffer.disconnect(cx);
724                            false
725                        })
726                    }
727                    OpenedModelHandle::Loading(_) => true,
728                });
729            });
730            anyhow::Ok(())
731        })
732    }
733
734    fn handle_disconnect(&mut self, cx: &mut ModelContext<Self>) {
735        self.channel_index.clear();
736        self.channel_invitations.clear();
737        self.channel_participants.clear();
738        self.channels_with_admin_privileges.clear();
739        self.channel_index.clear();
740        self.outgoing_invites.clear();
741        cx.notify();
742
743        self.disconnect_channel_buffers_task.get_or_insert_with(|| {
744            cx.spawn_weak(|this, mut cx| async move {
745                cx.background().timer(RECONNECT_TIMEOUT).await;
746                if let Some(this) = this.upgrade(&cx) {
747                    this.update(&mut cx, |this, cx| {
748                        for (_, buffer) in this.opened_buffers.drain() {
749                            if let OpenedModelHandle::Open(buffer) = buffer {
750                                if let Some(buffer) = buffer.upgrade(cx) {
751                                    buffer.update(cx, |buffer, cx| buffer.disconnect(cx));
752                                }
753                            }
754                        }
755                    });
756                }
757            })
758        });
759    }
760
761    pub(crate) fn update_channels(
762        &mut self,
763        payload: proto::UpdateChannels,
764        cx: &mut ModelContext<ChannelStore>,
765    ) -> Option<Task<Result<()>>> {
766        if !payload.remove_channel_invitations.is_empty() {
767            self.channel_invitations
768                .retain(|channel| !payload.remove_channel_invitations.contains(&channel.id));
769        }
770        for channel in payload.channel_invitations {
771            match self
772                .channel_invitations
773                .binary_search_by_key(&channel.id, |c| c.id)
774            {
775                Ok(ix) => Arc::make_mut(&mut self.channel_invitations[ix]).name = channel.name,
776                Err(ix) => self.channel_invitations.insert(
777                    ix,
778                    Arc::new(Channel {
779                        id: channel.id,
780                        name: channel.name,
781                    }),
782                ),
783            }
784        }
785
786        let channels_changed = !payload.channels.is_empty()
787            || !payload.delete_channels.is_empty()
788            || !payload.insert_edge.is_empty()
789            || !payload.delete_edge.is_empty();
790
791        if channels_changed {
792            if !payload.delete_channels.is_empty() {
793                self.channel_index.delete_channels(&payload.delete_channels);
794                self.channel_participants
795                    .retain(|channel_id, _| !payload.delete_channels.contains(channel_id));
796                self.channels_with_admin_privileges
797                    .retain(|channel_id| !payload.delete_channels.contains(channel_id));
798
799                for channel_id in &payload.delete_channels {
800                    let channel_id = *channel_id;
801                    if let Some(OpenedModelHandle::Open(buffer)) =
802                        self.opened_buffers.remove(&channel_id)
803                    {
804                        if let Some(buffer) = buffer.upgrade(cx) {
805                            buffer.update(cx, ChannelBuffer::disconnect);
806                        }
807                    }
808                }
809            }
810
811            let mut index = self.channel_index.bulk_insert();
812            for channel in payload.channels {
813                index.insert(channel)
814            }
815
816            for edge in payload.insert_edge {
817                index.insert_edge(edge.channel_id, edge.parent_id);
818            }
819
820            for edge in payload.delete_edge {
821                index.delete_edge(edge.parent_id, edge.channel_id);
822            }
823        }
824
825        for permission in payload.channel_permissions {
826            if permission.is_admin {
827                self.channels_with_admin_privileges
828                    .insert(permission.channel_id);
829            } else {
830                self.channels_with_admin_privileges
831                    .remove(&permission.channel_id);
832            }
833        }
834
835        cx.notify();
836        if payload.channel_participants.is_empty() {
837            return None;
838        }
839
840        let mut all_user_ids = Vec::new();
841        let channel_participants = payload.channel_participants;
842        for entry in &channel_participants {
843            for user_id in entry.participant_user_ids.iter() {
844                if let Err(ix) = all_user_ids.binary_search(user_id) {
845                    all_user_ids.insert(ix, *user_id);
846                }
847            }
848        }
849
850        let users = self
851            .user_store
852            .update(cx, |user_store, cx| user_store.get_users(all_user_ids, cx));
853        Some(cx.spawn(|this, mut cx| async move {
854            let users = users.await?;
855
856            this.update(&mut cx, |this, cx| {
857                for entry in &channel_participants {
858                    let mut participants: Vec<_> = entry
859                        .participant_user_ids
860                        .iter()
861                        .filter_map(|user_id| {
862                            users
863                                .binary_search_by_key(&user_id, |user| &user.id)
864                                .ok()
865                                .map(|ix| users[ix].clone())
866                        })
867                        .collect();
868
869                    participants.sort_by_key(|u| u.id);
870
871                    this.channel_participants
872                        .insert(entry.channel_id, participants);
873                }
874
875                cx.notify();
876            });
877            anyhow::Ok(())
878        }))
879    }
880}
881
882impl Deref for ChannelPath {
883    type Target = [ChannelId];
884
885    fn deref(&self) -> &Self::Target {
886        &self.0
887    }
888}
889
890impl ChannelPath {
891    pub fn new(path: Arc<[ChannelId]>) -> Self {
892        debug_assert!(path.len() >= 1);
893        Self(path)
894    }
895
896    pub fn parent_id(&self) -> Option<ChannelId> {
897        self.0.len().checked_sub(2).map(|i| self.0[i])
898    }
899
900    pub fn channel_id(&self) -> ChannelId {
901        self.0[self.0.len() - 1]
902    }
903}
904
905impl From<ChannelPath> for Cow<'static, ChannelPath> {
906    fn from(value: ChannelPath) -> Self {
907        Cow::Owned(value)
908    }
909}
910
911impl<'a> From<&'a ChannelPath> for Cow<'a, ChannelPath> {
912    fn from(value: &'a ChannelPath) -> Self {
913        Cow::Borrowed(value)
914    }
915}
916
917impl Default for ChannelPath {
918    fn default() -> Self {
919        ChannelPath(Arc::from([]))
920    }
921}