user.rs

  1use super::{proto, Client, Status, TypedEnvelope};
  2use anyhow::{anyhow, Context, Result};
  3use chrono::{DateTime, Utc};
  4use collections::{hash_map::Entry, HashMap, HashSet};
  5use feature_flags::FeatureFlagAppExt;
  6use futures::{channel::mpsc, Future, StreamExt};
  7use gpui::{
  8    AppContext, AsyncAppContext, EventEmitter, Model, ModelContext, SharedString, SharedUri, Task,
  9    WeakModel,
 10};
 11use postage::{sink::Sink, watch};
 12use rpc::proto::{RequestMessage, UsersResponse};
 13use std::sync::{Arc, Weak};
 14use text::ReplicaId;
 15use util::TryFutureExt as _;
 16
 17pub type UserId = u64;
 18
 19#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)]
 20pub struct ChannelId(pub u64);
 21
 22impl std::fmt::Display for ChannelId {
 23    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
 24        self.0.fmt(f)
 25    }
 26}
 27
 28#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)]
 29pub struct ProjectId(pub u64);
 30
 31#[derive(
 32    Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, serde::Serialize, serde::Deserialize,
 33)]
 34pub struct DevServerProjectId(pub u64);
 35
 36#[derive(Debug, Clone, Copy, PartialEq, Eq)]
 37pub struct ParticipantIndex(pub u32);
 38
 39#[derive(Default, Debug)]
 40pub struct User {
 41    pub id: UserId,
 42    pub github_login: String,
 43    pub avatar_uri: SharedUri,
 44}
 45
 46#[derive(Clone, Debug, PartialEq, Eq)]
 47pub struct Collaborator {
 48    pub peer_id: proto::PeerId,
 49    pub replica_id: ReplicaId,
 50    pub user_id: UserId,
 51    pub is_host: bool,
 52}
 53
 54impl PartialOrd for User {
 55    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
 56        Some(self.cmp(other))
 57    }
 58}
 59
 60impl Ord for User {
 61    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
 62        self.github_login.cmp(&other.github_login)
 63    }
 64}
 65
 66impl PartialEq for User {
 67    fn eq(&self, other: &Self) -> bool {
 68        self.id == other.id && self.github_login == other.github_login
 69    }
 70}
 71
 72impl Eq for User {}
 73
 74#[derive(Debug, PartialEq)]
 75pub struct Contact {
 76    pub user: Arc<User>,
 77    pub online: bool,
 78    pub busy: bool,
 79}
 80
 81#[derive(Debug, Clone, Copy, PartialEq, Eq)]
 82pub enum ContactRequestStatus {
 83    None,
 84    RequestSent,
 85    RequestReceived,
 86    RequestAccepted,
 87}
 88
 89pub struct UserStore {
 90    users: HashMap<u64, Arc<User>>,
 91    by_github_login: HashMap<String, u64>,
 92    participant_indices: HashMap<u64, ParticipantIndex>,
 93    update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
 94    current_plan: Option<proto::Plan>,
 95    current_user: watch::Receiver<Option<Arc<User>>>,
 96    accepted_tos_at: Option<Option<DateTime<Utc>>>,
 97    contacts: Vec<Arc<Contact>>,
 98    incoming_contact_requests: Vec<Arc<User>>,
 99    outgoing_contact_requests: Vec<Arc<User>>,
100    pending_contact_requests: HashMap<u64, usize>,
101    invite_info: Option<InviteInfo>,
102    client: Weak<Client>,
103    _maintain_contacts: Task<()>,
104    _maintain_current_user: Task<Result<()>>,
105    weak_self: WeakModel<Self>,
106}
107
108#[derive(Clone)]
109pub struct InviteInfo {
110    pub count: u32,
111    pub url: Arc<str>,
112}
113
114pub enum Event {
115    Contact {
116        user: Arc<User>,
117        kind: ContactEventKind,
118    },
119    ShowContacts,
120    ParticipantIndicesChanged,
121}
122
123#[derive(Clone, Copy)]
124pub enum ContactEventKind {
125    Requested,
126    Accepted,
127    Cancelled,
128}
129
130impl EventEmitter<Event> for UserStore {}
131
132enum UpdateContacts {
133    Update(proto::UpdateContacts),
134    Wait(postage::barrier::Sender),
135    Clear(postage::barrier::Sender),
136}
137
138impl UserStore {
139    pub fn new(client: Arc<Client>, cx: &ModelContext<Self>) -> Self {
140        let (mut current_user_tx, current_user_rx) = watch::channel();
141        let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
142        let rpc_subscriptions = vec![
143            client.add_message_handler(cx.weak_model(), Self::handle_update_plan),
144            client.add_message_handler(cx.weak_model(), Self::handle_update_contacts),
145            client.add_message_handler(cx.weak_model(), Self::handle_update_invite_info),
146            client.add_message_handler(cx.weak_model(), Self::handle_show_contacts),
147        ];
148        Self {
149            users: Default::default(),
150            by_github_login: Default::default(),
151            current_user: current_user_rx,
152            current_plan: None,
153            accepted_tos_at: None,
154            contacts: Default::default(),
155            incoming_contact_requests: Default::default(),
156            participant_indices: Default::default(),
157            outgoing_contact_requests: Default::default(),
158            invite_info: None,
159            client: Arc::downgrade(&client),
160            update_contacts_tx,
161            _maintain_contacts: cx.spawn(|this, mut cx| async move {
162                let _subscriptions = rpc_subscriptions;
163                while let Some(message) = update_contacts_rx.next().await {
164                    if let Ok(task) =
165                        this.update(&mut cx, |this, cx| this.update_contacts(message, cx))
166                    {
167                        task.log_err().await;
168                    } else {
169                        break;
170                    }
171                }
172            }),
173            _maintain_current_user: cx.spawn(|this, mut cx| async move {
174                let mut status = client.status();
175                let weak = Arc::downgrade(&client);
176                drop(client);
177                while let Some(status) = status.next().await {
178                    // if the client is dropped, the app is shutting down.
179                    let Some(client) = weak.upgrade() else {
180                        return Ok(());
181                    };
182                    match status {
183                        Status::Connected { .. } => {
184                            if let Some(user_id) = client.user_id() {
185                                let fetch_user = if let Ok(fetch_user) = this
186                                    .update(&mut cx, |this, cx| {
187                                        this.get_user(user_id, cx).log_err()
188                                    }) {
189                                    fetch_user
190                                } else {
191                                    break;
192                                };
193                                let fetch_private_user_info =
194                                    client.request(proto::GetPrivateUserInfo {}).log_err();
195                                let (user, info) =
196                                    futures::join!(fetch_user, fetch_private_user_info);
197
198                                cx.update(|cx| {
199                                    if let Some(info) = info {
200                                        let disable_staff = std::env::var("ZED_DISABLE_STAFF")
201                                            .map_or(false, |v| !v.is_empty() && v != "0");
202                                        let staff = info.staff && !disable_staff;
203                                        cx.update_flags(staff, info.flags);
204                                        client.telemetry.set_authenticated_user_info(
205                                            Some(info.metrics_id.clone()),
206                                            staff,
207                                        );
208
209                                        this.update(cx, |this, _| {
210                                            this.set_current_user_accepted_tos_at(
211                                                info.accepted_tos_at,
212                                            );
213                                        })
214                                    } else {
215                                        anyhow::Ok(())
216                                    }
217                                })??;
218
219                                current_user_tx.send(user).await.ok();
220
221                                this.update(&mut cx, |_, cx| cx.notify())?;
222                            }
223                        }
224                        Status::SignedOut => {
225                            current_user_tx.send(None).await.ok();
226                            this.update(&mut cx, |this, cx| {
227                                cx.notify();
228                                this.clear_contacts()
229                            })?
230                            .await;
231                        }
232                        Status::ConnectionLost => {
233                            this.update(&mut cx, |this, cx| {
234                                cx.notify();
235                                this.clear_contacts()
236                            })?
237                            .await;
238                        }
239                        _ => {}
240                    }
241                }
242                Ok(())
243            }),
244            pending_contact_requests: Default::default(),
245            weak_self: cx.weak_model(),
246        }
247    }
248
249    #[cfg(feature = "test-support")]
250    pub fn clear_cache(&mut self) {
251        self.users.clear();
252        self.by_github_login.clear();
253    }
254
255    async fn handle_update_invite_info(
256        this: Model<Self>,
257        message: TypedEnvelope<proto::UpdateInviteInfo>,
258        mut cx: AsyncAppContext,
259    ) -> Result<()> {
260        this.update(&mut cx, |this, cx| {
261            this.invite_info = Some(InviteInfo {
262                url: Arc::from(message.payload.url),
263                count: message.payload.count,
264            });
265            cx.notify();
266        })?;
267        Ok(())
268    }
269
270    async fn handle_show_contacts(
271        this: Model<Self>,
272        _: TypedEnvelope<proto::ShowContacts>,
273        mut cx: AsyncAppContext,
274    ) -> Result<()> {
275        this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts))?;
276        Ok(())
277    }
278
279    pub fn invite_info(&self) -> Option<&InviteInfo> {
280        self.invite_info.as_ref()
281    }
282
283    async fn handle_update_contacts(
284        this: Model<Self>,
285        message: TypedEnvelope<proto::UpdateContacts>,
286        mut cx: AsyncAppContext,
287    ) -> Result<()> {
288        this.update(&mut cx, |this, _| {
289            this.update_contacts_tx
290                .unbounded_send(UpdateContacts::Update(message.payload))
291                .unwrap();
292        })?;
293        Ok(())
294    }
295
296    async fn handle_update_plan(
297        this: Model<Self>,
298        message: TypedEnvelope<proto::UpdateUserPlan>,
299        mut cx: AsyncAppContext,
300    ) -> Result<()> {
301        this.update(&mut cx, |this, cx| {
302            this.current_plan = Some(message.payload.plan());
303            cx.notify();
304        })?;
305        Ok(())
306    }
307
308    fn update_contacts(
309        &mut self,
310        message: UpdateContacts,
311        cx: &ModelContext<Self>,
312    ) -> Task<Result<()>> {
313        match message {
314            UpdateContacts::Wait(barrier) => {
315                drop(barrier);
316                Task::ready(Ok(()))
317            }
318            UpdateContacts::Clear(barrier) => {
319                self.contacts.clear();
320                self.incoming_contact_requests.clear();
321                self.outgoing_contact_requests.clear();
322                drop(barrier);
323                Task::ready(Ok(()))
324            }
325            UpdateContacts::Update(message) => {
326                let mut user_ids = HashSet::default();
327                for contact in &message.contacts {
328                    user_ids.insert(contact.user_id);
329                }
330                user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
331                user_ids.extend(message.outgoing_requests.iter());
332
333                let load_users = self.get_users(user_ids.into_iter().collect(), cx);
334                cx.spawn(|this, mut cx| async move {
335                    load_users.await?;
336
337                    // Users are fetched in parallel above and cached in call to get_users
338                    // No need to parallelize here
339                    let mut updated_contacts = Vec::new();
340                    let this = this
341                        .upgrade()
342                        .ok_or_else(|| anyhow!("can't upgrade user store handle"))?;
343                    for contact in message.contacts {
344                        updated_contacts.push(Arc::new(
345                            Contact::from_proto(contact, &this, &mut cx).await?,
346                        ));
347                    }
348
349                    let mut incoming_requests = Vec::new();
350                    for request in message.incoming_requests {
351                        incoming_requests.push({
352                            this.update(&mut cx, |this, cx| {
353                                this.get_user(request.requester_id, cx)
354                            })?
355                            .await?
356                        });
357                    }
358
359                    let mut outgoing_requests = Vec::new();
360                    for requested_user_id in message.outgoing_requests {
361                        outgoing_requests.push(
362                            this.update(&mut cx, |this, cx| this.get_user(requested_user_id, cx))?
363                                .await?,
364                        );
365                    }
366
367                    let removed_contacts =
368                        HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
369                    let removed_incoming_requests =
370                        HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
371                    let removed_outgoing_requests =
372                        HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());
373
374                    this.update(&mut cx, |this, cx| {
375                        // Remove contacts
376                        this.contacts
377                            .retain(|contact| !removed_contacts.contains(&contact.user.id));
378                        // Update existing contacts and insert new ones
379                        for updated_contact in updated_contacts {
380                            match this.contacts.binary_search_by_key(
381                                &&updated_contact.user.github_login,
382                                |contact| &contact.user.github_login,
383                            ) {
384                                Ok(ix) => this.contacts[ix] = updated_contact,
385                                Err(ix) => this.contacts.insert(ix, updated_contact),
386                            }
387                        }
388
389                        // Remove incoming contact requests
390                        this.incoming_contact_requests.retain(|user| {
391                            if removed_incoming_requests.contains(&user.id) {
392                                cx.emit(Event::Contact {
393                                    user: user.clone(),
394                                    kind: ContactEventKind::Cancelled,
395                                });
396                                false
397                            } else {
398                                true
399                            }
400                        });
401                        // Update existing incoming requests and insert new ones
402                        for user in incoming_requests {
403                            match this
404                                .incoming_contact_requests
405                                .binary_search_by_key(&&user.github_login, |contact| {
406                                    &contact.github_login
407                                }) {
408                                Ok(ix) => this.incoming_contact_requests[ix] = user,
409                                Err(ix) => this.incoming_contact_requests.insert(ix, user),
410                            }
411                        }
412
413                        // Remove outgoing contact requests
414                        this.outgoing_contact_requests
415                            .retain(|user| !removed_outgoing_requests.contains(&user.id));
416                        // Update existing incoming requests and insert new ones
417                        for request in outgoing_requests {
418                            match this
419                                .outgoing_contact_requests
420                                .binary_search_by_key(&&request.github_login, |contact| {
421                                    &contact.github_login
422                                }) {
423                                Ok(ix) => this.outgoing_contact_requests[ix] = request,
424                                Err(ix) => this.outgoing_contact_requests.insert(ix, request),
425                            }
426                        }
427
428                        cx.notify();
429                    })?;
430
431                    Ok(())
432                })
433            }
434        }
435    }
436
437    pub fn contacts(&self) -> &[Arc<Contact>] {
438        &self.contacts
439    }
440
441    pub fn has_contact(&self, user: &Arc<User>) -> bool {
442        self.contacts
443            .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
444            .is_ok()
445    }
446
447    pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
448        &self.incoming_contact_requests
449    }
450
451    pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
452        &self.outgoing_contact_requests
453    }
454
455    pub fn is_contact_request_pending(&self, user: &User) -> bool {
456        self.pending_contact_requests.contains_key(&user.id)
457    }
458
459    pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
460        if self
461            .contacts
462            .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
463            .is_ok()
464        {
465            ContactRequestStatus::RequestAccepted
466        } else if self
467            .outgoing_contact_requests
468            .binary_search_by_key(&&user.github_login, |user| &user.github_login)
469            .is_ok()
470        {
471            ContactRequestStatus::RequestSent
472        } else if self
473            .incoming_contact_requests
474            .binary_search_by_key(&&user.github_login, |user| &user.github_login)
475            .is_ok()
476        {
477            ContactRequestStatus::RequestReceived
478        } else {
479            ContactRequestStatus::None
480        }
481    }
482
483    pub fn request_contact(
484        &mut self,
485        responder_id: u64,
486        cx: &mut ModelContext<Self>,
487    ) -> Task<Result<()>> {
488        self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
489    }
490
491    pub fn remove_contact(
492        &mut self,
493        user_id: u64,
494        cx: &mut ModelContext<Self>,
495    ) -> Task<Result<()>> {
496        self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
497    }
498
499    pub fn has_incoming_contact_request(&self, user_id: u64) -> bool {
500        self.incoming_contact_requests
501            .iter()
502            .any(|user| user.id == user_id)
503    }
504
505    pub fn respond_to_contact_request(
506        &mut self,
507        requester_id: u64,
508        accept: bool,
509        cx: &mut ModelContext<Self>,
510    ) -> Task<Result<()>> {
511        self.perform_contact_request(
512            requester_id,
513            proto::RespondToContactRequest {
514                requester_id,
515                response: if accept {
516                    proto::ContactRequestResponse::Accept
517                } else {
518                    proto::ContactRequestResponse::Decline
519                } as i32,
520            },
521            cx,
522        )
523    }
524
525    pub fn dismiss_contact_request(
526        &self,
527        requester_id: u64,
528        cx: &ModelContext<Self>,
529    ) -> Task<Result<()>> {
530        let client = self.client.upgrade();
531        cx.spawn(move |_, _| async move {
532            client
533                .ok_or_else(|| anyhow!("can't upgrade client reference"))?
534                .request(proto::RespondToContactRequest {
535                    requester_id,
536                    response: proto::ContactRequestResponse::Dismiss as i32,
537                })
538                .await?;
539            Ok(())
540        })
541    }
542
543    fn perform_contact_request<T: RequestMessage>(
544        &mut self,
545        user_id: u64,
546        request: T,
547        cx: &mut ModelContext<Self>,
548    ) -> Task<Result<()>> {
549        let client = self.client.upgrade();
550        *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
551        cx.notify();
552
553        cx.spawn(move |this, mut cx| async move {
554            let response = client
555                .ok_or_else(|| anyhow!("can't upgrade client reference"))?
556                .request(request)
557                .await;
558            this.update(&mut cx, |this, cx| {
559                if let Entry::Occupied(mut request_count) =
560                    this.pending_contact_requests.entry(user_id)
561                {
562                    *request_count.get_mut() -= 1;
563                    if *request_count.get() == 0 {
564                        request_count.remove();
565                    }
566                }
567                cx.notify();
568            })?;
569            response?;
570            Ok(())
571        })
572    }
573
574    pub fn clear_contacts(&self) -> impl Future<Output = ()> {
575        let (tx, mut rx) = postage::barrier::channel();
576        self.update_contacts_tx
577            .unbounded_send(UpdateContacts::Clear(tx))
578            .unwrap();
579        async move {
580            rx.next().await;
581        }
582    }
583
584    pub fn contact_updates_done(&self) -> impl Future<Output = ()> {
585        let (tx, mut rx) = postage::barrier::channel();
586        self.update_contacts_tx
587            .unbounded_send(UpdateContacts::Wait(tx))
588            .unwrap();
589        async move {
590            rx.next().await;
591        }
592    }
593
594    pub fn get_users(
595        &self,
596        user_ids: Vec<u64>,
597        cx: &ModelContext<Self>,
598    ) -> Task<Result<Vec<Arc<User>>>> {
599        let mut user_ids_to_fetch = user_ids.clone();
600        user_ids_to_fetch.retain(|id| !self.users.contains_key(id));
601
602        cx.spawn(|this, mut cx| async move {
603            if !user_ids_to_fetch.is_empty() {
604                this.update(&mut cx, |this, cx| {
605                    this.load_users(
606                        proto::GetUsers {
607                            user_ids: user_ids_to_fetch,
608                        },
609                        cx,
610                    )
611                })?
612                .await?;
613            }
614
615            this.update(&mut cx, |this, _| {
616                user_ids
617                    .iter()
618                    .map(|user_id| {
619                        this.users
620                            .get(user_id)
621                            .cloned()
622                            .ok_or_else(|| anyhow!("user {} not found", user_id))
623                    })
624                    .collect()
625            })?
626        })
627    }
628
629    pub fn fuzzy_search_users(
630        &self,
631        query: String,
632        cx: &ModelContext<Self>,
633    ) -> Task<Result<Vec<Arc<User>>>> {
634        self.load_users(proto::FuzzySearchUsers { query }, cx)
635    }
636
637    pub fn get_cached_user(&self, user_id: u64) -> Option<Arc<User>> {
638        self.users.get(&user_id).cloned()
639    }
640
641    pub fn get_user_optimistic(&self, user_id: u64, cx: &ModelContext<Self>) -> Option<Arc<User>> {
642        if let Some(user) = self.users.get(&user_id).cloned() {
643            return Some(user);
644        }
645
646        self.get_user(user_id, cx).detach_and_log_err(cx);
647        None
648    }
649
650    pub fn get_user(&self, user_id: u64, cx: &ModelContext<Self>) -> Task<Result<Arc<User>>> {
651        if let Some(user) = self.users.get(&user_id).cloned() {
652            return Task::ready(Ok(user));
653        }
654
655        let load_users = self.get_users(vec![user_id], cx);
656        cx.spawn(move |this, mut cx| async move {
657            load_users.await?;
658            this.update(&mut cx, |this, _| {
659                this.users
660                    .get(&user_id)
661                    .cloned()
662                    .ok_or_else(|| anyhow!("server responded with no users"))
663            })?
664        })
665    }
666
667    pub fn cached_user_by_github_login(&self, github_login: &str) -> Option<Arc<User>> {
668        self.by_github_login
669            .get(github_login)
670            .and_then(|id| self.users.get(id).cloned())
671    }
672
673    pub fn current_user(&self) -> Option<Arc<User>> {
674        self.current_user.borrow().clone()
675    }
676
677    pub fn current_plan(&self) -> Option<proto::Plan> {
678        self.current_plan
679    }
680
681    pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
682        self.current_user.clone()
683    }
684
685    pub fn current_user_has_accepted_terms(&self) -> Option<bool> {
686        self.accepted_tos_at
687            .map(|accepted_tos_at| accepted_tos_at.is_some())
688    }
689
690    pub fn accept_terms_of_service(&self, cx: &ModelContext<Self>) -> Task<Result<()>> {
691        if self.current_user().is_none() {
692            return Task::ready(Err(anyhow!("no current user")));
693        };
694
695        let client = self.client.clone();
696        cx.spawn(move |this, mut cx| async move {
697            if let Some(client) = client.upgrade() {
698                let response = client
699                    .request(proto::AcceptTermsOfService {})
700                    .await
701                    .context("error accepting tos")?;
702
703                this.update(&mut cx, |this, _| {
704                    this.set_current_user_accepted_tos_at(Some(response.accepted_tos_at))
705                })
706            } else {
707                Err(anyhow!("client not found"))
708            }
709        })
710    }
711
712    fn set_current_user_accepted_tos_at(&mut self, accepted_tos_at: Option<u64>) {
713        self.accepted_tos_at = Some(
714            accepted_tos_at.and_then(|timestamp| DateTime::from_timestamp(timestamp as i64, 0)),
715        );
716    }
717
718    fn load_users(
719        &self,
720        request: impl RequestMessage<Response = UsersResponse>,
721        cx: &ModelContext<Self>,
722    ) -> Task<Result<Vec<Arc<User>>>> {
723        let client = self.client.clone();
724        cx.spawn(|this, mut cx| async move {
725            if let Some(rpc) = client.upgrade() {
726                let response = rpc.request(request).await.context("error loading users")?;
727                let users = response.users;
728
729                this.update(&mut cx, |this, _| this.insert(users))
730            } else {
731                Ok(Vec::new())
732            }
733        })
734    }
735
736    pub fn insert(&mut self, users: Vec<proto::User>) -> Vec<Arc<User>> {
737        let mut ret = Vec::with_capacity(users.len());
738        for user in users {
739            let user = User::new(user);
740            if let Some(old) = self.users.insert(user.id, user.clone()) {
741                if old.github_login != user.github_login {
742                    self.by_github_login.remove(&old.github_login);
743                }
744            }
745            self.by_github_login
746                .insert(user.github_login.clone(), user.id);
747            ret.push(user)
748        }
749        ret
750    }
751
752    pub fn set_participant_indices(
753        &mut self,
754        participant_indices: HashMap<u64, ParticipantIndex>,
755        cx: &mut ModelContext<Self>,
756    ) {
757        if participant_indices != self.participant_indices {
758            self.participant_indices = participant_indices;
759            cx.emit(Event::ParticipantIndicesChanged);
760        }
761    }
762
763    pub fn participant_indices(&self) -> &HashMap<u64, ParticipantIndex> {
764        &self.participant_indices
765    }
766
767    pub fn participant_names(
768        &self,
769        user_ids: impl Iterator<Item = u64>,
770        cx: &AppContext,
771    ) -> HashMap<u64, SharedString> {
772        let mut ret = HashMap::default();
773        let mut missing_user_ids = Vec::new();
774        for id in user_ids {
775            if let Some(github_login) = self.get_cached_user(id).map(|u| u.github_login.clone()) {
776                ret.insert(id, github_login.into());
777            } else {
778                missing_user_ids.push(id)
779            }
780        }
781        if !missing_user_ids.is_empty() {
782            let this = self.weak_self.clone();
783            cx.spawn(|mut cx| async move {
784                this.update(&mut cx, |this, cx| this.get_users(missing_user_ids, cx))?
785                    .await
786            })
787            .detach_and_log_err(cx);
788        }
789        ret
790    }
791}
792
793impl User {
794    fn new(message: proto::User) -> Arc<Self> {
795        Arc::new(User {
796            id: message.id,
797            github_login: message.github_login,
798            avatar_uri: message.avatar_url.into(),
799        })
800    }
801}
802
803impl Contact {
804    async fn from_proto(
805        contact: proto::Contact,
806        user_store: &Model<UserStore>,
807        cx: &mut AsyncAppContext,
808    ) -> Result<Self> {
809        let user = user_store
810            .update(cx, |user_store, cx| {
811                user_store.get_user(contact.user_id, cx)
812            })?
813            .await?;
814        Ok(Self {
815            user,
816            online: contact.online,
817            busy: contact.busy,
818        })
819    }
820}
821
822impl Collaborator {
823    pub fn from_proto(message: proto::Collaborator) -> Result<Self> {
824        Ok(Self {
825            peer_id: message.peer_id.ok_or_else(|| anyhow!("invalid peer id"))?,
826            replica_id: message.replica_id as ReplicaId,
827            user_id: message.user_id as UserId,
828            is_host: message.is_host,
829        })
830    }
831}