user.rs

  1use super::{proto, Client, Status, TypedEnvelope};
  2use anyhow::{anyhow, Context as _, Result};
  3use chrono::{DateTime, Utc};
  4use collections::{hash_map::Entry, HashMap, HashSet};
  5use feature_flags::FeatureFlagAppExt;
  6use futures::{channel::mpsc, Future, StreamExt};
  7use gpui::{
  8    App, AsyncApp, Context, Entity, EventEmitter, SharedString, SharedUri, Task, WeakEntity,
  9};
 10use postage::{sink::Sink, watch};
 11use rpc::proto::{RequestMessage, UsersResponse};
 12use std::sync::{Arc, Weak};
 13use text::ReplicaId;
 14use util::TryFutureExt as _;
 15
/// Numeric identifier of a [`User`]; it is the type of `User::id`.
pub type UserId = u64;
 17
/// Newtype wrapper around the raw `u64` id of a channel.
///
/// It derives the comparison, hashing and serde traits, and has a `Display`
/// implementation that prints the wrapped number.
#[derive(
    Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, serde::Serialize, serde::Deserialize,
)]
pub struct ChannelId(pub u64);
 22
 23impl std::fmt::Display for ChannelId {
 24    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
 25        self.0.fmt(f)
 26    }
 27}
 28
/// Newtype wrapper around the raw `u64` id of a project.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)]
pub struct ProjectId(pub u64);
 31
/// Newtype wrapper around the raw `u64` id of a dev-server project.
///
/// Unlike [`ProjectId`], this type also derives serde (de)serialization.
#[derive(
    Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, serde::Serialize, serde::Deserialize,
)]
pub struct DevServerProjectId(pub u64);
 36
/// Newtype wrapper around a `u32` index assigned to a participant.
///
/// `UserStore` keeps one of these per user id (see `UserStore::participant_indices`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ParticipantIndex(pub u32);
 39
/// A user known to the client, built from a `proto::User` message.
///
/// Ordering and equality are hand-written rather than derived; see the
/// `Ord` and `PartialEq` implementations for this type.
#[derive(Default, Debug)]
pub struct User {
    /// Server-assigned id; key of the user cache in `UserStore`.
    pub id: UserId,
    /// GitHub login; contact and contact-request lists are kept sorted by it.
    pub github_login: String,
    /// Avatar location, converted from the `avatar_url` of the proto message.
    pub avatar_uri: SharedUri,
    /// Display name, if the proto message carried one.
    pub name: Option<String>,
    /// Email address, if the proto message carried one.
    pub email: Option<String>,
}
 48
/// A collaborator as described by a `proto::Collaborator` message
/// (see [`Collaborator::from_proto`]).
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Collaborator {
    /// Peer id of the collaborator's connection; required in the proto message.
    pub peer_id: proto::PeerId,
    /// Replica id, cast from the proto message's `replica_id`.
    pub replica_id: ReplicaId,
    /// Id of the user behind this collaborator.
    pub user_id: UserId,
    /// Copied from the proto message's `is_host` flag.
    pub is_host: bool,
}
 56
 57impl PartialOrd for User {
 58    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
 59        Some(self.cmp(other))
 60    }
 61}
 62
 63impl Ord for User {
 64    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
 65        self.github_login.cmp(&other.github_login)
 66    }
 67}
 68
 69impl PartialEq for User {
 70    fn eq(&self, other: &Self) -> bool {
 71        self.id == other.id && self.github_login == other.github_login
 72    }
 73}
 74
// Marker impl: the hand-written `PartialEq` compares a `u64` and a `String`,
// both of which have total equality, so `User` can be `Eq` as well.
impl Eq for User {}
 76
/// A contact of the current user, built from a `proto::Contact` message
/// (see `Contact::from_proto`).
#[derive(Debug, PartialEq)]
pub struct Contact {
    /// The contact's user record, shared with the `UserStore` cache.
    pub user: Arc<User>,
    /// Copied from the proto message's `online` flag.
    pub online: bool,
    /// Copied from the proto message's `busy` flag.
    pub busy: bool,
}
 83
/// Relationship between the current user and another user, as reported by
/// `UserStore::contact_request_status`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ContactRequestStatus {
    /// The user is neither a contact nor part of any contact request.
    None,
    /// The user is in the list of outgoing contact requests.
    RequestSent,
    /// The user is in the list of incoming contact requests.
    RequestReceived,
    /// The user is already in the contact list.
    RequestAccepted,
}
 91
/// Client-side store of users, contacts and contact requests, together with
/// state about the signed-in user (plan, terms-of-service acceptance, invite
/// info).
pub struct UserStore {
    /// Cache of every user loaded so far, keyed by user id.
    users: HashMap<u64, Arc<User>>,
    /// Reverse index from GitHub login to user id for the cached users.
    by_github_login: HashMap<String, u64>,
    /// Participant index per user id; replaced wholesale by
    /// `set_participant_indices`.
    participant_indices: HashMap<u64, ParticipantIndex>,
    /// Sending half of the queue drained by the `_maintain_contacts` task.
    update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
    /// Plan most recently received via `proto::UpdateUserPlan`; `None` until
    /// such a message arrives.
    current_plan: Option<proto::Plan>,
    /// Receiving half of the watch channel that carries the signed-in user.
    current_user: watch::Receiver<Option<Arc<User>>>,
    /// Terms-of-service acceptance. The outer `None` means the state is not
    /// known (initial value, and reset on sign-out); the inner `None` means
    /// no acceptance time is recorded.
    accepted_tos_at: Option<Option<DateTime<Utc>>>,
    /// Contacts, kept sorted by `github_login` so they can be binary-searched.
    contacts: Vec<Arc<Contact>>,
    /// Users who sent us a contact request, kept sorted by `github_login`.
    incoming_contact_requests: Vec<Arc<User>>,
    /// Users we sent a contact request to, kept sorted by `github_login`.
    outgoing_contact_requests: Vec<Arc<User>>,
    /// Number of in-flight contact-related requests per user id.
    pending_contact_requests: HashMap<u64, usize>,
    /// Invite info most recently received via `proto::UpdateInviteInfo`.
    invite_info: Option<InviteInfo>,
    /// Weak handle to the RPC client; upgraded whenever a request is sent.
    client: Weak<Client>,
    /// Background task that applies queued `UpdateContacts` messages.
    _maintain_contacts: Task<()>,
    /// Background task that follows the client's connection status.
    _maintain_current_user: Task<Result<()>>,
    /// Weak handle to this entity, used to spawn work from methods that only
    /// have access to an `App` (see `participant_names`).
    weak_self: WeakEntity<Self>,
}
110
/// Invite information pushed by the server in a `proto::UpdateInviteInfo`
/// message.
#[derive(Clone)]
pub struct InviteInfo {
    /// The `count` field of the message.
    pub count: u32,
    /// The `url` field of the message.
    pub url: Arc<str>,
}
116
/// Events emitted by [`UserStore`].
pub enum Event {
    /// Something happened to a contact request involving `user`; `kind` says
    /// what. In this file it is emitted when an incoming request is removed.
    Contact {
        user: Arc<User>,
        kind: ContactEventKind,
    },
    /// Emitted when the server sends a `proto::ShowContacts` message.
    ShowContacts,
    /// Emitted when `set_participant_indices` stores a different map.
    ParticipantIndicesChanged,
    /// Emitted after the terms-of-service state is updated or reset.
    PrivateUserInfoUpdated,
}
126
/// What happened to a contact request in an [`Event::Contact`].
///
/// Only `Cancelled` is emitted by the code in this file (when an incoming
/// request is removed); the other variants are not constructed here.
#[derive(Clone, Copy)]
pub enum ContactEventKind {
    Requested,
    Accepted,
    Cancelled,
}
133
// Lets `UserStore` emit [`Event`] values through `cx.emit(..)`.
impl EventEmitter<Event> for UserStore {}
135
/// Messages queued on `UserStore::update_contacts_tx` and applied one at a
/// time by `UserStore::update_contacts`.
enum UpdateContacts {
    /// Apply a contact update received from the server.
    Update(proto::UpdateContacts),
    /// Do nothing except drop the barrier sender once this message is reached.
    Wait(postage::barrier::Sender),
    /// Clear contacts and both request lists, then drop the barrier sender.
    Clear(postage::barrier::Sender),
}
141
142impl UserStore {
    /// Creates a `UserStore` bound to `client` and starts its two background tasks.
    ///
    /// * `_maintain_contacts` drains the contact-update queue, applying one
    ///   [`UpdateContacts`] message at a time.
    /// * `_maintain_current_user` follows the client's connection status and
    ///   keeps the current user, feature flags, telemetry identity,
    ///   terms-of-service state and contact lists in step with it.
    ///
    /// The store itself keeps only a `Weak` handle to `client`, and the status
    /// task downgrades its own handle before it starts looping.
    pub fn new(client: Arc<Client>, cx: &Context<Self>) -> Self {
        let (mut current_user_tx, current_user_rx) = watch::channel();
        let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
        // Register handlers for server-pushed messages. The subscriptions are
        // moved into the contacts task below and live as long as it does.
        let rpc_subscriptions = vec![
            client.add_message_handler(cx.weak_entity(), Self::handle_update_plan),
            client.add_message_handler(cx.weak_entity(), Self::handle_update_contacts),
            client.add_message_handler(cx.weak_entity(), Self::handle_update_invite_info),
            client.add_message_handler(cx.weak_entity(), Self::handle_show_contacts),
        ];
        Self {
            users: Default::default(),
            by_github_login: Default::default(),
            current_user: current_user_rx,
            current_plan: None,
            accepted_tos_at: None,
            contacts: Default::default(),
            incoming_contact_requests: Default::default(),
            participant_indices: Default::default(),
            outgoing_contact_requests: Default::default(),
            invite_info: None,
            client: Arc::downgrade(&client),
            update_contacts_tx,
            _maintain_contacts: cx.spawn(|this, mut cx| async move {
                let _subscriptions = rpc_subscriptions;
                while let Some(message) = update_contacts_rx.next().await {
                    if let Ok(task) =
                        this.update(&mut cx, |this, cx| this.update_contacts(message, cx))
                    {
                        // Await each update before taking the next message, so
                        // queued messages are applied strictly in order.
                        task.log_err().await;
                    } else {
                        // The store could not be updated (presumably it was
                        // dropped); stop processing.
                        break;
                    }
                }
            }),
            _maintain_current_user: cx.spawn(|this, mut cx| async move {
                let mut status = client.status();
                // Keep only a weak handle between status changes.
                let weak = Arc::downgrade(&client);
                drop(client);
                while let Some(status) = status.next().await {
                    // if the client is dropped, the app is shutting down.
                    let Some(client) = weak.upgrade() else {
                        return Ok(());
                    };
                    match status {
                        Status::Connected { .. } => {
                            if let Some(user_id) = client.user_id() {
                                let fetch_user = if let Ok(fetch_user) = this
                                    .update(&mut cx, |this, cx| {
                                        this.get_user(user_id, cx).log_err()
                                    }) {
                                    fetch_user
                                } else {
                                    break;
                                };
                                // Fetch the user record and the private user
                                // info concurrently; failures are logged and
                                // surface as `None`.
                                let fetch_private_user_info =
                                    client.request(proto::GetPrivateUserInfo {}).log_err();
                                let (user, info) =
                                    futures::join!(fetch_user, fetch_private_user_info);

                                cx.update(|cx| {
                                    if let Some(info) = info {
                                        let staff =
                                            info.staff && !*feature_flags::ZED_DISABLE_STAFF;
                                        cx.update_flags(staff, info.flags);
                                        client.telemetry.set_authenticated_user_info(
                                            Some(info.metrics_id.clone()),
                                            staff,
                                        );

                                        this.update(cx, |this, cx| {
                                            // In debug builds, setting
                                            // ZED_IGNORE_ACCEPTED_TOS discards the
                                            // server-reported acceptance time.
                                            let accepted_tos_at = {
                                                #[cfg(debug_assertions)]
                                                if std::env::var("ZED_IGNORE_ACCEPTED_TOS").is_ok()
                                                {
                                                    None
                                                } else {
                                                    info.accepted_tos_at
                                                }

                                                #[cfg(not(debug_assertions))]
                                                info.accepted_tos_at
                                            };

                                            this.set_current_user_accepted_tos_at(accepted_tos_at);
                                            cx.emit(Event::PrivateUserInfoUpdated);
                                        })
                                    } else {
                                        anyhow::Ok(())
                                    }
                                })??;

                                // A failed send is deliberately ignored.
                                current_user_tx.send(user).await.ok();

                                this.update(&mut cx, |_, cx| cx.notify())?;
                            }
                        }
                        Status::SignedOut => {
                            // Forget the user and the terms-of-service state,
                            // then wait for the contact lists to be cleared.
                            current_user_tx.send(None).await.ok();
                            this.update(&mut cx, |this, cx| {
                                this.accepted_tos_at = None;
                                cx.emit(Event::PrivateUserInfoUpdated);
                                cx.notify();
                                this.clear_contacts()
                            })?
                            .await;
                        }
                        Status::ConnectionLost => {
                            // Only the contact lists are cleared here; the
                            // current user is left as it is.
                            this.update(&mut cx, |this, cx| {
                                cx.notify();
                                this.clear_contacts()
                            })?
                            .await;
                        }
                        _ => {}
                    }
                }
                Ok(())
            }),
            pending_contact_requests: Default::default(),
            weak_self: cx.weak_entity(),
        }
    }
265
266    #[cfg(feature = "test-support")]
267    pub fn clear_cache(&mut self) {
268        self.users.clear();
269        self.by_github_login.clear();
270    }
271
272    async fn handle_update_invite_info(
273        this: Entity<Self>,
274        message: TypedEnvelope<proto::UpdateInviteInfo>,
275        mut cx: AsyncApp,
276    ) -> Result<()> {
277        this.update(&mut cx, |this, cx| {
278            this.invite_info = Some(InviteInfo {
279                url: Arc::from(message.payload.url),
280                count: message.payload.count,
281            });
282            cx.notify();
283        })?;
284        Ok(())
285    }
286
287    async fn handle_show_contacts(
288        this: Entity<Self>,
289        _: TypedEnvelope<proto::ShowContacts>,
290        mut cx: AsyncApp,
291    ) -> Result<()> {
292        this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts))?;
293        Ok(())
294    }
295
296    pub fn invite_info(&self) -> Option<&InviteInfo> {
297        self.invite_info.as_ref()
298    }
299
300    async fn handle_update_contacts(
301        this: Entity<Self>,
302        message: TypedEnvelope<proto::UpdateContacts>,
303        mut cx: AsyncApp,
304    ) -> Result<()> {
305        this.update(&mut cx, |this, _| {
306            this.update_contacts_tx
307                .unbounded_send(UpdateContacts::Update(message.payload))
308                .unwrap();
309        })?;
310        Ok(())
311    }
312
313    async fn handle_update_plan(
314        this: Entity<Self>,
315        message: TypedEnvelope<proto::UpdateUserPlan>,
316        mut cx: AsyncApp,
317    ) -> Result<()> {
318        this.update(&mut cx, |this, cx| {
319            this.current_plan = Some(message.payload.plan());
320            cx.notify();
321        })?;
322        Ok(())
323    }
324
    /// Applies one queued [`UpdateContacts`] message and returns a task that
    /// finishes once the message has been fully applied.
    ///
    /// * `Wait` drops the barrier sender and does nothing else (the receiving
    ///   side is awaited by `contact_updates_done`).
    /// * `Clear` empties the contact list and both request lists, then drops
    ///   the barrier sender.
    /// * `Update` loads every user mentioned in the message, then removes,
    ///   replaces and inserts entries so that all three lists stay sorted by
    ///   `github_login`.
    fn update_contacts(&mut self, message: UpdateContacts, cx: &Context<Self>) -> Task<Result<()>> {
        match message {
            UpdateContacts::Wait(barrier) => {
                drop(barrier);
                Task::ready(Ok(()))
            }
            UpdateContacts::Clear(barrier) => {
                self.contacts.clear();
                self.incoming_contact_requests.clear();
                self.outgoing_contact_requests.clear();
                drop(barrier);
                Task::ready(Ok(()))
            }
            UpdateContacts::Update(message) => {
                // Collect every user id the message refers to, de-duplicated,
                // so they can be loaded with a single `get_users` call.
                let mut user_ids = HashSet::default();
                for contact in &message.contacts {
                    user_ids.insert(contact.user_id);
                }
                user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
                user_ids.extend(message.outgoing_requests.iter());

                let load_users = self.get_users(user_ids.into_iter().collect(), cx);
                cx.spawn(|this, mut cx| async move {
                    load_users.await?;

                    // Users are fetched in parallel above and cached in call to get_users
                    // No need to parallelize here
                    let mut updated_contacts = Vec::new();
                    let this = this
                        .upgrade()
                        .ok_or_else(|| anyhow!("can't upgrade user store handle"))?;
                    for contact in message.contacts {
                        updated_contacts.push(Arc::new(
                            Contact::from_proto(contact, &this, &mut cx).await?,
                        ));
                    }

                    let mut incoming_requests = Vec::new();
                    for request in message.incoming_requests {
                        incoming_requests.push({
                            this.update(&mut cx, |this, cx| {
                                this.get_user(request.requester_id, cx)
                            })?
                            .await?
                        });
                    }

                    let mut outgoing_requests = Vec::new();
                    for requested_user_id in message.outgoing_requests {
                        outgoing_requests.push(
                            this.update(&mut cx, |this, cx| this.get_user(requested_user_id, cx))?
                                .await?,
                        );
                    }

                    let removed_contacts =
                        HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
                    let removed_incoming_requests =
                        HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
                    let removed_outgoing_requests =
                        HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());

                    // All list mutations happen inside one `update` call.
                    this.update(&mut cx, |this, cx| {
                        // Remove contacts
                        this.contacts
                            .retain(|contact| !removed_contacts.contains(&contact.user.id));
                        // Update existing contacts and insert new ones
                        for updated_contact in updated_contacts {
                            match this.contacts.binary_search_by_key(
                                &&updated_contact.user.github_login,
                                |contact| &contact.user.github_login,
                            ) {
                                Ok(ix) => this.contacts[ix] = updated_contact,
                                Err(ix) => this.contacts.insert(ix, updated_contact),
                            }
                        }

                        // Remove incoming contact requests, emitting a
                        // `Cancelled` event for each one that goes away
                        this.incoming_contact_requests.retain(|user| {
                            if removed_incoming_requests.contains(&user.id) {
                                cx.emit(Event::Contact {
                                    user: user.clone(),
                                    kind: ContactEventKind::Cancelled,
                                });
                                false
                            } else {
                                true
                            }
                        });
                        // Update existing incoming requests and insert new ones
                        for user in incoming_requests {
                            match this
                                .incoming_contact_requests
                                .binary_search_by_key(&&user.github_login, |contact| {
                                    &contact.github_login
                                }) {
                                Ok(ix) => this.incoming_contact_requests[ix] = user,
                                Err(ix) => this.incoming_contact_requests.insert(ix, user),
                            }
                        }

                        // Remove outgoing contact requests
                        this.outgoing_contact_requests
                            .retain(|user| !removed_outgoing_requests.contains(&user.id));
                        // Update existing outgoing requests and insert new ones
                        for request in outgoing_requests {
                            match this
                                .outgoing_contact_requests
                                .binary_search_by_key(&&request.github_login, |contact| {
                                    &contact.github_login
                                }) {
                                Ok(ix) => this.outgoing_contact_requests[ix] = request,
                                Err(ix) => this.outgoing_contact_requests.insert(ix, request),
                            }
                        }

                        cx.notify();
                    })?;

                    Ok(())
                })
            }
        }
    }
449
450    pub fn contacts(&self) -> &[Arc<Contact>] {
451        &self.contacts
452    }
453
454    pub fn has_contact(&self, user: &Arc<User>) -> bool {
455        self.contacts
456            .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
457            .is_ok()
458    }
459
460    pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
461        &self.incoming_contact_requests
462    }
463
464    pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
465        &self.outgoing_contact_requests
466    }
467
468    pub fn is_contact_request_pending(&self, user: &User) -> bool {
469        self.pending_contact_requests.contains_key(&user.id)
470    }
471
472    pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
473        if self
474            .contacts
475            .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
476            .is_ok()
477        {
478            ContactRequestStatus::RequestAccepted
479        } else if self
480            .outgoing_contact_requests
481            .binary_search_by_key(&&user.github_login, |user| &user.github_login)
482            .is_ok()
483        {
484            ContactRequestStatus::RequestSent
485        } else if self
486            .incoming_contact_requests
487            .binary_search_by_key(&&user.github_login, |user| &user.github_login)
488            .is_ok()
489        {
490            ContactRequestStatus::RequestReceived
491        } else {
492            ContactRequestStatus::None
493        }
494    }
495
496    pub fn request_contact(
497        &mut self,
498        responder_id: u64,
499        cx: &mut Context<Self>,
500    ) -> Task<Result<()>> {
501        self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
502    }
503
504    pub fn remove_contact(&mut self, user_id: u64, cx: &mut Context<Self>) -> Task<Result<()>> {
505        self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
506    }
507
508    pub fn has_incoming_contact_request(&self, user_id: u64) -> bool {
509        self.incoming_contact_requests
510            .iter()
511            .any(|user| user.id == user_id)
512    }
513
514    pub fn respond_to_contact_request(
515        &mut self,
516        requester_id: u64,
517        accept: bool,
518        cx: &mut Context<Self>,
519    ) -> Task<Result<()>> {
520        self.perform_contact_request(
521            requester_id,
522            proto::RespondToContactRequest {
523                requester_id,
524                response: if accept {
525                    proto::ContactRequestResponse::Accept
526                } else {
527                    proto::ContactRequestResponse::Decline
528                } as i32,
529            },
530            cx,
531        )
532    }
533
534    pub fn dismiss_contact_request(
535        &self,
536        requester_id: u64,
537        cx: &Context<Self>,
538    ) -> Task<Result<()>> {
539        let client = self.client.upgrade();
540        cx.spawn(move |_, _| async move {
541            client
542                .ok_or_else(|| anyhow!("can't upgrade client reference"))?
543                .request(proto::RespondToContactRequest {
544                    requester_id,
545                    response: proto::ContactRequestResponse::Dismiss as i32,
546                })
547                .await?;
548            Ok(())
549        })
550    }
551
552    fn perform_contact_request<T: RequestMessage>(
553        &mut self,
554        user_id: u64,
555        request: T,
556        cx: &mut Context<Self>,
557    ) -> Task<Result<()>> {
558        let client = self.client.upgrade();
559        *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
560        cx.notify();
561
562        cx.spawn(move |this, mut cx| async move {
563            let response = client
564                .ok_or_else(|| anyhow!("can't upgrade client reference"))?
565                .request(request)
566                .await;
567            this.update(&mut cx, |this, cx| {
568                if let Entry::Occupied(mut request_count) =
569                    this.pending_contact_requests.entry(user_id)
570                {
571                    *request_count.get_mut() -= 1;
572                    if *request_count.get() == 0 {
573                        request_count.remove();
574                    }
575                }
576                cx.notify();
577            })?;
578            response?;
579            Ok(())
580        })
581    }
582
583    pub fn clear_contacts(&self) -> impl Future<Output = ()> {
584        let (tx, mut rx) = postage::barrier::channel();
585        self.update_contacts_tx
586            .unbounded_send(UpdateContacts::Clear(tx))
587            .unwrap();
588        async move {
589            rx.next().await;
590        }
591    }
592
593    pub fn contact_updates_done(&self) -> impl Future<Output = ()> {
594        let (tx, mut rx) = postage::barrier::channel();
595        self.update_contacts_tx
596            .unbounded_send(UpdateContacts::Wait(tx))
597            .unwrap();
598        async move {
599            rx.next().await;
600        }
601    }
602
603    pub fn get_users(
604        &self,
605        user_ids: Vec<u64>,
606        cx: &Context<Self>,
607    ) -> Task<Result<Vec<Arc<User>>>> {
608        let mut user_ids_to_fetch = user_ids.clone();
609        user_ids_to_fetch.retain(|id| !self.users.contains_key(id));
610
611        cx.spawn(|this, mut cx| async move {
612            if !user_ids_to_fetch.is_empty() {
613                this.update(&mut cx, |this, cx| {
614                    this.load_users(
615                        proto::GetUsers {
616                            user_ids: user_ids_to_fetch,
617                        },
618                        cx,
619                    )
620                })?
621                .await?;
622            }
623
624            this.update(&mut cx, |this, _| {
625                user_ids
626                    .iter()
627                    .map(|user_id| {
628                        this.users
629                            .get(user_id)
630                            .cloned()
631                            .ok_or_else(|| anyhow!("user {} not found", user_id))
632                    })
633                    .collect()
634            })?
635        })
636    }
637
638    pub fn fuzzy_search_users(
639        &self,
640        query: String,
641        cx: &Context<Self>,
642    ) -> Task<Result<Vec<Arc<User>>>> {
643        self.load_users(proto::FuzzySearchUsers { query }, cx)
644    }
645
646    pub fn get_cached_user(&self, user_id: u64) -> Option<Arc<User>> {
647        self.users.get(&user_id).cloned()
648    }
649
650    pub fn get_user_optimistic(&self, user_id: u64, cx: &Context<Self>) -> Option<Arc<User>> {
651        if let Some(user) = self.users.get(&user_id).cloned() {
652            return Some(user);
653        }
654
655        self.get_user(user_id, cx).detach_and_log_err(cx);
656        None
657    }
658
659    pub fn get_user(&self, user_id: u64, cx: &Context<Self>) -> Task<Result<Arc<User>>> {
660        if let Some(user) = self.users.get(&user_id).cloned() {
661            return Task::ready(Ok(user));
662        }
663
664        let load_users = self.get_users(vec![user_id], cx);
665        cx.spawn(move |this, mut cx| async move {
666            load_users.await?;
667            this.update(&mut cx, |this, _| {
668                this.users
669                    .get(&user_id)
670                    .cloned()
671                    .ok_or_else(|| anyhow!("server responded with no users"))
672            })?
673        })
674    }
675
676    pub fn cached_user_by_github_login(&self, github_login: &str) -> Option<Arc<User>> {
677        self.by_github_login
678            .get(github_login)
679            .and_then(|id| self.users.get(id).cloned())
680    }
681
682    pub fn current_user(&self) -> Option<Arc<User>> {
683        self.current_user.borrow().clone()
684    }
685
686    pub fn current_plan(&self) -> Option<proto::Plan> {
687        self.current_plan
688    }
689
690    pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
691        self.current_user.clone()
692    }
693
694    pub fn current_user_has_accepted_terms(&self) -> Option<bool> {
695        self.accepted_tos_at
696            .map(|accepted_tos_at| accepted_tos_at.is_some())
697    }
698
699    pub fn accept_terms_of_service(&self, cx: &Context<Self>) -> Task<Result<()>> {
700        if self.current_user().is_none() {
701            return Task::ready(Err(anyhow!("no current user")));
702        };
703
704        let client = self.client.clone();
705        cx.spawn(move |this, mut cx| async move {
706            if let Some(client) = client.upgrade() {
707                let response = client
708                    .request(proto::AcceptTermsOfService {})
709                    .await
710                    .context("error accepting tos")?;
711
712                this.update(&mut cx, |this, cx| {
713                    this.set_current_user_accepted_tos_at(Some(response.accepted_tos_at));
714                    cx.emit(Event::PrivateUserInfoUpdated);
715                })
716            } else {
717                Err(anyhow!("client not found"))
718            }
719        })
720    }
721
722    fn set_current_user_accepted_tos_at(&mut self, accepted_tos_at: Option<u64>) {
723        self.accepted_tos_at = Some(
724            accepted_tos_at.and_then(|timestamp| DateTime::from_timestamp(timestamp as i64, 0)),
725        );
726    }
727
728    fn load_users(
729        &self,
730        request: impl RequestMessage<Response = UsersResponse>,
731        cx: &Context<Self>,
732    ) -> Task<Result<Vec<Arc<User>>>> {
733        let client = self.client.clone();
734        cx.spawn(|this, mut cx| async move {
735            if let Some(rpc) = client.upgrade() {
736                let response = rpc.request(request).await.context("error loading users")?;
737                let users = response.users;
738
739                this.update(&mut cx, |this, _| this.insert(users))
740            } else {
741                Ok(Vec::new())
742            }
743        })
744    }
745
746    pub fn insert(&mut self, users: Vec<proto::User>) -> Vec<Arc<User>> {
747        let mut ret = Vec::with_capacity(users.len());
748        for user in users {
749            let user = User::new(user);
750            if let Some(old) = self.users.insert(user.id, user.clone()) {
751                if old.github_login != user.github_login {
752                    self.by_github_login.remove(&old.github_login);
753                }
754            }
755            self.by_github_login
756                .insert(user.github_login.clone(), user.id);
757            ret.push(user)
758        }
759        ret
760    }
761
762    pub fn set_participant_indices(
763        &mut self,
764        participant_indices: HashMap<u64, ParticipantIndex>,
765        cx: &mut Context<Self>,
766    ) {
767        if participant_indices != self.participant_indices {
768            self.participant_indices = participant_indices;
769            cx.emit(Event::ParticipantIndicesChanged);
770        }
771    }
772
773    pub fn participant_indices(&self) -> &HashMap<u64, ParticipantIndex> {
774        &self.participant_indices
775    }
776
777    pub fn participant_names(
778        &self,
779        user_ids: impl Iterator<Item = u64>,
780        cx: &App,
781    ) -> HashMap<u64, SharedString> {
782        let mut ret = HashMap::default();
783        let mut missing_user_ids = Vec::new();
784        for id in user_ids {
785            if let Some(github_login) = self.get_cached_user(id).map(|u| u.github_login.clone()) {
786                ret.insert(id, github_login.into());
787            } else {
788                missing_user_ids.push(id)
789            }
790        }
791        if !missing_user_ids.is_empty() {
792            let this = self.weak_self.clone();
793            cx.spawn(|mut cx| async move {
794                this.update(&mut cx, |this, cx| this.get_users(missing_user_ids, cx))?
795                    .await
796            })
797            .detach_and_log_err(cx);
798        }
799        ret
800    }
801}
802
803impl User {
804    fn new(message: proto::User) -> Arc<Self> {
805        Arc::new(User {
806            id: message.id,
807            github_login: message.github_login,
808            avatar_uri: message.avatar_url.into(),
809            name: message.name,
810            email: message.email,
811        })
812    }
813}
814
815impl Contact {
816    async fn from_proto(
817        contact: proto::Contact,
818        user_store: &Entity<UserStore>,
819        cx: &mut AsyncApp,
820    ) -> Result<Self> {
821        let user = user_store
822            .update(cx, |user_store, cx| {
823                user_store.get_user(contact.user_id, cx)
824            })?
825            .await?;
826        Ok(Self {
827            user,
828            online: contact.online,
829            busy: contact.busy,
830        })
831    }
832}
833
834impl Collaborator {
835    pub fn from_proto(message: proto::Collaborator) -> Result<Self> {
836        Ok(Self {
837            peer_id: message.peer_id.ok_or_else(|| anyhow!("invalid peer id"))?,
838            replica_id: message.replica_id as ReplicaId,
839            user_id: message.user_id as UserId,
840            is_host: message.is_host,
841        })
842    }
843}