user.rs

  1use super::{proto, Client, Status, TypedEnvelope};
  2use anyhow::{anyhow, Context as _, Result};
  3use chrono::{DateTime, Utc};
  4use collections::{hash_map::Entry, HashMap, HashSet};
  5use feature_flags::FeatureFlagAppExt;
  6use futures::{channel::mpsc, Future, StreamExt};
  7use gpui::{
  8    App, AsyncApp, Context, Entity, EventEmitter, SharedString, SharedUri, Task, WeakEntity,
  9};
 10use postage::{sink::Sink, watch};
 11use rpc::proto::{RequestMessage, UsersResponse};
 12use std::sync::{Arc, Weak};
 13use text::ReplicaId;
 14use util::TryFutureExt as _;
 15
/// Numeric identifier of a user; this is the type of [`User::id`].
pub type UserId = u64;
 17
/// Newtype wrapper around a `u64` channel identifier.
///
/// Its `Display` implementation prints the wrapped number.
#[derive(
    Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, serde::Serialize, serde::Deserialize,
)]
pub struct ChannelId(pub u64);
 22
 23impl std::fmt::Display for ChannelId {
 24    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
 25        self.0.fmt(f)
 26    }
 27}
 28
/// Newtype wrapper around a `u64` project identifier.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)]
pub struct ProjectId(pub u64);
 31
/// Newtype wrapper around a `u64` dev-server project identifier.
#[derive(
    Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, serde::Serialize, serde::Deserialize,
)]
pub struct DevServerProjectId(pub u64);
 36
/// Newtype wrapper around a `u32` participant index.
///
/// `UserStore` keeps a map from user id to `ParticipantIndex`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ParticipantIndex(pub u32);
 39
/// A user as known to the client, built from a `proto::User` message.
///
/// Equality and ordering are implemented by hand further down in this file
/// rather than derived, so not every field takes part in comparisons.
#[derive(Default, Debug)]
pub struct User {
    /// Unique numeric id of the user.
    pub id: UserId,
    /// GitHub login; used as the sort key for the contact lists in `UserStore`.
    pub github_login: String,
    /// URI of the user's avatar (taken from the proto `avatar_url` field).
    pub avatar_uri: SharedUri,
    /// Optional name, copied from the proto message.
    pub name: Option<String>,
    /// Optional email address, copied from the proto message.
    pub email: Option<String>,
}
 48
/// A collaborator, decoded from a `proto::Collaborator` message
/// (see `Collaborator::from_proto`).
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Collaborator {
    /// Peer id of the collaborator; required when decoding from proto.
    pub peer_id: proto::PeerId,
    /// Replica id, cast from the proto `replica_id` field.
    pub replica_id: ReplicaId,
    /// Id of the user behind this collaborator.
    pub user_id: UserId,
    /// Copied from the proto `is_host` flag.
    pub is_host: bool,
}
 56
 57impl PartialOrd for User {
 58    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
 59        Some(self.cmp(other))
 60    }
 61}
 62
 63impl Ord for User {
 64    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
 65        self.github_login.cmp(&other.github_login)
 66    }
 67}
 68
 69impl PartialEq for User {
 70    fn eq(&self, other: &Self) -> bool {
 71        self.id == other.id && self.github_login == other.github_login
 72    }
 73}
 74
// Marker impl: the hand-written `PartialEq` for `User` compares a `u64` id and a
// `String` login, both of which have total equality.
impl Eq for User {}
 76
/// A contact of the current user, built from a `proto::Contact` message
/// (see `Contact::from_proto`).
#[derive(Debug, PartialEq)]
pub struct Contact {
    /// The user this contact refers to, resolved through the `UserStore`.
    pub user: Arc<User>,
    /// Copied from the proto `online` flag.
    pub online: bool,
    /// Copied from the proto `busy` flag.
    pub busy: bool,
}
 83
/// Relationship between the current user and another user, as reported by
/// `UserStore::contact_request_status`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ContactRequestStatus {
    /// The user is neither a contact nor part of any contact request.
    None,
    /// The user appears in the outgoing contact requests.
    RequestSent,
    /// The user appears in the incoming contact requests.
    RequestReceived,
    /// The user appears in the contacts list.
    RequestAccepted,
}
 91
/// Tracks the signed-in user, a cache of known users, and the contact lists.
pub struct UserStore {
    /// Cache of users keyed by id; filled by `insert`.
    users: HashMap<u64, Arc<User>>,
    /// Index from GitHub login to user id, kept in sync by `insert`.
    by_github_login: HashMap<String, u64>,
    /// Participant index per user id; replaced by `set_participant_indices`.
    participant_indices: HashMap<u64, ParticipantIndex>,
    /// Sending half of the queue consumed by the `_maintain_contacts` task.
    update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
    /// Most recent plan received via `handle_update_plan`, if any.
    current_plan: Option<proto::Plan>,
    /// Watch channel holding the signed-in user; written by `_maintain_current_user`.
    current_user: watch::Receiver<Option<Arc<User>>>,
    /// Outer `None` until `set_current_user_accepted_tos_at` has been called;
    /// the inner `Option` is the acceptance time, if one is known.
    accepted_tos_at: Option<Option<DateTime<Utc>>>,
    /// Contacts, kept sorted by GitHub login (entries are placed via binary search).
    contacts: Vec<Arc<Contact>>,
    /// Users who sent us a contact request, kept sorted by GitHub login.
    incoming_contact_requests: Vec<Arc<User>>,
    /// Users we sent a contact request to, kept sorted by GitHub login.
    outgoing_contact_requests: Vec<Arc<User>>,
    /// Number of in-flight contact requests per user id
    /// (maintained by `perform_contact_request`).
    pending_contact_requests: HashMap<u64, usize>,
    /// Latest invite info received via `handle_update_invite_info`, if any.
    invite_info: Option<InviteInfo>,
    /// Weak handle to the client, so the store does not keep it alive.
    client: Weak<Client>,
    /// Background task that applies queued `UpdateContacts` messages one at a time.
    _maintain_contacts: Task<()>,
    /// Background task that reacts to client status changes.
    _maintain_current_user: Task<Result<()>>,
    /// Weak handle to this entity, used to spawn work from `&self` methods.
    weak_self: WeakEntity<Self>,
}
110
/// Invite information received in a `proto::UpdateInviteInfo` message.
#[derive(Clone)]
pub struct InviteInfo {
    /// Copied from the `count` field of the message.
    pub count: u32,
    /// Copied from the `url` field of the message.
    pub url: Arc<str>,
}
116
/// Events emitted by `UserStore`.
pub enum Event {
    /// Something happened to a contact request involving `user`.
    ///
    /// Within this file it is emitted with `ContactEventKind::Cancelled` when an
    /// incoming contact request is removed.
    Contact {
        user: Arc<User>,
        kind: ContactEventKind,
    },
    /// Emitted when a `proto::ShowContacts` message is received.
    ShowContacts,
    /// Emitted by `set_participant_indices` when the participant index map changes.
    ParticipantIndicesChanged,
    /// Emitted after the stored terms-of-service acceptance state is updated.
    TermsStatusUpdated {
        /// Whether an acceptance timestamp is present.
        accepted: bool,
    },
}
128
/// The kind of change carried by an `Event::Contact`.
#[derive(Clone, Copy)]
pub enum ContactEventKind {
    Requested,
    Accepted,
    /// Used when an incoming contact request is removed from the store.
    Cancelled,
}
135
// Lets `UserStore` emit `Event` values through `cx.emit`.
impl EventEmitter<Event> for UserStore {}
137
/// Messages processed, in order, by the `_maintain_contacts` task of `UserStore`.
enum UpdateContacts {
    /// Apply a contacts update received from the server.
    Update(proto::UpdateContacts),
    /// Do nothing except drop the barrier sender, which lets the waiting side
    /// know that everything queued before this message has been processed.
    Wait(postage::barrier::Sender),
    /// Clear the contacts and both request lists, then drop the barrier sender.
    Clear(postage::barrier::Sender),
}
143
144impl UserStore {
    /// Creates the store, registers its RPC message handlers on `client`, and
    /// spawns the two background tasks that keep it up to date:
    ///
    /// * `_maintain_contacts` applies queued `UpdateContacts` messages one at a time.
    /// * `_maintain_current_user` follows the client's connection status to load
    ///   the signed-in user and to clear contacts on sign-out or connection loss.
    pub fn new(client: Arc<Client>, cx: &Context<Self>) -> Self {
        let (mut current_user_tx, current_user_rx) = watch::channel();
        let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
        let rpc_subscriptions = vec![
            client.add_message_handler(cx.weak_entity(), Self::handle_update_plan),
            client.add_message_handler(cx.weak_entity(), Self::handle_update_contacts),
            client.add_message_handler(cx.weak_entity(), Self::handle_update_invite_info),
            client.add_message_handler(cx.weak_entity(), Self::handle_show_contacts),
        ];
        Self {
            users: Default::default(),
            by_github_login: Default::default(),
            current_user: current_user_rx,
            current_plan: None,
            accepted_tos_at: None,
            contacts: Default::default(),
            incoming_contact_requests: Default::default(),
            participant_indices: Default::default(),
            outgoing_contact_requests: Default::default(),
            invite_info: None,
            client: Arc::downgrade(&client),
            update_contacts_tx,
            _maintain_contacts: cx.spawn(|this, mut cx| async move {
                // Move the subscriptions into the task so they live as long as it does.
                let _subscriptions = rpc_subscriptions;
                while let Some(message) = update_contacts_rx.next().await {
                    if let Ok(task) =
                        this.update(&mut cx, |this, cx| this.update_contacts(message, cx))
                    {
                        // Await each update before taking the next message so that
                        // updates are applied strictly in order; failures are only logged.
                        task.log_err().await;
                    } else {
                        // The store can no longer be updated; stop processing.
                        break;
                    }
                }
            }),
            _maintain_current_user: cx.spawn(|this, mut cx| async move {
                let mut status = client.status();
                // Keep only a weak reference so this task does not keep the client alive.
                let weak = Arc::downgrade(&client);
                drop(client);
                while let Some(status) = status.next().await {
                    // if the client is dropped, the app is shutting down.
                    let Some(client) = weak.upgrade() else {
                        return Ok(());
                    };
                    match status {
                        Status::Connected { .. } => {
                            if let Some(user_id) = client.user_id() {
                                let fetch_user = if let Ok(fetch_user) = this
                                    .update(&mut cx, |this, cx| {
                                        this.get_user(user_id, cx).log_err()
                                    }) {
                                    fetch_user
                                } else {
                                    break;
                                };
                                let fetch_private_user_info =
                                    client.request(proto::GetPrivateUserInfo {}).log_err();
                                // Run both requests concurrently; each yields `None` on
                                // failure because of `log_err`.
                                let (user, info) =
                                    futures::join!(fetch_user, fetch_private_user_info);

                                cx.update(|cx| {
                                    if let Some(info) = info {
                                        // Staff status can be switched off locally by setting
                                        // ZED_DISABLE_STAFF to anything other than "" or "0".
                                        let disable_staff = std::env::var("ZED_DISABLE_STAFF")
                                            .map_or(false, |v| !v.is_empty() && v != "0");
                                        let staff = info.staff && !disable_staff;
                                        cx.update_flags(staff, info.flags);
                                        client.telemetry.set_authenticated_user_info(
                                            Some(info.metrics_id.clone()),
                                            staff,
                                        );

                                        this.update(cx, |this, cx| {
                                            // In debug builds, ZED_IGNORE_ACCEPTED_TOS makes the
                                            // store behave as if the terms were never accepted.
                                            let accepted_tos_at = {
                                                #[cfg(debug_assertions)]
                                                if std::env::var("ZED_IGNORE_ACCEPTED_TOS").is_ok()
                                                {
                                                    None
                                                } else {
                                                    info.accepted_tos_at
                                                }

                                                #[cfg(not(debug_assertions))]
                                                info.accepted_tos_at
                                            };

                                            this.set_current_user_accepted_tos_at(accepted_tos_at);
                                            cx.emit(Event::TermsStatusUpdated {
                                                accepted: accepted_tos_at.is_some(),
                                            });
                                        })
                                    } else {
                                        anyhow::Ok(())
                                    }
                                })??;

                                // Publish the (possibly `None`) user to all watchers.
                                current_user_tx.send(user).await.ok();

                                this.update(&mut cx, |_, cx| cx.notify())?;
                            }
                        }
                        Status::SignedOut => {
                            // Signing out clears both the current user and the contacts.
                            current_user_tx.send(None).await.ok();
                            this.update(&mut cx, |this, cx| {
                                cx.notify();
                                this.clear_contacts()
                            })?
                            .await;
                        }
                        Status::ConnectionLost => {
                            // Losing the connection clears contacts but keeps the current user.
                            this.update(&mut cx, |this, cx| {
                                cx.notify();
                                this.clear_contacts()
                            })?
                            .await;
                        }
                        _ => {}
                    }
                }
                Ok(())
            }),
            pending_contact_requests: Default::default(),
            weak_self: cx.weak_entity(),
        }
    }
268
269    #[cfg(feature = "test-support")]
270    pub fn clear_cache(&mut self) {
271        self.users.clear();
272        self.by_github_login.clear();
273    }
274
275    async fn handle_update_invite_info(
276        this: Entity<Self>,
277        message: TypedEnvelope<proto::UpdateInviteInfo>,
278        mut cx: AsyncApp,
279    ) -> Result<()> {
280        this.update(&mut cx, |this, cx| {
281            this.invite_info = Some(InviteInfo {
282                url: Arc::from(message.payload.url),
283                count: message.payload.count,
284            });
285            cx.notify();
286        })?;
287        Ok(())
288    }
289
290    async fn handle_show_contacts(
291        this: Entity<Self>,
292        _: TypedEnvelope<proto::ShowContacts>,
293        mut cx: AsyncApp,
294    ) -> Result<()> {
295        this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts))?;
296        Ok(())
297    }
298
    /// Returns the most recently received invite info, or `None` if no
    /// `UpdateInviteInfo` message has been handled yet.
    pub fn invite_info(&self) -> Option<&InviteInfo> {
        self.invite_info.as_ref()
    }
302
303    async fn handle_update_contacts(
304        this: Entity<Self>,
305        message: TypedEnvelope<proto::UpdateContacts>,
306        mut cx: AsyncApp,
307    ) -> Result<()> {
308        this.update(&mut cx, |this, _| {
309            this.update_contacts_tx
310                .unbounded_send(UpdateContacts::Update(message.payload))
311                .unwrap();
312        })?;
313        Ok(())
314    }
315
316    async fn handle_update_plan(
317        this: Entity<Self>,
318        message: TypedEnvelope<proto::UpdateUserPlan>,
319        mut cx: AsyncApp,
320    ) -> Result<()> {
321        this.update(&mut cx, |this, cx| {
322            this.current_plan = Some(message.payload.plan());
323            cx.notify();
324        })?;
325        Ok(())
326    }
327
    /// Applies one queued `UpdateContacts` message.
    ///
    /// * `Wait` only drops the barrier sender.
    /// * `Clear` empties the contacts and both request lists, then drops the barrier sender.
    /// * `Update` loads every referenced user, then applies removals and
    ///   insertions to the three lists, keeping each sorted by GitHub login.
    ///
    /// The returned task resolves once the message has been fully applied.
    fn update_contacts(&mut self, message: UpdateContacts, cx: &Context<Self>) -> Task<Result<()>> {
        match message {
            UpdateContacts::Wait(barrier) => {
                drop(barrier);
                Task::ready(Ok(()))
            }
            UpdateContacts::Clear(barrier) => {
                self.contacts.clear();
                self.incoming_contact_requests.clear();
                self.outgoing_contact_requests.clear();
                drop(barrier);
                Task::ready(Ok(()))
            }
            UpdateContacts::Update(message) => {
                // Collect the ids of every user mentioned by the update so they can
                // be loaded with a single `get_users` call.
                let mut user_ids = HashSet::default();
                for contact in &message.contacts {
                    user_ids.insert(contact.user_id);
                }
                user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
                user_ids.extend(message.outgoing_requests.iter());

                let load_users = self.get_users(user_ids.into_iter().collect(), cx);
                cx.spawn(|this, mut cx| async move {
                    load_users.await?;

                    // Users are fetched in parallel above and cached in call to get_users
                    // No need to parallelize here
                    let mut updated_contacts = Vec::new();
                    let this = this
                        .upgrade()
                        .ok_or_else(|| anyhow!("can't upgrade user store handle"))?;
                    for contact in message.contacts {
                        updated_contacts.push(Arc::new(
                            Contact::from_proto(contact, &this, &mut cx).await?,
                        ));
                    }

                    let mut incoming_requests = Vec::new();
                    for request in message.incoming_requests {
                        incoming_requests.push({
                            this.update(&mut cx, |this, cx| {
                                this.get_user(request.requester_id, cx)
                            })?
                            .await?
                        });
                    }

                    let mut outgoing_requests = Vec::new();
                    for requested_user_id in message.outgoing_requests {
                        outgoing_requests.push(
                            this.update(&mut cx, |this, cx| this.get_user(requested_user_id, cx))?
                                .await?,
                        );
                    }

                    let removed_contacts =
                        HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
                    let removed_incoming_requests =
                        HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
                    let removed_outgoing_requests =
                        HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());

                    this.update(&mut cx, |this, cx| {
                        // Remove contacts
                        this.contacts
                            .retain(|contact| !removed_contacts.contains(&contact.user.id));
                        // Update existing contacts and insert new ones
                        // (binary search on the login keeps the list sorted).
                        for updated_contact in updated_contacts {
                            match this.contacts.binary_search_by_key(
                                &&updated_contact.user.github_login,
                                |contact| &contact.user.github_login,
                            ) {
                                Ok(ix) => this.contacts[ix] = updated_contact,
                                Err(ix) => this.contacts.insert(ix, updated_contact),
                            }
                        }

                        // Remove incoming contact requests, emitting a `Cancelled`
                        // event for each one that is dropped.
                        this.incoming_contact_requests.retain(|user| {
                            if removed_incoming_requests.contains(&user.id) {
                                cx.emit(Event::Contact {
                                    user: user.clone(),
                                    kind: ContactEventKind::Cancelled,
                                });
                                false
                            } else {
                                true
                            }
                        });
                        // Update existing incoming requests and insert new ones
                        for user in incoming_requests {
                            match this
                                .incoming_contact_requests
                                .binary_search_by_key(&&user.github_login, |contact| {
                                    &contact.github_login
                                }) {
                                Ok(ix) => this.incoming_contact_requests[ix] = user,
                                Err(ix) => this.incoming_contact_requests.insert(ix, user),
                            }
                        }

                        // Remove outgoing contact requests
                        this.outgoing_contact_requests
                            .retain(|user| !removed_outgoing_requests.contains(&user.id));
                        // Update existing outgoing requests and insert new ones
                        for request in outgoing_requests {
                            match this
                                .outgoing_contact_requests
                                .binary_search_by_key(&&request.github_login, |contact| {
                                    &contact.github_login
                                }) {
                                Ok(ix) => this.outgoing_contact_requests[ix] = request,
                                Err(ix) => this.outgoing_contact_requests.insert(ix, request),
                            }
                        }

                        cx.notify();
                    })?;

                    Ok(())
                })
            }
        }
    }
452
453    pub fn contacts(&self) -> &[Arc<Contact>] {
454        &self.contacts
455    }
456
457    pub fn has_contact(&self, user: &Arc<User>) -> bool {
458        self.contacts
459            .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
460            .is_ok()
461    }
462
463    pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
464        &self.incoming_contact_requests
465    }
466
467    pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
468        &self.outgoing_contact_requests
469    }
470
471    pub fn is_contact_request_pending(&self, user: &User) -> bool {
472        self.pending_contact_requests.contains_key(&user.id)
473    }
474
475    pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
476        if self
477            .contacts
478            .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
479            .is_ok()
480        {
481            ContactRequestStatus::RequestAccepted
482        } else if self
483            .outgoing_contact_requests
484            .binary_search_by_key(&&user.github_login, |user| &user.github_login)
485            .is_ok()
486        {
487            ContactRequestStatus::RequestSent
488        } else if self
489            .incoming_contact_requests
490            .binary_search_by_key(&&user.github_login, |user| &user.github_login)
491            .is_ok()
492        {
493            ContactRequestStatus::RequestReceived
494        } else {
495            ContactRequestStatus::None
496        }
497    }
498
499    pub fn request_contact(
500        &mut self,
501        responder_id: u64,
502        cx: &mut Context<Self>,
503    ) -> Task<Result<()>> {
504        self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
505    }
506
507    pub fn remove_contact(&mut self, user_id: u64, cx: &mut Context<Self>) -> Task<Result<()>> {
508        self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
509    }
510
511    pub fn has_incoming_contact_request(&self, user_id: u64) -> bool {
512        self.incoming_contact_requests
513            .iter()
514            .any(|user| user.id == user_id)
515    }
516
517    pub fn respond_to_contact_request(
518        &mut self,
519        requester_id: u64,
520        accept: bool,
521        cx: &mut Context<Self>,
522    ) -> Task<Result<()>> {
523        self.perform_contact_request(
524            requester_id,
525            proto::RespondToContactRequest {
526                requester_id,
527                response: if accept {
528                    proto::ContactRequestResponse::Accept
529                } else {
530                    proto::ContactRequestResponse::Decline
531                } as i32,
532            },
533            cx,
534        )
535    }
536
537    pub fn dismiss_contact_request(
538        &self,
539        requester_id: u64,
540        cx: &Context<Self>,
541    ) -> Task<Result<()>> {
542        let client = self.client.upgrade();
543        cx.spawn(move |_, _| async move {
544            client
545                .ok_or_else(|| anyhow!("can't upgrade client reference"))?
546                .request(proto::RespondToContactRequest {
547                    requester_id,
548                    response: proto::ContactRequestResponse::Dismiss as i32,
549                })
550                .await?;
551            Ok(())
552        })
553    }
554
555    fn perform_contact_request<T: RequestMessage>(
556        &mut self,
557        user_id: u64,
558        request: T,
559        cx: &mut Context<Self>,
560    ) -> Task<Result<()>> {
561        let client = self.client.upgrade();
562        *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
563        cx.notify();
564
565        cx.spawn(move |this, mut cx| async move {
566            let response = client
567                .ok_or_else(|| anyhow!("can't upgrade client reference"))?
568                .request(request)
569                .await;
570            this.update(&mut cx, |this, cx| {
571                if let Entry::Occupied(mut request_count) =
572                    this.pending_contact_requests.entry(user_id)
573                {
574                    *request_count.get_mut() -= 1;
575                    if *request_count.get() == 0 {
576                        request_count.remove();
577                    }
578                }
579                cx.notify();
580            })?;
581            response?;
582            Ok(())
583        })
584    }
585
586    pub fn clear_contacts(&self) -> impl Future<Output = ()> {
587        let (tx, mut rx) = postage::barrier::channel();
588        self.update_contacts_tx
589            .unbounded_send(UpdateContacts::Clear(tx))
590            .unwrap();
591        async move {
592            rx.next().await;
593        }
594    }
595
596    pub fn contact_updates_done(&self) -> impl Future<Output = ()> {
597        let (tx, mut rx) = postage::barrier::channel();
598        self.update_contacts_tx
599            .unbounded_send(UpdateContacts::Wait(tx))
600            .unwrap();
601        async move {
602            rx.next().await;
603        }
604    }
605
606    pub fn get_users(
607        &self,
608        user_ids: Vec<u64>,
609        cx: &Context<Self>,
610    ) -> Task<Result<Vec<Arc<User>>>> {
611        let mut user_ids_to_fetch = user_ids.clone();
612        user_ids_to_fetch.retain(|id| !self.users.contains_key(id));
613
614        cx.spawn(|this, mut cx| async move {
615            if !user_ids_to_fetch.is_empty() {
616                this.update(&mut cx, |this, cx| {
617                    this.load_users(
618                        proto::GetUsers {
619                            user_ids: user_ids_to_fetch,
620                        },
621                        cx,
622                    )
623                })?
624                .await?;
625            }
626
627            this.update(&mut cx, |this, _| {
628                user_ids
629                    .iter()
630                    .map(|user_id| {
631                        this.users
632                            .get(user_id)
633                            .cloned()
634                            .ok_or_else(|| anyhow!("user {} not found", user_id))
635                    })
636                    .collect()
637            })?
638        })
639    }
640
641    pub fn fuzzy_search_users(
642        &self,
643        query: String,
644        cx: &Context<Self>,
645    ) -> Task<Result<Vec<Arc<User>>>> {
646        self.load_users(proto::FuzzySearchUsers { query }, cx)
647    }
648
649    pub fn get_cached_user(&self, user_id: u64) -> Option<Arc<User>> {
650        self.users.get(&user_id).cloned()
651    }
652
653    pub fn get_user_optimistic(&self, user_id: u64, cx: &Context<Self>) -> Option<Arc<User>> {
654        if let Some(user) = self.users.get(&user_id).cloned() {
655            return Some(user);
656        }
657
658        self.get_user(user_id, cx).detach_and_log_err(cx);
659        None
660    }
661
662    pub fn get_user(&self, user_id: u64, cx: &Context<Self>) -> Task<Result<Arc<User>>> {
663        if let Some(user) = self.users.get(&user_id).cloned() {
664            return Task::ready(Ok(user));
665        }
666
667        let load_users = self.get_users(vec![user_id], cx);
668        cx.spawn(move |this, mut cx| async move {
669            load_users.await?;
670            this.update(&mut cx, |this, _| {
671                this.users
672                    .get(&user_id)
673                    .cloned()
674                    .ok_or_else(|| anyhow!("server responded with no users"))
675            })?
676        })
677    }
678
679    pub fn cached_user_by_github_login(&self, github_login: &str) -> Option<Arc<User>> {
680        self.by_github_login
681            .get(github_login)
682            .and_then(|id| self.users.get(id).cloned())
683    }
684
685    pub fn current_user(&self) -> Option<Arc<User>> {
686        self.current_user.borrow().clone()
687    }
688
    /// Returns the plan most recently received from the server, or `None` if no
    /// plan update has been handled yet.
    pub fn current_plan(&self) -> Option<proto::Plan> {
        self.current_plan
    }
692
693    pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
694        self.current_user.clone()
695    }
696
697    pub fn current_user_has_accepted_terms(&self) -> Option<bool> {
698        self.accepted_tos_at
699            .map(|accepted_tos_at| accepted_tos_at.is_some())
700    }
701
702    pub fn accept_terms_of_service(&self, cx: &Context<Self>) -> Task<Result<()>> {
703        if self.current_user().is_none() {
704            return Task::ready(Err(anyhow!("no current user")));
705        };
706
707        let client = self.client.clone();
708        cx.spawn(move |this, mut cx| async move {
709            if let Some(client) = client.upgrade() {
710                let response = client
711                    .request(proto::AcceptTermsOfService {})
712                    .await
713                    .context("error accepting tos")?;
714
715                this.update(&mut cx, |this, cx| {
716                    this.set_current_user_accepted_tos_at(Some(response.accepted_tos_at));
717                    cx.emit(Event::TermsStatusUpdated { accepted: true });
718                })
719            } else {
720                Err(anyhow!("client not found"))
721            }
722        })
723    }
724
725    fn set_current_user_accepted_tos_at(&mut self, accepted_tos_at: Option<u64>) {
726        self.accepted_tos_at = Some(
727            accepted_tos_at.and_then(|timestamp| DateTime::from_timestamp(timestamp as i64, 0)),
728        );
729    }
730
731    fn load_users(
732        &self,
733        request: impl RequestMessage<Response = UsersResponse>,
734        cx: &Context<Self>,
735    ) -> Task<Result<Vec<Arc<User>>>> {
736        let client = self.client.clone();
737        cx.spawn(|this, mut cx| async move {
738            if let Some(rpc) = client.upgrade() {
739                let response = rpc.request(request).await.context("error loading users")?;
740                let users = response.users;
741
742                this.update(&mut cx, |this, _| this.insert(users))
743            } else {
744                Ok(Vec::new())
745            }
746        })
747    }
748
749    pub fn insert(&mut self, users: Vec<proto::User>) -> Vec<Arc<User>> {
750        let mut ret = Vec::with_capacity(users.len());
751        for user in users {
752            let user = User::new(user);
753            if let Some(old) = self.users.insert(user.id, user.clone()) {
754                if old.github_login != user.github_login {
755                    self.by_github_login.remove(&old.github_login);
756                }
757            }
758            self.by_github_login
759                .insert(user.github_login.clone(), user.id);
760            ret.push(user)
761        }
762        ret
763    }
764
765    pub fn set_participant_indices(
766        &mut self,
767        participant_indices: HashMap<u64, ParticipantIndex>,
768        cx: &mut Context<Self>,
769    ) {
770        if participant_indices != self.participant_indices {
771            self.participant_indices = participant_indices;
772            cx.emit(Event::ParticipantIndicesChanged);
773        }
774    }
775
    /// Returns the current map from user id to participant index.
    pub fn participant_indices(&self) -> &HashMap<u64, ParticipantIndex> {
        &self.participant_indices
    }
779
780    pub fn participant_names(
781        &self,
782        user_ids: impl Iterator<Item = u64>,
783        cx: &App,
784    ) -> HashMap<u64, SharedString> {
785        let mut ret = HashMap::default();
786        let mut missing_user_ids = Vec::new();
787        for id in user_ids {
788            if let Some(github_login) = self.get_cached_user(id).map(|u| u.github_login.clone()) {
789                ret.insert(id, github_login.into());
790            } else {
791                missing_user_ids.push(id)
792            }
793        }
794        if !missing_user_ids.is_empty() {
795            let this = self.weak_self.clone();
796            cx.spawn(|mut cx| async move {
797                this.update(&mut cx, |this, cx| this.get_users(missing_user_ids, cx))?
798                    .await
799            })
800            .detach_and_log_err(cx);
801        }
802        ret
803    }
804}
805
806impl User {
807    fn new(message: proto::User) -> Arc<Self> {
808        Arc::new(User {
809            id: message.id,
810            github_login: message.github_login,
811            avatar_uri: message.avatar_url.into(),
812            name: message.name,
813            email: message.email,
814        })
815    }
816}
817
818impl Contact {
819    async fn from_proto(
820        contact: proto::Contact,
821        user_store: &Entity<UserStore>,
822        cx: &mut AsyncApp,
823    ) -> Result<Self> {
824        let user = user_store
825            .update(cx, |user_store, cx| {
826                user_store.get_user(contact.user_id, cx)
827            })?
828            .await?;
829        Ok(Self {
830            user,
831            online: contact.online,
832            busy: contact.busy,
833        })
834    }
835}
836
837impl Collaborator {
838    pub fn from_proto(message: proto::Collaborator) -> Result<Self> {
839        Ok(Self {
840            peer_id: message.peer_id.ok_or_else(|| anyhow!("invalid peer id"))?,
841            replica_id: message.replica_id as ReplicaId,
842            user_id: message.user_id as UserId,
843            is_host: message.is_host,
844        })
845    }
846}