channel_store.rs

  1use crate::channel_buffer::ChannelBuffer;
  2use anyhow::{anyhow, Result};
  3use client::{Client, Subscription, User, UserId, UserStore};
  4use collections::{hash_map, HashMap, HashSet};
  5use futures::{channel::mpsc, future::Shared, Future, FutureExt, StreamExt};
  6use gpui::{AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Task, WeakModelHandle};
  7use rpc::{proto, TypedEnvelope};
  8use std::{mem, sync::Arc, time::Duration};
  9use util::ResultExt;
 10
 11pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30);
 12
 13pub type ChannelId = u64;
 14
 15pub struct ChannelStore {
 16    channels_by_id: HashMap<ChannelId, Arc<Channel>>,
 17    channel_paths: Vec<Vec<ChannelId>>,
 18    channel_invitations: Vec<Arc<Channel>>,
 19    channel_participants: HashMap<ChannelId, Vec<Arc<User>>>,
 20    channels_with_admin_privileges: HashSet<ChannelId>,
 21    outgoing_invites: HashSet<(ChannelId, UserId)>,
 22    update_channels_tx: mpsc::UnboundedSender<proto::UpdateChannels>,
 23    opened_buffers: HashMap<ChannelId, OpenedChannelBuffer>,
 24    client: Arc<Client>,
 25    user_store: ModelHandle<UserStore>,
 26    _rpc_subscription: Subscription,
 27    _watch_connection_status: Task<Option<()>>,
 28    disconnect_channel_buffers_task: Option<Task<()>>,
 29    _update_channels: Task<()>,
 30}
 31
 32#[derive(Clone, Debug, PartialEq)]
 33pub struct Channel {
 34    pub id: ChannelId,
 35    pub name: String,
 36}
 37
 38pub struct ChannelMembership {
 39    pub user: Arc<User>,
 40    pub kind: proto::channel_member::Kind,
 41    pub admin: bool,
 42}
 43
 44pub enum ChannelEvent {
 45    ChannelCreated(ChannelId),
 46    ChannelRenamed(ChannelId),
 47}
 48
 49impl Entity for ChannelStore {
 50    type Event = ChannelEvent;
 51}
 52
 53pub enum ChannelMemberStatus {
 54    Invited,
 55    Member,
 56    NotMember,
 57}
 58
 59enum OpenedChannelBuffer {
 60    Open(WeakModelHandle<ChannelBuffer>),
 61    Loading(Shared<Task<Result<ModelHandle<ChannelBuffer>, Arc<anyhow::Error>>>>),
 62}
 63
 64impl ChannelStore {
 65    pub fn new(
 66        client: Arc<Client>,
 67        user_store: ModelHandle<UserStore>,
 68        cx: &mut ModelContext<Self>,
 69    ) -> Self {
 70        let rpc_subscription =
 71            client.add_message_handler(cx.handle(), Self::handle_update_channels);
 72
 73        let mut connection_status = client.status();
 74        let (update_channels_tx, mut update_channels_rx) = mpsc::unbounded();
 75        let watch_connection_status = cx.spawn_weak(|this, mut cx| async move {
 76            while let Some(status) = connection_status.next().await {
 77                let this = this.upgrade(&cx)?;
 78                if status.is_connected() {
 79                    this.update(&mut cx, |this, cx| this.handle_connect(cx))
 80                        .await
 81                        .log_err()?;
 82                } else {
 83                    this.update(&mut cx, |this, cx| this.handle_disconnect(cx));
 84                }
 85            }
 86            Some(())
 87        });
 88
 89        Self {
 90            channels_by_id: HashMap::default(),
 91            channel_invitations: Vec::default(),
 92            channel_paths: Vec::default(),
 93            channel_participants: Default::default(),
 94            channels_with_admin_privileges: Default::default(),
 95            outgoing_invites: Default::default(),
 96            opened_buffers: Default::default(),
 97            update_channels_tx,
 98            client,
 99            user_store,
100            _rpc_subscription: rpc_subscription,
101            _watch_connection_status: watch_connection_status,
102            disconnect_channel_buffers_task: None,
103            _update_channels: cx.spawn_weak(|this, mut cx| async move {
104                while let Some(update_channels) = update_channels_rx.next().await {
105                    if let Some(this) = this.upgrade(&cx) {
106                        let update_task = this.update(&mut cx, |this, cx| {
107                            this.update_channels(update_channels, cx)
108                        });
109                        if let Some(update_task) = update_task {
110                            update_task.await.log_err();
111                        }
112                    }
113                }
114            }),
115        }
116    }
117
118    pub fn has_children(&self, channel_id: ChannelId) -> bool {
119        self.channel_paths.iter().any(|path| {
120            if let Some(ix) = path.iter().position(|id| *id == channel_id) {
121                path.len() > ix + 1
122            } else {
123                false
124            }
125        })
126    }
127
128    pub fn channel_count(&self) -> usize {
129        self.channel_paths.len()
130    }
131
132    pub fn channels(&self) -> impl '_ + Iterator<Item = (usize, &Arc<Channel>)> {
133        self.channel_paths.iter().map(move |path| {
134            let id = path.last().unwrap();
135            let channel = self.channel_for_id(*id).unwrap();
136            (path.len() - 1, channel)
137        })
138    }
139
140    pub fn channel_at_index(&self, ix: usize) -> Option<(usize, &Arc<Channel>)> {
141        let path = self.channel_paths.get(ix)?;
142        let id = path.last().unwrap();
143        let channel = self.channel_for_id(*id).unwrap();
144        Some((path.len() - 1, channel))
145    }
146
147    pub fn channel_invitations(&self) -> &[Arc<Channel>] {
148        &self.channel_invitations
149    }
150
151    pub fn channel_for_id(&self, channel_id: ChannelId) -> Option<&Arc<Channel>> {
152        self.channels_by_id.get(&channel_id)
153    }
154
155    pub fn has_open_channel_buffer(&self, channel_id: ChannelId, cx: &AppContext) -> bool {
156        if let Some(buffer) = self.opened_buffers.get(&channel_id) {
157            if let OpenedChannelBuffer::Open(buffer) = buffer {
158                return buffer.upgrade(cx).is_some();
159            }
160        }
161        false
162    }
163
164    pub fn open_channel_buffer(
165        &mut self,
166        channel_id: ChannelId,
167        cx: &mut ModelContext<Self>,
168    ) -> Task<Result<ModelHandle<ChannelBuffer>>> {
169        // Make sure that a given channel buffer is only opened once per
170        // app instance, even if this method is called multiple times
171        // with the same channel id while the first task is still running.
172        let task = loop {
173            match self.opened_buffers.entry(channel_id) {
174                hash_map::Entry::Occupied(e) => match e.get() {
175                    OpenedChannelBuffer::Open(buffer) => {
176                        if let Some(buffer) = buffer.upgrade(cx) {
177                            break Task::ready(Ok(buffer)).shared();
178                        } else {
179                            self.opened_buffers.remove(&channel_id);
180                            continue;
181                        }
182                    }
183                    OpenedChannelBuffer::Loading(task) => break task.clone(),
184                },
185                hash_map::Entry::Vacant(e) => {
186                    let client = self.client.clone();
187                    let task = cx
188                        .spawn(|this, cx| async move {
189                            let channel = this.read_with(&cx, |this, _| {
190                                this.channel_for_id(channel_id).cloned().ok_or_else(|| {
191                                    Arc::new(anyhow!("no channel for id: {}", channel_id))
192                                })
193                            })?;
194
195                            ChannelBuffer::new(channel, client, cx)
196                                .await
197                                .map_err(Arc::new)
198                        })
199                        .shared();
200                    e.insert(OpenedChannelBuffer::Loading(task.clone()));
201                    cx.spawn({
202                        let task = task.clone();
203                        |this, mut cx| async move {
204                            let result = task.await;
205                            this.update(&mut cx, |this, cx| match result {
206                                Ok(buffer) => {
207                                    cx.observe_release(&buffer, move |this, _, _| {
208                                        this.opened_buffers.remove(&channel_id);
209                                    })
210                                    .detach();
211                                    this.opened_buffers.insert(
212                                        channel_id,
213                                        OpenedChannelBuffer::Open(buffer.downgrade()),
214                                    );
215                                }
216                                Err(error) => {
217                                    log::error!("failed to open channel buffer {error:?}");
218                                    this.opened_buffers.remove(&channel_id);
219                                }
220                            });
221                        }
222                    })
223                    .detach();
224                    break task;
225                }
226            }
227        };
228        cx.foreground()
229            .spawn(async move { task.await.map_err(|error| anyhow!("{}", error)) })
230    }
231
232    pub fn is_user_admin(&self, channel_id: ChannelId) -> bool {
233        self.channel_paths.iter().any(|path| {
234            if let Some(ix) = path.iter().position(|id| *id == channel_id) {
235                path[..=ix]
236                    .iter()
237                    .any(|id| self.channels_with_admin_privileges.contains(id))
238            } else {
239                false
240            }
241        })
242    }
243
244    pub fn channel_participants(&self, channel_id: ChannelId) -> &[Arc<User>] {
245        self.channel_participants
246            .get(&channel_id)
247            .map_or(&[], |v| v.as_slice())
248    }
249
250    pub fn create_channel(
251        &self,
252        name: &str,
253        parent_id: Option<ChannelId>,
254        cx: &mut ModelContext<Self>,
255    ) -> Task<Result<ChannelId>> {
256        let client = self.client.clone();
257        let name = name.trim_start_matches("#").to_owned();
258        cx.spawn(|this, mut cx| async move {
259            let channel = client
260                .request(proto::CreateChannel { name, parent_id })
261                .await?
262                .channel
263                .ok_or_else(|| anyhow!("missing channel in response"))?;
264
265            let channel_id = channel.id;
266
267            this.update(&mut cx, |this, cx| {
268                let task = this.update_channels(
269                    proto::UpdateChannels {
270                        channels: vec![channel],
271                        ..Default::default()
272                    },
273                    cx,
274                );
275                assert!(task.is_none());
276
277                // This event is emitted because the collab panel wants to clear the pending edit state
278                // before this frame is rendered. But we can't guarantee that the collab panel's future
279                // will resolve before this flush_effects finishes. Synchronously emitting this event
280                // ensures that the collab panel will observe this creation before the frame completes
281                cx.emit(ChannelEvent::ChannelCreated(channel_id));
282            });
283
284            Ok(channel_id)
285        })
286    }
287
288    pub fn invite_member(
289        &mut self,
290        channel_id: ChannelId,
291        user_id: UserId,
292        admin: bool,
293        cx: &mut ModelContext<Self>,
294    ) -> Task<Result<()>> {
295        if !self.outgoing_invites.insert((channel_id, user_id)) {
296            return Task::ready(Err(anyhow!("invite request already in progress")));
297        }
298
299        cx.notify();
300        let client = self.client.clone();
301        cx.spawn(|this, mut cx| async move {
302            let result = client
303                .request(proto::InviteChannelMember {
304                    channel_id,
305                    user_id,
306                    admin,
307                })
308                .await;
309
310            this.update(&mut cx, |this, cx| {
311                this.outgoing_invites.remove(&(channel_id, user_id));
312                cx.notify();
313            });
314
315            result?;
316
317            Ok(())
318        })
319    }
320
321    pub fn remove_member(
322        &mut self,
323        channel_id: ChannelId,
324        user_id: u64,
325        cx: &mut ModelContext<Self>,
326    ) -> Task<Result<()>> {
327        if !self.outgoing_invites.insert((channel_id, user_id)) {
328            return Task::ready(Err(anyhow!("invite request already in progress")));
329        }
330
331        cx.notify();
332        let client = self.client.clone();
333        cx.spawn(|this, mut cx| async move {
334            let result = client
335                .request(proto::RemoveChannelMember {
336                    channel_id,
337                    user_id,
338                })
339                .await;
340
341            this.update(&mut cx, |this, cx| {
342                this.outgoing_invites.remove(&(channel_id, user_id));
343                cx.notify();
344            });
345            result?;
346            Ok(())
347        })
348    }
349
350    pub fn set_member_admin(
351        &mut self,
352        channel_id: ChannelId,
353        user_id: UserId,
354        admin: bool,
355        cx: &mut ModelContext<Self>,
356    ) -> Task<Result<()>> {
357        if !self.outgoing_invites.insert((channel_id, user_id)) {
358            return Task::ready(Err(anyhow!("member request already in progress")));
359        }
360
361        cx.notify();
362        let client = self.client.clone();
363        cx.spawn(|this, mut cx| async move {
364            let result = client
365                .request(proto::SetChannelMemberAdmin {
366                    channel_id,
367                    user_id,
368                    admin,
369                })
370                .await;
371
372            this.update(&mut cx, |this, cx| {
373                this.outgoing_invites.remove(&(channel_id, user_id));
374                cx.notify();
375            });
376
377            result?;
378            Ok(())
379        })
380    }
381
382    pub fn rename(
383        &mut self,
384        channel_id: ChannelId,
385        new_name: &str,
386        cx: &mut ModelContext<Self>,
387    ) -> Task<Result<()>> {
388        let client = self.client.clone();
389        let name = new_name.to_string();
390        cx.spawn(|this, mut cx| async move {
391            let channel = client
392                .request(proto::RenameChannel { channel_id, name })
393                .await?
394                .channel
395                .ok_or_else(|| anyhow!("missing channel in response"))?;
396            this.update(&mut cx, |this, cx| {
397                let task = this.update_channels(
398                    proto::UpdateChannels {
399                        channels: vec![channel],
400                        ..Default::default()
401                    },
402                    cx,
403                );
404                assert!(task.is_none());
405
406                // This event is emitted because the collab panel wants to clear the pending edit state
407                // before this frame is rendered. But we can't guarantee that the collab panel's future
408                // will resolve before this flush_effects finishes. Synchronously emitting this event
409                // ensures that the collab panel will observe this creation before the frame complete
410                cx.emit(ChannelEvent::ChannelRenamed(channel_id))
411            });
412            Ok(())
413        })
414    }
415
416    pub fn respond_to_channel_invite(
417        &mut self,
418        channel_id: ChannelId,
419        accept: bool,
420    ) -> impl Future<Output = Result<()>> {
421        let client = self.client.clone();
422        async move {
423            client
424                .request(proto::RespondToChannelInvite { channel_id, accept })
425                .await?;
426            Ok(())
427        }
428    }
429
430    pub fn get_channel_member_details(
431        &self,
432        channel_id: ChannelId,
433        cx: &mut ModelContext<Self>,
434    ) -> Task<Result<Vec<ChannelMembership>>> {
435        let client = self.client.clone();
436        let user_store = self.user_store.downgrade();
437        cx.spawn(|_, mut cx| async move {
438            let response = client
439                .request(proto::GetChannelMembers { channel_id })
440                .await?;
441
442            let user_ids = response.members.iter().map(|m| m.user_id).collect();
443            let user_store = user_store
444                .upgrade(&cx)
445                .ok_or_else(|| anyhow!("user store dropped"))?;
446            let users = user_store
447                .update(&mut cx, |user_store, cx| user_store.get_users(user_ids, cx))
448                .await?;
449
450            Ok(users
451                .into_iter()
452                .zip(response.members)
453                .filter_map(|(user, member)| {
454                    Some(ChannelMembership {
455                        user,
456                        admin: member.admin,
457                        kind: proto::channel_member::Kind::from_i32(member.kind)?,
458                    })
459                })
460                .collect())
461        })
462    }
463
464    pub fn remove_channel(&self, channel_id: ChannelId) -> impl Future<Output = Result<()>> {
465        let client = self.client.clone();
466        async move {
467            client.request(proto::RemoveChannel { channel_id }).await?;
468            Ok(())
469        }
470    }
471
472    pub fn has_pending_channel_invite_response(&self, _: &Arc<Channel>) -> bool {
473        false
474    }
475
476    pub fn has_pending_channel_invite(&self, channel_id: ChannelId, user_id: UserId) -> bool {
477        self.outgoing_invites.contains(&(channel_id, user_id))
478    }
479
480    async fn handle_update_channels(
481        this: ModelHandle<Self>,
482        message: TypedEnvelope<proto::UpdateChannels>,
483        _: Arc<Client>,
484        mut cx: AsyncAppContext,
485    ) -> Result<()> {
486        this.update(&mut cx, |this, _| {
487            this.update_channels_tx
488                .unbounded_send(message.payload)
489                .unwrap();
490        });
491        Ok(())
492    }
493
494    fn handle_connect(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
495        self.disconnect_channel_buffers_task.take();
496
497        let mut buffer_versions = Vec::new();
498        for buffer in self.opened_buffers.values() {
499            if let OpenedChannelBuffer::Open(buffer) = buffer {
500                if let Some(buffer) = buffer.upgrade(cx) {
501                    let channel_buffer = buffer.read(cx);
502                    let buffer = channel_buffer.buffer().read(cx);
503                    buffer_versions.push(proto::ChannelBufferVersion {
504                        channel_id: channel_buffer.channel().id,
505                        epoch: channel_buffer.epoch(),
506                        version: language::proto::serialize_version(&buffer.version()),
507                    });
508                }
509            }
510        }
511
512        if buffer_versions.is_empty() {
513            return Task::ready(Ok(()));
514        }
515
516        let response = self.client.request(proto::RejoinChannelBuffers {
517            buffers: buffer_versions,
518        });
519
520        cx.spawn(|this, mut cx| async move {
521            let mut response = response.await?;
522
523            this.update(&mut cx, |this, cx| {
524                this.opened_buffers.retain(|_, buffer| match buffer {
525                    OpenedChannelBuffer::Open(channel_buffer) => {
526                        let Some(channel_buffer) = channel_buffer.upgrade(cx) else {
527                            return false;
528                        };
529
530                        channel_buffer.update(cx, |channel_buffer, cx| {
531                            let channel_id = channel_buffer.channel().id;
532                            if let Some(remote_buffer) = response
533                                .buffers
534                                .iter_mut()
535                                .find(|buffer| buffer.channel_id == channel_id)
536                            {
537                                let channel_id = channel_buffer.channel().id;
538                                let remote_version =
539                                    language::proto::deserialize_version(&remote_buffer.version);
540
541                                channel_buffer.replace_collaborators(
542                                    mem::take(&mut remote_buffer.collaborators),
543                                    cx,
544                                );
545
546                                let operations = channel_buffer
547                                    .buffer()
548                                    .update(cx, |buffer, cx| {
549                                        let outgoing_operations =
550                                            buffer.serialize_ops(Some(remote_version), cx);
551                                        let incoming_operations =
552                                            mem::take(&mut remote_buffer.operations)
553                                                .into_iter()
554                                                .map(language::proto::deserialize_operation)
555                                                .collect::<Result<Vec<_>>>()?;
556                                        buffer.apply_ops(incoming_operations, cx)?;
557                                        anyhow::Ok(outgoing_operations)
558                                    })
559                                    .log_err();
560
561                                if let Some(operations) = operations {
562                                    let client = this.client.clone();
563                                    cx.background()
564                                        .spawn(async move {
565                                            let operations = operations.await;
566                                            for chunk in
567                                                language::proto::split_operations(operations)
568                                            {
569                                                client
570                                                    .send(proto::UpdateChannelBuffer {
571                                                        channel_id,
572                                                        operations: chunk,
573                                                    })
574                                                    .ok();
575                                            }
576                                        })
577                                        .detach();
578                                    return true;
579                                }
580                            }
581
582                            channel_buffer.disconnect(cx);
583                            false
584                        })
585                    }
586                    OpenedChannelBuffer::Loading(_) => true,
587                });
588            });
589            anyhow::Ok(())
590        })
591    }
592
593    fn handle_disconnect(&mut self, cx: &mut ModelContext<Self>) {
594        self.channels_by_id.clear();
595        self.channel_invitations.clear();
596        self.channel_participants.clear();
597        self.channels_with_admin_privileges.clear();
598        self.channel_paths.clear();
599        self.outgoing_invites.clear();
600        cx.notify();
601
602        self.disconnect_channel_buffers_task.get_or_insert_with(|| {
603            cx.spawn_weak(|this, mut cx| async move {
604                cx.background().timer(RECONNECT_TIMEOUT).await;
605                if let Some(this) = this.upgrade(&cx) {
606                    this.update(&mut cx, |this, cx| {
607                        for (_, buffer) in this.opened_buffers.drain() {
608                            if let OpenedChannelBuffer::Open(buffer) = buffer {
609                                if let Some(buffer) = buffer.upgrade(cx) {
610                                    buffer.update(cx, |buffer, cx| buffer.disconnect(cx));
611                                }
612                            }
613                        }
614                    });
615                }
616            })
617        });
618    }
619
620    pub(crate) fn update_channels(
621        &mut self,
622        payload: proto::UpdateChannels,
623        cx: &mut ModelContext<ChannelStore>,
624    ) -> Option<Task<Result<()>>> {
625        if !payload.remove_channel_invitations.is_empty() {
626            self.channel_invitations
627                .retain(|channel| !payload.remove_channel_invitations.contains(&channel.id));
628        }
629        for channel in payload.channel_invitations {
630            match self
631                .channel_invitations
632                .binary_search_by_key(&channel.id, |c| c.id)
633            {
634                Ok(ix) => Arc::make_mut(&mut self.channel_invitations[ix]).name = channel.name,
635                Err(ix) => self.channel_invitations.insert(
636                    ix,
637                    Arc::new(Channel {
638                        id: channel.id,
639                        name: channel.name,
640                    }),
641                ),
642            }
643        }
644
645        let channels_changed = !payload.channels.is_empty() || !payload.remove_channels.is_empty();
646        if channels_changed {
647            if !payload.remove_channels.is_empty() {
648                self.channels_by_id
649                    .retain(|channel_id, _| !payload.remove_channels.contains(channel_id));
650                self.channel_participants
651                    .retain(|channel_id, _| !payload.remove_channels.contains(channel_id));
652                self.channels_with_admin_privileges
653                    .retain(|channel_id| !payload.remove_channels.contains(channel_id));
654
655                for channel_id in &payload.remove_channels {
656                    let channel_id = *channel_id;
657                    if let Some(OpenedChannelBuffer::Open(buffer)) =
658                        self.opened_buffers.remove(&channel_id)
659                    {
660                        if let Some(buffer) = buffer.upgrade(cx) {
661                            buffer.update(cx, ChannelBuffer::disconnect);
662                        }
663                    }
664                }
665            }
666
667            for channel_proto in payload.channels {
668                if let Some(existing_channel) = self.channels_by_id.get_mut(&channel_proto.id) {
669                    Arc::make_mut(existing_channel).name = channel_proto.name;
670                } else {
671                    let channel = Arc::new(Channel {
672                        id: channel_proto.id,
673                        name: channel_proto.name,
674                    });
675                    self.channels_by_id.insert(channel.id, channel.clone());
676
677                    if let Some(parent_id) = channel_proto.parent_id {
678                        let mut ix = 0;
679                        while ix < self.channel_paths.len() {
680                            let path = &self.channel_paths[ix];
681                            if path.ends_with(&[parent_id]) {
682                                let mut new_path = path.clone();
683                                new_path.push(channel.id);
684                                self.channel_paths.insert(ix + 1, new_path);
685                                ix += 1;
686                            }
687                            ix += 1;
688                        }
689                    } else {
690                        self.channel_paths.push(vec![channel.id]);
691                    }
692                }
693            }
694
695            self.channel_paths.sort_by(|a, b| {
696                let a = Self::channel_path_sorting_key(a, &self.channels_by_id);
697                let b = Self::channel_path_sorting_key(b, &self.channels_by_id);
698                a.cmp(b)
699            });
700            self.channel_paths.dedup();
701            self.channel_paths.retain(|path| {
702                path.iter()
703                    .all(|channel_id| self.channels_by_id.contains_key(channel_id))
704            });
705        }
706
707        for permission in payload.channel_permissions {
708            if permission.is_admin {
709                self.channels_with_admin_privileges
710                    .insert(permission.channel_id);
711            } else {
712                self.channels_with_admin_privileges
713                    .remove(&permission.channel_id);
714            }
715        }
716
717        cx.notify();
718        if payload.channel_participants.is_empty() {
719            return None;
720        }
721
722        let mut all_user_ids = Vec::new();
723        let channel_participants = payload.channel_participants;
724        for entry in &channel_participants {
725            for user_id in entry.participant_user_ids.iter() {
726                if let Err(ix) = all_user_ids.binary_search(user_id) {
727                    all_user_ids.insert(ix, *user_id);
728                }
729            }
730        }
731
732        let users = self
733            .user_store
734            .update(cx, |user_store, cx| user_store.get_users(all_user_ids, cx));
735        Some(cx.spawn(|this, mut cx| async move {
736            let users = users.await?;
737
738            this.update(&mut cx, |this, cx| {
739                for entry in &channel_participants {
740                    let mut participants: Vec<_> = entry
741                        .participant_user_ids
742                        .iter()
743                        .filter_map(|user_id| {
744                            users
745                                .binary_search_by_key(&user_id, |user| &user.id)
746                                .ok()
747                                .map(|ix| users[ix].clone())
748                        })
749                        .collect();
750
751                    participants.sort_by_key(|u| u.id);
752
753                    this.channel_participants
754                        .insert(entry.channel_id, participants);
755                }
756
757                cx.notify();
758            });
759            anyhow::Ok(())
760        }))
761    }
762
763    fn channel_path_sorting_key<'a>(
764        path: &'a [ChannelId],
765        channels_by_id: &'a HashMap<ChannelId, Arc<Channel>>,
766    ) -> impl 'a + Iterator<Item = Option<&'a str>> {
767        path.iter()
768            .map(|id| Some(channels_by_id.get(id)?.name.as_str()))
769    }
770}