user.rs

  1use super::{proto, Client, Status, TypedEnvelope};
  2use anyhow::{anyhow, Context, Result};
  3use chrono::{DateTime, Utc};
  4use collections::{hash_map::Entry, HashMap, HashSet};
  5use feature_flags::FeatureFlagAppExt;
  6use futures::{channel::mpsc, Future, StreamExt};
  7use gpui::{
  8    AppContext, AsyncAppContext, EventEmitter, Model, ModelContext, SharedString, SharedUri, Task,
  9    WeakModel,
 10};
 11use postage::{sink::Sink, watch};
 12use rpc::proto::{RequestMessage, UsersResponse};
 13use std::sync::{Arc, Weak};
 14use text::ReplicaId;
 15use util::TryFutureExt as _;
 16
 17pub type UserId = u64;
 18
 19#[derive(
 20    Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, serde::Serialize, serde::Deserialize,
 21)]
 22pub struct ChannelId(pub u64);
 23
 24impl std::fmt::Display for ChannelId {
 25    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
 26        self.0.fmt(f)
 27    }
 28}
 29
 30#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)]
 31pub struct ProjectId(pub u64);
 32
 33#[derive(
 34    Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, serde::Serialize, serde::Deserialize,
 35)]
 36pub struct DevServerProjectId(pub u64);
 37
 38#[derive(Debug, Clone, Copy, PartialEq, Eq)]
 39pub struct ParticipantIndex(pub u32);
 40
 41#[derive(Default, Debug)]
 42pub struct User {
 43    pub id: UserId,
 44    pub github_login: String,
 45    pub avatar_uri: SharedUri,
 46}
 47
 48#[derive(Clone, Debug, PartialEq, Eq)]
 49pub struct Collaborator {
 50    pub peer_id: proto::PeerId,
 51    pub replica_id: ReplicaId,
 52    pub user_id: UserId,
 53    pub is_host: bool,
 54}
 55
 56impl PartialOrd for User {
 57    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
 58        Some(self.cmp(other))
 59    }
 60}
 61
 62impl Ord for User {
 63    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
 64        self.github_login.cmp(&other.github_login)
 65    }
 66}
 67
 68impl PartialEq for User {
 69    fn eq(&self, other: &Self) -> bool {
 70        self.id == other.id && self.github_login == other.github_login
 71    }
 72}
 73
 74impl Eq for User {}
 75
 76#[derive(Debug, PartialEq)]
 77pub struct Contact {
 78    pub user: Arc<User>,
 79    pub online: bool,
 80    pub busy: bool,
 81}
 82
 83#[derive(Debug, Clone, Copy, PartialEq, Eq)]
 84pub enum ContactRequestStatus {
 85    None,
 86    RequestSent,
 87    RequestReceived,
 88    RequestAccepted,
 89}
 90
 91pub struct UserStore {
 92    users: HashMap<u64, Arc<User>>,
 93    by_github_login: HashMap<String, u64>,
 94    participant_indices: HashMap<u64, ParticipantIndex>,
 95    update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
 96    current_plan: Option<proto::Plan>,
 97    current_user: watch::Receiver<Option<Arc<User>>>,
 98    accepted_tos_at: Option<Option<DateTime<Utc>>>,
 99    contacts: Vec<Arc<Contact>>,
100    incoming_contact_requests: Vec<Arc<User>>,
101    outgoing_contact_requests: Vec<Arc<User>>,
102    pending_contact_requests: HashMap<u64, usize>,
103    invite_info: Option<InviteInfo>,
104    client: Weak<Client>,
105    _maintain_contacts: Task<()>,
106    _maintain_current_user: Task<Result<()>>,
107    weak_self: WeakModel<Self>,
108}
109
/// Invite information announced by the server through `UpdateInviteInfo`.
#[derive(Clone)]
pub struct InviteInfo {
    /// `count` field of the server message.
    pub count: u32,
    /// `url` field of the server message.
    pub url: Arc<str>,
}
115
116pub enum Event {
117    Contact {
118        user: Arc<User>,
119        kind: ContactEventKind,
120    },
121    ShowContacts,
122    ParticipantIndicesChanged,
123}
124
/// What happened to a contact request in an `Event::Contact`.
///
/// Within this file only `Cancelled` is emitted (when an incoming request is
/// removed).
#[derive(Clone, Copy)]
pub enum ContactEventKind {
    Requested,
    Accepted,
    Cancelled,
}
131
132impl EventEmitter<Event> for UserStore {}
133
134enum UpdateContacts {
135    Update(proto::UpdateContacts),
136    Wait(postage::barrier::Sender),
137    Clear(postage::barrier::Sender),
138}
139
140impl UserStore {
141    pub fn new(client: Arc<Client>, cx: &ModelContext<Self>) -> Self {
142        let (mut current_user_tx, current_user_rx) = watch::channel();
143        let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
144        let rpc_subscriptions = vec![
145            client.add_message_handler(cx.weak_model(), Self::handle_update_plan),
146            client.add_message_handler(cx.weak_model(), Self::handle_update_contacts),
147            client.add_message_handler(cx.weak_model(), Self::handle_update_invite_info),
148            client.add_message_handler(cx.weak_model(), Self::handle_show_contacts),
149        ];
150        Self {
151            users: Default::default(),
152            by_github_login: Default::default(),
153            current_user: current_user_rx,
154            current_plan: None,
155            accepted_tos_at: None,
156            contacts: Default::default(),
157            incoming_contact_requests: Default::default(),
158            participant_indices: Default::default(),
159            outgoing_contact_requests: Default::default(),
160            invite_info: None,
161            client: Arc::downgrade(&client),
162            update_contacts_tx,
163            _maintain_contacts: cx.spawn(|this, mut cx| async move {
164                let _subscriptions = rpc_subscriptions;
165                while let Some(message) = update_contacts_rx.next().await {
166                    if let Ok(task) =
167                        this.update(&mut cx, |this, cx| this.update_contacts(message, cx))
168                    {
169                        task.log_err().await;
170                    } else {
171                        break;
172                    }
173                }
174            }),
175            _maintain_current_user: cx.spawn(|this, mut cx| async move {
176                let mut status = client.status();
177                let weak = Arc::downgrade(&client);
178                drop(client);
179                while let Some(status) = status.next().await {
180                    // if the client is dropped, the app is shutting down.
181                    let Some(client) = weak.upgrade() else {
182                        return Ok(());
183                    };
184                    match status {
185                        Status::Connected { .. } => {
186                            if let Some(user_id) = client.user_id() {
187                                let fetch_user = if let Ok(fetch_user) = this
188                                    .update(&mut cx, |this, cx| {
189                                        this.get_user(user_id, cx).log_err()
190                                    }) {
191                                    fetch_user
192                                } else {
193                                    break;
194                                };
195                                let fetch_private_user_info =
196                                    client.request(proto::GetPrivateUserInfo {}).log_err();
197                                let (user, info) =
198                                    futures::join!(fetch_user, fetch_private_user_info);
199
200                                cx.update(|cx| {
201                                    if let Some(info) = info {
202                                        let disable_staff = std::env::var("ZED_DISABLE_STAFF")
203                                            .map_or(false, |v| !v.is_empty() && v != "0");
204                                        let staff = info.staff && !disable_staff;
205                                        cx.update_flags(staff, info.flags);
206                                        client.telemetry.set_authenticated_user_info(
207                                            Some(info.metrics_id.clone()),
208                                            staff,
209                                        );
210
211                                        this.update(cx, |this, _| {
212                                            this.set_current_user_accepted_tos_at(
213                                                info.accepted_tos_at,
214                                            );
215                                        })
216                                    } else {
217                                        anyhow::Ok(())
218                                    }
219                                })??;
220
221                                current_user_tx.send(user).await.ok();
222
223                                this.update(&mut cx, |_, cx| cx.notify())?;
224                            }
225                        }
226                        Status::SignedOut => {
227                            current_user_tx.send(None).await.ok();
228                            this.update(&mut cx, |this, cx| {
229                                cx.notify();
230                                this.clear_contacts()
231                            })?
232                            .await;
233                        }
234                        Status::ConnectionLost => {
235                            this.update(&mut cx, |this, cx| {
236                                cx.notify();
237                                this.clear_contacts()
238                            })?
239                            .await;
240                        }
241                        _ => {}
242                    }
243                }
244                Ok(())
245            }),
246            pending_contact_requests: Default::default(),
247            weak_self: cx.weak_model(),
248        }
249    }
250
251    #[cfg(feature = "test-support")]
252    pub fn clear_cache(&mut self) {
253        self.users.clear();
254        self.by_github_login.clear();
255    }
256
257    async fn handle_update_invite_info(
258        this: Model<Self>,
259        message: TypedEnvelope<proto::UpdateInviteInfo>,
260        mut cx: AsyncAppContext,
261    ) -> Result<()> {
262        this.update(&mut cx, |this, cx| {
263            this.invite_info = Some(InviteInfo {
264                url: Arc::from(message.payload.url),
265                count: message.payload.count,
266            });
267            cx.notify();
268        })?;
269        Ok(())
270    }
271
272    async fn handle_show_contacts(
273        this: Model<Self>,
274        _: TypedEnvelope<proto::ShowContacts>,
275        mut cx: AsyncAppContext,
276    ) -> Result<()> {
277        this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts))?;
278        Ok(())
279    }
280
281    pub fn invite_info(&self) -> Option<&InviteInfo> {
282        self.invite_info.as_ref()
283    }
284
285    async fn handle_update_contacts(
286        this: Model<Self>,
287        message: TypedEnvelope<proto::UpdateContacts>,
288        mut cx: AsyncAppContext,
289    ) -> Result<()> {
290        this.update(&mut cx, |this, _| {
291            this.update_contacts_tx
292                .unbounded_send(UpdateContacts::Update(message.payload))
293                .unwrap();
294        })?;
295        Ok(())
296    }
297
298    async fn handle_update_plan(
299        this: Model<Self>,
300        message: TypedEnvelope<proto::UpdateUserPlan>,
301        mut cx: AsyncAppContext,
302    ) -> Result<()> {
303        this.update(&mut cx, |this, cx| {
304            this.current_plan = Some(message.payload.plan());
305            cx.notify();
306        })?;
307        Ok(())
308    }
309
310    fn update_contacts(
311        &mut self,
312        message: UpdateContacts,
313        cx: &ModelContext<Self>,
314    ) -> Task<Result<()>> {
315        match message {
316            UpdateContacts::Wait(barrier) => {
317                drop(barrier);
318                Task::ready(Ok(()))
319            }
320            UpdateContacts::Clear(barrier) => {
321                self.contacts.clear();
322                self.incoming_contact_requests.clear();
323                self.outgoing_contact_requests.clear();
324                drop(barrier);
325                Task::ready(Ok(()))
326            }
327            UpdateContacts::Update(message) => {
328                let mut user_ids = HashSet::default();
329                for contact in &message.contacts {
330                    user_ids.insert(contact.user_id);
331                }
332                user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
333                user_ids.extend(message.outgoing_requests.iter());
334
335                let load_users = self.get_users(user_ids.into_iter().collect(), cx);
336                cx.spawn(|this, mut cx| async move {
337                    load_users.await?;
338
339                    // Users are fetched in parallel above and cached in call to get_users
340                    // No need to parallelize here
341                    let mut updated_contacts = Vec::new();
342                    let this = this
343                        .upgrade()
344                        .ok_or_else(|| anyhow!("can't upgrade user store handle"))?;
345                    for contact in message.contacts {
346                        updated_contacts.push(Arc::new(
347                            Contact::from_proto(contact, &this, &mut cx).await?,
348                        ));
349                    }
350
351                    let mut incoming_requests = Vec::new();
352                    for request in message.incoming_requests {
353                        incoming_requests.push({
354                            this.update(&mut cx, |this, cx| {
355                                this.get_user(request.requester_id, cx)
356                            })?
357                            .await?
358                        });
359                    }
360
361                    let mut outgoing_requests = Vec::new();
362                    for requested_user_id in message.outgoing_requests {
363                        outgoing_requests.push(
364                            this.update(&mut cx, |this, cx| this.get_user(requested_user_id, cx))?
365                                .await?,
366                        );
367                    }
368
369                    let removed_contacts =
370                        HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
371                    let removed_incoming_requests =
372                        HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
373                    let removed_outgoing_requests =
374                        HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());
375
376                    this.update(&mut cx, |this, cx| {
377                        // Remove contacts
378                        this.contacts
379                            .retain(|contact| !removed_contacts.contains(&contact.user.id));
380                        // Update existing contacts and insert new ones
381                        for updated_contact in updated_contacts {
382                            match this.contacts.binary_search_by_key(
383                                &&updated_contact.user.github_login,
384                                |contact| &contact.user.github_login,
385                            ) {
386                                Ok(ix) => this.contacts[ix] = updated_contact,
387                                Err(ix) => this.contacts.insert(ix, updated_contact),
388                            }
389                        }
390
391                        // Remove incoming contact requests
392                        this.incoming_contact_requests.retain(|user| {
393                            if removed_incoming_requests.contains(&user.id) {
394                                cx.emit(Event::Contact {
395                                    user: user.clone(),
396                                    kind: ContactEventKind::Cancelled,
397                                });
398                                false
399                            } else {
400                                true
401                            }
402                        });
403                        // Update existing incoming requests and insert new ones
404                        for user in incoming_requests {
405                            match this
406                                .incoming_contact_requests
407                                .binary_search_by_key(&&user.github_login, |contact| {
408                                    &contact.github_login
409                                }) {
410                                Ok(ix) => this.incoming_contact_requests[ix] = user,
411                                Err(ix) => this.incoming_contact_requests.insert(ix, user),
412                            }
413                        }
414
415                        // Remove outgoing contact requests
416                        this.outgoing_contact_requests
417                            .retain(|user| !removed_outgoing_requests.contains(&user.id));
418                        // Update existing incoming requests and insert new ones
419                        for request in outgoing_requests {
420                            match this
421                                .outgoing_contact_requests
422                                .binary_search_by_key(&&request.github_login, |contact| {
423                                    &contact.github_login
424                                }) {
425                                Ok(ix) => this.outgoing_contact_requests[ix] = request,
426                                Err(ix) => this.outgoing_contact_requests.insert(ix, request),
427                            }
428                        }
429
430                        cx.notify();
431                    })?;
432
433                    Ok(())
434                })
435            }
436        }
437    }
438
439    pub fn contacts(&self) -> &[Arc<Contact>] {
440        &self.contacts
441    }
442
443    pub fn has_contact(&self, user: &Arc<User>) -> bool {
444        self.contacts
445            .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
446            .is_ok()
447    }
448
449    pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
450        &self.incoming_contact_requests
451    }
452
453    pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
454        &self.outgoing_contact_requests
455    }
456
457    pub fn is_contact_request_pending(&self, user: &User) -> bool {
458        self.pending_contact_requests.contains_key(&user.id)
459    }
460
461    pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
462        if self
463            .contacts
464            .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
465            .is_ok()
466        {
467            ContactRequestStatus::RequestAccepted
468        } else if self
469            .outgoing_contact_requests
470            .binary_search_by_key(&&user.github_login, |user| &user.github_login)
471            .is_ok()
472        {
473            ContactRequestStatus::RequestSent
474        } else if self
475            .incoming_contact_requests
476            .binary_search_by_key(&&user.github_login, |user| &user.github_login)
477            .is_ok()
478        {
479            ContactRequestStatus::RequestReceived
480        } else {
481            ContactRequestStatus::None
482        }
483    }
484
485    pub fn request_contact(
486        &mut self,
487        responder_id: u64,
488        cx: &mut ModelContext<Self>,
489    ) -> Task<Result<()>> {
490        self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
491    }
492
493    pub fn remove_contact(
494        &mut self,
495        user_id: u64,
496        cx: &mut ModelContext<Self>,
497    ) -> Task<Result<()>> {
498        self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
499    }
500
501    pub fn has_incoming_contact_request(&self, user_id: u64) -> bool {
502        self.incoming_contact_requests
503            .iter()
504            .any(|user| user.id == user_id)
505    }
506
507    pub fn respond_to_contact_request(
508        &mut self,
509        requester_id: u64,
510        accept: bool,
511        cx: &mut ModelContext<Self>,
512    ) -> Task<Result<()>> {
513        self.perform_contact_request(
514            requester_id,
515            proto::RespondToContactRequest {
516                requester_id,
517                response: if accept {
518                    proto::ContactRequestResponse::Accept
519                } else {
520                    proto::ContactRequestResponse::Decline
521                } as i32,
522            },
523            cx,
524        )
525    }
526
527    pub fn dismiss_contact_request(
528        &self,
529        requester_id: u64,
530        cx: &ModelContext<Self>,
531    ) -> Task<Result<()>> {
532        let client = self.client.upgrade();
533        cx.spawn(move |_, _| async move {
534            client
535                .ok_or_else(|| anyhow!("can't upgrade client reference"))?
536                .request(proto::RespondToContactRequest {
537                    requester_id,
538                    response: proto::ContactRequestResponse::Dismiss as i32,
539                })
540                .await?;
541            Ok(())
542        })
543    }
544
545    fn perform_contact_request<T: RequestMessage>(
546        &mut self,
547        user_id: u64,
548        request: T,
549        cx: &mut ModelContext<Self>,
550    ) -> Task<Result<()>> {
551        let client = self.client.upgrade();
552        *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
553        cx.notify();
554
555        cx.spawn(move |this, mut cx| async move {
556            let response = client
557                .ok_or_else(|| anyhow!("can't upgrade client reference"))?
558                .request(request)
559                .await;
560            this.update(&mut cx, |this, cx| {
561                if let Entry::Occupied(mut request_count) =
562                    this.pending_contact_requests.entry(user_id)
563                {
564                    *request_count.get_mut() -= 1;
565                    if *request_count.get() == 0 {
566                        request_count.remove();
567                    }
568                }
569                cx.notify();
570            })?;
571            response?;
572            Ok(())
573        })
574    }
575
576    pub fn clear_contacts(&self) -> impl Future<Output = ()> {
577        let (tx, mut rx) = postage::barrier::channel();
578        self.update_contacts_tx
579            .unbounded_send(UpdateContacts::Clear(tx))
580            .unwrap();
581        async move {
582            rx.next().await;
583        }
584    }
585
586    pub fn contact_updates_done(&self) -> impl Future<Output = ()> {
587        let (tx, mut rx) = postage::barrier::channel();
588        self.update_contacts_tx
589            .unbounded_send(UpdateContacts::Wait(tx))
590            .unwrap();
591        async move {
592            rx.next().await;
593        }
594    }
595
596    pub fn get_users(
597        &self,
598        user_ids: Vec<u64>,
599        cx: &ModelContext<Self>,
600    ) -> Task<Result<Vec<Arc<User>>>> {
601        let mut user_ids_to_fetch = user_ids.clone();
602        user_ids_to_fetch.retain(|id| !self.users.contains_key(id));
603
604        cx.spawn(|this, mut cx| async move {
605            if !user_ids_to_fetch.is_empty() {
606                this.update(&mut cx, |this, cx| {
607                    this.load_users(
608                        proto::GetUsers {
609                            user_ids: user_ids_to_fetch,
610                        },
611                        cx,
612                    )
613                })?
614                .await?;
615            }
616
617            this.update(&mut cx, |this, _| {
618                user_ids
619                    .iter()
620                    .map(|user_id| {
621                        this.users
622                            .get(user_id)
623                            .cloned()
624                            .ok_or_else(|| anyhow!("user {} not found", user_id))
625                    })
626                    .collect()
627            })?
628        })
629    }
630
631    pub fn fuzzy_search_users(
632        &self,
633        query: String,
634        cx: &ModelContext<Self>,
635    ) -> Task<Result<Vec<Arc<User>>>> {
636        self.load_users(proto::FuzzySearchUsers { query }, cx)
637    }
638
639    pub fn get_cached_user(&self, user_id: u64) -> Option<Arc<User>> {
640        self.users.get(&user_id).cloned()
641    }
642
643    pub fn get_user_optimistic(&self, user_id: u64, cx: &ModelContext<Self>) -> Option<Arc<User>> {
644        if let Some(user) = self.users.get(&user_id).cloned() {
645            return Some(user);
646        }
647
648        self.get_user(user_id, cx).detach_and_log_err(cx);
649        None
650    }
651
652    pub fn get_user(&self, user_id: u64, cx: &ModelContext<Self>) -> Task<Result<Arc<User>>> {
653        if let Some(user) = self.users.get(&user_id).cloned() {
654            return Task::ready(Ok(user));
655        }
656
657        let load_users = self.get_users(vec![user_id], cx);
658        cx.spawn(move |this, mut cx| async move {
659            load_users.await?;
660            this.update(&mut cx, |this, _| {
661                this.users
662                    .get(&user_id)
663                    .cloned()
664                    .ok_or_else(|| anyhow!("server responded with no users"))
665            })?
666        })
667    }
668
669    pub fn cached_user_by_github_login(&self, github_login: &str) -> Option<Arc<User>> {
670        self.by_github_login
671            .get(github_login)
672            .and_then(|id| self.users.get(id).cloned())
673    }
674
675    pub fn current_user(&self) -> Option<Arc<User>> {
676        self.current_user.borrow().clone()
677    }
678
679    pub fn current_plan(&self) -> Option<proto::Plan> {
680        self.current_plan
681    }
682
683    pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
684        self.current_user.clone()
685    }
686
687    pub fn current_user_has_accepted_terms(&self) -> Option<bool> {
688        self.accepted_tos_at
689            .map(|accepted_tos_at| accepted_tos_at.is_some())
690    }
691
692    pub fn accept_terms_of_service(&self, cx: &ModelContext<Self>) -> Task<Result<()>> {
693        if self.current_user().is_none() {
694            return Task::ready(Err(anyhow!("no current user")));
695        };
696
697        let client = self.client.clone();
698        cx.spawn(move |this, mut cx| async move {
699            if let Some(client) = client.upgrade() {
700                let response = client
701                    .request(proto::AcceptTermsOfService {})
702                    .await
703                    .context("error accepting tos")?;
704
705                this.update(&mut cx, |this, _| {
706                    this.set_current_user_accepted_tos_at(Some(response.accepted_tos_at))
707                })
708            } else {
709                Err(anyhow!("client not found"))
710            }
711        })
712    }
713
714    fn set_current_user_accepted_tos_at(&mut self, accepted_tos_at: Option<u64>) {
715        self.accepted_tos_at = Some(
716            accepted_tos_at.and_then(|timestamp| DateTime::from_timestamp(timestamp as i64, 0)),
717        );
718    }
719
720    fn load_users(
721        &self,
722        request: impl RequestMessage<Response = UsersResponse>,
723        cx: &ModelContext<Self>,
724    ) -> Task<Result<Vec<Arc<User>>>> {
725        let client = self.client.clone();
726        cx.spawn(|this, mut cx| async move {
727            if let Some(rpc) = client.upgrade() {
728                let response = rpc.request(request).await.context("error loading users")?;
729                let users = response.users;
730
731                this.update(&mut cx, |this, _| this.insert(users))
732            } else {
733                Ok(Vec::new())
734            }
735        })
736    }
737
738    pub fn insert(&mut self, users: Vec<proto::User>) -> Vec<Arc<User>> {
739        let mut ret = Vec::with_capacity(users.len());
740        for user in users {
741            let user = User::new(user);
742            if let Some(old) = self.users.insert(user.id, user.clone()) {
743                if old.github_login != user.github_login {
744                    self.by_github_login.remove(&old.github_login);
745                }
746            }
747            self.by_github_login
748                .insert(user.github_login.clone(), user.id);
749            ret.push(user)
750        }
751        ret
752    }
753
754    pub fn set_participant_indices(
755        &mut self,
756        participant_indices: HashMap<u64, ParticipantIndex>,
757        cx: &mut ModelContext<Self>,
758    ) {
759        if participant_indices != self.participant_indices {
760            self.participant_indices = participant_indices;
761            cx.emit(Event::ParticipantIndicesChanged);
762        }
763    }
764
765    pub fn participant_indices(&self) -> &HashMap<u64, ParticipantIndex> {
766        &self.participant_indices
767    }
768
769    pub fn participant_names(
770        &self,
771        user_ids: impl Iterator<Item = u64>,
772        cx: &AppContext,
773    ) -> HashMap<u64, SharedString> {
774        let mut ret = HashMap::default();
775        let mut missing_user_ids = Vec::new();
776        for id in user_ids {
777            if let Some(github_login) = self.get_cached_user(id).map(|u| u.github_login.clone()) {
778                ret.insert(id, github_login.into());
779            } else {
780                missing_user_ids.push(id)
781            }
782        }
783        if !missing_user_ids.is_empty() {
784            let this = self.weak_self.clone();
785            cx.spawn(|mut cx| async move {
786                this.update(&mut cx, |this, cx| this.get_users(missing_user_ids, cx))?
787                    .await
788            })
789            .detach_and_log_err(cx);
790        }
791        ret
792    }
793}
794
795impl User {
796    fn new(message: proto::User) -> Arc<Self> {
797        Arc::new(User {
798            id: message.id,
799            github_login: message.github_login,
800            avatar_uri: message.avatar_url.into(),
801        })
802    }
803}
804
805impl Contact {
806    async fn from_proto(
807        contact: proto::Contact,
808        user_store: &Model<UserStore>,
809        cx: &mut AsyncAppContext,
810    ) -> Result<Self> {
811        let user = user_store
812            .update(cx, |user_store, cx| {
813                user_store.get_user(contact.user_id, cx)
814            })?
815            .await?;
816        Ok(Self {
817            user,
818            online: contact.online,
819            busy: contact.busy,
820        })
821    }
822}
823
824impl Collaborator {
825    pub fn from_proto(message: proto::Collaborator) -> Result<Self> {
826        Ok(Self {
827            peer_id: message.peer_id.ok_or_else(|| anyhow!("invalid peer id"))?,
828            replica_id: message.replica_id as ReplicaId,
829            user_id: message.user_id as UserId,
830            is_host: message.is_host,
831        })
832    }
833}