1use super::{proto, Client, Status, TypedEnvelope};
2use anyhow::{anyhow, Context, Result};
3use chrono::{DateTime, Utc};
4use collections::{hash_map::Entry, HashMap, HashSet};
5use feature_flags::FeatureFlagAppExt;
6use futures::{channel::mpsc, Future, StreamExt};
7use gpui::{
8 AppContext, AsyncAppContext, EventEmitter, Model, ModelContext, SharedString, SharedUri, Task,
9 WeakModel,
10};
11use postage::{sink::Sink, watch};
12use rpc::proto::{RequestMessage, UsersResponse};
13use std::sync::{Arc, Weak};
14use text::ReplicaId;
15use util::TryFutureExt as _;
16
/// Raw numeric identifier of a user; a plain alias for `u64`.
pub type UserId = u64;
18
/// Newtype around a `u64` channel identifier.
///
/// Its `Display` implementation prints the wrapped number.
#[derive(
    Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, serde::Serialize, serde::Deserialize,
)]
pub struct ChannelId(pub u64);
23
24impl std::fmt::Display for ChannelId {
25 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
26 self.0.fmt(f)
27 }
28}
29
/// Newtype around a `u64` project identifier.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)]
pub struct ProjectId(pub u64);
32
/// Newtype around a `u64` dev-server project identifier; (de)serializable via serde.
#[derive(
    Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, serde::Serialize, serde::Deserialize,
)]
pub struct DevServerProjectId(pub u64);
37
/// Newtype around a `u32` participant index.
///
/// `UserStore` keeps a map from user id to this index (see
/// `UserStore::participant_indices`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ParticipantIndex(pub u32);
40
/// A user as cached by `UserStore`, built from a `proto::User` message.
///
/// Equality and ordering are implemented by hand further down in this file rather than
/// derived: ordering is by `github_login`, equality by `id` and `github_login`.
#[derive(Default, Debug)]
pub struct User {
    /// Numeric user id.
    pub id: UserId,
    /// GitHub login; the contact lists in `UserStore` are kept sorted by this field.
    pub github_login: String,
    /// Avatar location, converted from the message's `avatar_url`.
    pub avatar_uri: SharedUri,
}
47
/// A collaborator decoded from a `proto::Collaborator` message
/// (see `Collaborator::from_proto`).
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Collaborator {
    /// Peer id taken from the message; decoding fails when it is absent.
    pub peer_id: proto::PeerId,
    /// Replica id, cast from the message's `replica_id`.
    pub replica_id: ReplicaId,
    /// Id of the user behind this collaborator.
    pub user_id: UserId,
    /// Copied from the message's `is_host` flag.
    pub is_host: bool,
}
55
56impl PartialOrd for User {
57 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
58 Some(self.cmp(other))
59 }
60}
61
62impl Ord for User {
63 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
64 self.github_login.cmp(&other.github_login)
65 }
66}
67
68impl PartialEq for User {
69 fn eq(&self, other: &Self) -> bool {
70 self.id == other.id && self.github_login == other.github_login
71 }
72}
73
74impl Eq for User {}
75
/// A contact entry: the resolved user plus the status flags copied from a
/// `proto::Contact` message (see `Contact::from_proto`).
#[derive(Debug, PartialEq)]
pub struct Contact {
    /// The user this contact refers to.
    pub user: Arc<User>,
    /// Copied from the message's `online` flag.
    pub online: bool,
    /// Copied from the message's `busy` flag.
    pub busy: bool,
}
82
/// Relationship between the store's lists and a given user, as computed by
/// `UserStore::contact_request_status`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ContactRequestStatus {
    /// The user is in none of the contact, outgoing-request or incoming-request lists.
    None,
    /// The user is in the outgoing-request list.
    RequestSent,
    /// The user is in the incoming-request list.
    RequestReceived,
    /// The user is in the contact list.
    RequestAccepted,
}
90
/// Model holding the user cache, the current user, and the contact lists, kept up to date
/// by two background tasks spawned in `UserStore::new`.
pub struct UserStore {
    /// Cache of known users, keyed by user id.
    users: HashMap<u64, Arc<User>>,
    /// Index from GitHub login to user id for the cached users.
    by_github_login: HashMap<String, u64>,
    /// Participant index per user id; replaced wholesale by `set_participant_indices`.
    participant_indices: HashMap<u64, ParticipantIndex>,
    /// Sender side of the queue drained by the `_maintain_contacts` task.
    update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
    /// Plan most recently received via `handle_update_plan`; `None` until then.
    current_plan: Option<proto::Plan>,
    /// Watch on the signed-in user, fed by the `_maintain_current_user` task.
    current_user: watch::Receiver<Option<Arc<User>>>,
    /// Outer `None`: not yet known. Inner `None`: known, but no acceptance timestamp.
    accepted_tos_at: Option<Option<DateTime<Utc>>>,
    /// Contacts, kept sorted by `github_login` (updated through binary search).
    contacts: Vec<Arc<Contact>>,
    /// Users who sent us a contact request, kept sorted by `github_login`.
    incoming_contact_requests: Vec<Arc<User>>,
    /// Users we sent a contact request to, kept sorted by `github_login`.
    outgoing_contact_requests: Vec<Arc<User>>,
    /// Number of in-flight contact requests per user id (see `perform_contact_request`).
    pending_contact_requests: HashMap<u64, usize>,
    /// Invite info most recently received via `handle_update_invite_info`.
    invite_info: Option<InviteInfo>,
    /// Weak handle to the RPC client; upgraded on demand before each request.
    client: Weak<Client>,
    /// Task that applies queued `UpdateContacts` messages one at a time.
    _maintain_contacts: Task<()>,
    /// Task that follows the client's status stream.
    _maintain_current_user: Task<Result<()>>,
    /// Weak handle to this model, used where only an `AppContext` is available.
    weak_self: WeakModel<Self>,
}
109
/// Invite details stored from a `proto::UpdateInviteInfo` message.
#[derive(Clone)]
pub struct InviteInfo {
    /// The message's `count` field.
    pub count: u32,
    /// The message's `url` field.
    pub url: Arc<str>,
}
115
/// Events emitted by `UserStore`.
pub enum Event {
    /// Something happened to a contact request involving `user`; `kind` says what.
    Contact {
        user: Arc<User>,
        kind: ContactEventKind,
    },
    /// Emitted when a `proto::ShowContacts` message is received.
    ShowContacts,
    /// Emitted by `set_participant_indices` when the index map actually changed.
    ParticipantIndicesChanged,
}
124
/// What happened in an `Event::Contact`.
///
/// Within this file only `Cancelled` is emitted (when an incoming request is removed).
#[derive(Clone, Copy)]
pub enum ContactEventKind {
    Requested,
    Accepted,
    Cancelled,
}
131
// Marker impl: allows `UserStore` to emit `Event` values through its model context.
impl EventEmitter<Event> for UserStore {}
133
/// Messages processed in order by the `_maintain_contacts` task.
enum UpdateContacts {
    /// Apply a contacts update received from the server.
    Update(proto::UpdateContacts),
    /// Do nothing except drop the barrier sender once this message is reached.
    Wait(postage::barrier::Sender),
    /// Clear contacts and both request lists, then drop the barrier sender.
    Clear(postage::barrier::Sender),
}
139
140impl UserStore {
    /// Creates a `UserStore` bound to `client`.
    ///
    /// Registers handlers for plan, contacts, invite-info and show-contacts messages and
    /// spawns two background tasks:
    /// * `_maintain_contacts` drains the contact-update queue, applying one
    ///   `UpdateContacts` message at a time.
    /// * `_maintain_current_user` follows the client's status stream and refreshes or
    ///   clears the current user and the contacts accordingly.
    ///
    /// The store itself keeps only a weak reference to `client`.
    pub fn new(client: Arc<Client>, cx: &ModelContext<Self>) -> Self {
        let (mut current_user_tx, current_user_rx) = watch::channel();
        let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
        let rpc_subscriptions = vec![
            client.add_message_handler(cx.weak_model(), Self::handle_update_plan),
            client.add_message_handler(cx.weak_model(), Self::handle_update_contacts),
            client.add_message_handler(cx.weak_model(), Self::handle_update_invite_info),
            client.add_message_handler(cx.weak_model(), Self::handle_show_contacts),
        ];
        Self {
            users: Default::default(),
            by_github_login: Default::default(),
            current_user: current_user_rx,
            current_plan: None,
            accepted_tos_at: None,
            contacts: Default::default(),
            incoming_contact_requests: Default::default(),
            participant_indices: Default::default(),
            outgoing_contact_requests: Default::default(),
            invite_info: None,
            client: Arc::downgrade(&client),
            update_contacts_tx,
            _maintain_contacts: cx.spawn(|this, mut cx| async move {
                // Move the subscriptions into the task so they are held for as long as
                // the task itself.
                let _subscriptions = rpc_subscriptions;
                while let Some(message) = update_contacts_rx.next().await {
                    if let Ok(task) =
                        this.update(&mut cx, |this, cx| this.update_contacts(message, cx))
                    {
                        // Await each update before taking the next message, so queued
                        // messages are applied strictly in order.
                        task.log_err().await;
                    } else {
                        // The store can no longer be updated; stop processing.
                        break;
                    }
                }
            }),
            _maintain_current_user: cx.spawn(|this, mut cx| async move {
                let mut status = client.status();
                // Keep only a weak handle so this task does not keep the client alive.
                let weak = Arc::downgrade(&client);
                drop(client);
                while let Some(status) = status.next().await {
                    // if the client is dropped, the app is shutting down.
                    let Some(client) = weak.upgrade() else {
                        return Ok(());
                    };
                    match status {
                        Status::Connected { .. } => {
                            if let Some(user_id) = client.user_id() {
                                let fetch_user = if let Ok(fetch_user) = this
                                    .update(&mut cx, |this, cx| {
                                        this.get_user(user_id, cx).log_err()
                                    }) {
                                    fetch_user
                                } else {
                                    break;
                                };
                                let fetch_private_user_info =
                                    client.request(proto::GetPrivateUserInfo {}).log_err();
                                // Fetch the user record and the private info concurrently.
                                let (user, info) =
                                    futures::join!(fetch_user, fetch_private_user_info);

                                cx.update(|cx| {
                                    if let Some(info) = info {
                                        // Staff status is suppressed when ZED_DISABLE_STAFF
                                        // is set to a non-empty value other than "0".
                                        let disable_staff = std::env::var("ZED_DISABLE_STAFF")
                                            .map_or(false, |v| !v.is_empty() && v != "0");
                                        let staff = info.staff && !disable_staff;
                                        cx.update_flags(staff, info.flags);
                                        client.telemetry.set_authenticated_user_info(
                                            Some(info.metrics_id.clone()),
                                            staff,
                                        );

                                        this.update(cx, |this, _| {
                                            this.set_current_user_accepted_tos_at(
                                                info.accepted_tos_at,
                                            );
                                        })
                                    } else {
                                        anyhow::Ok(())
                                    }
                                })??;

                                // `user` is `None` when fetching it failed (the error was
                                // already logged); a failed send is ignored.
                                current_user_tx.send(user).await.ok();

                                this.update(&mut cx, |_, cx| cx.notify())?;
                            }
                        }
                        Status::SignedOut => {
                            // Signing out clears both the current user and the contacts.
                            current_user_tx.send(None).await.ok();
                            this.update(&mut cx, |this, cx| {
                                cx.notify();
                                this.clear_contacts()
                            })?
                            .await;
                        }
                        Status::ConnectionLost => {
                            // Losing the connection clears the contacts but leaves the
                            // current user untouched.
                            this.update(&mut cx, |this, cx| {
                                cx.notify();
                                this.clear_contacts()
                            })?
                            .await;
                        }
                        _ => {}
                    }
                }
                Ok(())
            }),
            pending_contact_requests: Default::default(),
            weak_self: cx.weak_model(),
        }
    }
250
251 #[cfg(feature = "test-support")]
252 pub fn clear_cache(&mut self) {
253 self.users.clear();
254 self.by_github_login.clear();
255 }
256
257 async fn handle_update_invite_info(
258 this: Model<Self>,
259 message: TypedEnvelope<proto::UpdateInviteInfo>,
260 mut cx: AsyncAppContext,
261 ) -> Result<()> {
262 this.update(&mut cx, |this, cx| {
263 this.invite_info = Some(InviteInfo {
264 url: Arc::from(message.payload.url),
265 count: message.payload.count,
266 });
267 cx.notify();
268 })?;
269 Ok(())
270 }
271
272 async fn handle_show_contacts(
273 this: Model<Self>,
274 _: TypedEnvelope<proto::ShowContacts>,
275 mut cx: AsyncAppContext,
276 ) -> Result<()> {
277 this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts))?;
278 Ok(())
279 }
280
281 pub fn invite_info(&self) -> Option<&InviteInfo> {
282 self.invite_info.as_ref()
283 }
284
285 async fn handle_update_contacts(
286 this: Model<Self>,
287 message: TypedEnvelope<proto::UpdateContacts>,
288 mut cx: AsyncAppContext,
289 ) -> Result<()> {
290 this.update(&mut cx, |this, _| {
291 this.update_contacts_tx
292 .unbounded_send(UpdateContacts::Update(message.payload))
293 .unwrap();
294 })?;
295 Ok(())
296 }
297
298 async fn handle_update_plan(
299 this: Model<Self>,
300 message: TypedEnvelope<proto::UpdateUserPlan>,
301 mut cx: AsyncAppContext,
302 ) -> Result<()> {
303 this.update(&mut cx, |this, cx| {
304 this.current_plan = Some(message.payload.plan());
305 cx.notify();
306 })?;
307 Ok(())
308 }
309
    /// Applies one queued `UpdateContacts` message.
    ///
    /// * `Wait` only drops its barrier sender.
    /// * `Clear` empties the contact list and both request lists, then drops its barrier
    ///   sender.
    /// * `Update` loads every referenced user, then removes, replaces and inserts entries
    ///   so that all three lists stay sorted by `github_login`.
    ///
    /// The returned task resolves to an error if loading users fails or the store can no
    /// longer be upgraded or updated.
    fn update_contacts(
        &mut self,
        message: UpdateContacts,
        cx: &ModelContext<Self>,
    ) -> Task<Result<()>> {
        match message {
            UpdateContacts::Wait(barrier) => {
                drop(barrier);
                Task::ready(Ok(()))
            }
            UpdateContacts::Clear(barrier) => {
                self.contacts.clear();
                self.incoming_contact_requests.clear();
                self.outgoing_contact_requests.clear();
                drop(barrier);
                Task::ready(Ok(()))
            }
            UpdateContacts::Update(message) => {
                // Collect every user id mentioned by the update so they can all be
                // requested in a single `get_users` call.
                let mut user_ids = HashSet::default();
                for contact in &message.contacts {
                    user_ids.insert(contact.user_id);
                }
                user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
                user_ids.extend(message.outgoing_requests.iter());

                let load_users = self.get_users(user_ids.into_iter().collect(), cx);
                cx.spawn(|this, mut cx| async move {
                    load_users.await?;

                    // All users were fetched together above and cached by `get_users`,
                    // so the per-user lookups below do not need to be parallelized.
                    let mut updated_contacts = Vec::new();
                    let this = this
                        .upgrade()
                        .ok_or_else(|| anyhow!("can't upgrade user store handle"))?;
                    for contact in message.contacts {
                        updated_contacts.push(Arc::new(
                            Contact::from_proto(contact, &this, &mut cx).await?,
                        ));
                    }

                    let mut incoming_requests = Vec::new();
                    for request in message.incoming_requests {
                        incoming_requests.push({
                            this.update(&mut cx, |this, cx| {
                                this.get_user(request.requester_id, cx)
                            })?
                            .await?
                        });
                    }

                    let mut outgoing_requests = Vec::new();
                    for requested_user_id in message.outgoing_requests {
                        outgoing_requests.push(
                            this.update(&mut cx, |this, cx| this.get_user(requested_user_id, cx))?
                                .await?,
                        );
                    }

                    let removed_contacts =
                        HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
                    let removed_incoming_requests =
                        HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
                    let removed_outgoing_requests =
                        HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());

                    this.update(&mut cx, |this, cx| {
                        // Remove contacts
                        this.contacts
                            .retain(|contact| !removed_contacts.contains(&contact.user.id));
                        // Update existing contacts and insert new ones
                        for updated_contact in updated_contacts {
                            match this.contacts.binary_search_by_key(
                                &&updated_contact.user.github_login,
                                |contact| &contact.user.github_login,
                            ) {
                                Ok(ix) => this.contacts[ix] = updated_contact,
                                Err(ix) => this.contacts.insert(ix, updated_contact),
                            }
                        }

                        // Remove incoming contact requests, emitting a `Cancelled` event
                        // for each one that is dropped
                        this.incoming_contact_requests.retain(|user| {
                            if removed_incoming_requests.contains(&user.id) {
                                cx.emit(Event::Contact {
                                    user: user.clone(),
                                    kind: ContactEventKind::Cancelled,
                                });
                                false
                            } else {
                                true
                            }
                        });
                        // Update existing incoming requests and insert new ones
                        for user in incoming_requests {
                            match this
                                .incoming_contact_requests
                                .binary_search_by_key(&&user.github_login, |contact| {
                                    &contact.github_login
                                }) {
                                Ok(ix) => this.incoming_contact_requests[ix] = user,
                                Err(ix) => this.incoming_contact_requests.insert(ix, user),
                            }
                        }

                        // Remove outgoing contact requests (no event is emitted for these)
                        this.outgoing_contact_requests
                            .retain(|user| !removed_outgoing_requests.contains(&user.id));
                        // Update existing outgoing requests and insert new ones
                        for request in outgoing_requests {
                            match this
                                .outgoing_contact_requests
                                .binary_search_by_key(&&request.github_login, |contact| {
                                    &contact.github_login
                                }) {
                                Ok(ix) => this.outgoing_contact_requests[ix] = request,
                                Err(ix) => this.outgoing_contact_requests.insert(ix, request),
                            }
                        }

                        cx.notify();
                    })?;

                    Ok(())
                })
            }
        }
    }
438
439 pub fn contacts(&self) -> &[Arc<Contact>] {
440 &self.contacts
441 }
442
443 pub fn has_contact(&self, user: &Arc<User>) -> bool {
444 self.contacts
445 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
446 .is_ok()
447 }
448
449 pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
450 &self.incoming_contact_requests
451 }
452
453 pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
454 &self.outgoing_contact_requests
455 }
456
457 pub fn is_contact_request_pending(&self, user: &User) -> bool {
458 self.pending_contact_requests.contains_key(&user.id)
459 }
460
461 pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
462 if self
463 .contacts
464 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
465 .is_ok()
466 {
467 ContactRequestStatus::RequestAccepted
468 } else if self
469 .outgoing_contact_requests
470 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
471 .is_ok()
472 {
473 ContactRequestStatus::RequestSent
474 } else if self
475 .incoming_contact_requests
476 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
477 .is_ok()
478 {
479 ContactRequestStatus::RequestReceived
480 } else {
481 ContactRequestStatus::None
482 }
483 }
484
485 pub fn request_contact(
486 &mut self,
487 responder_id: u64,
488 cx: &mut ModelContext<Self>,
489 ) -> Task<Result<()>> {
490 self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
491 }
492
493 pub fn remove_contact(
494 &mut self,
495 user_id: u64,
496 cx: &mut ModelContext<Self>,
497 ) -> Task<Result<()>> {
498 self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
499 }
500
501 pub fn has_incoming_contact_request(&self, user_id: u64) -> bool {
502 self.incoming_contact_requests
503 .iter()
504 .any(|user| user.id == user_id)
505 }
506
507 pub fn respond_to_contact_request(
508 &mut self,
509 requester_id: u64,
510 accept: bool,
511 cx: &mut ModelContext<Self>,
512 ) -> Task<Result<()>> {
513 self.perform_contact_request(
514 requester_id,
515 proto::RespondToContactRequest {
516 requester_id,
517 response: if accept {
518 proto::ContactRequestResponse::Accept
519 } else {
520 proto::ContactRequestResponse::Decline
521 } as i32,
522 },
523 cx,
524 )
525 }
526
527 pub fn dismiss_contact_request(
528 &self,
529 requester_id: u64,
530 cx: &ModelContext<Self>,
531 ) -> Task<Result<()>> {
532 let client = self.client.upgrade();
533 cx.spawn(move |_, _| async move {
534 client
535 .ok_or_else(|| anyhow!("can't upgrade client reference"))?
536 .request(proto::RespondToContactRequest {
537 requester_id,
538 response: proto::ContactRequestResponse::Dismiss as i32,
539 })
540 .await?;
541 Ok(())
542 })
543 }
544
545 fn perform_contact_request<T: RequestMessage>(
546 &mut self,
547 user_id: u64,
548 request: T,
549 cx: &mut ModelContext<Self>,
550 ) -> Task<Result<()>> {
551 let client = self.client.upgrade();
552 *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
553 cx.notify();
554
555 cx.spawn(move |this, mut cx| async move {
556 let response = client
557 .ok_or_else(|| anyhow!("can't upgrade client reference"))?
558 .request(request)
559 .await;
560 this.update(&mut cx, |this, cx| {
561 if let Entry::Occupied(mut request_count) =
562 this.pending_contact_requests.entry(user_id)
563 {
564 *request_count.get_mut() -= 1;
565 if *request_count.get() == 0 {
566 request_count.remove();
567 }
568 }
569 cx.notify();
570 })?;
571 response?;
572 Ok(())
573 })
574 }
575
576 pub fn clear_contacts(&self) -> impl Future<Output = ()> {
577 let (tx, mut rx) = postage::barrier::channel();
578 self.update_contacts_tx
579 .unbounded_send(UpdateContacts::Clear(tx))
580 .unwrap();
581 async move {
582 rx.next().await;
583 }
584 }
585
586 pub fn contact_updates_done(&self) -> impl Future<Output = ()> {
587 let (tx, mut rx) = postage::barrier::channel();
588 self.update_contacts_tx
589 .unbounded_send(UpdateContacts::Wait(tx))
590 .unwrap();
591 async move {
592 rx.next().await;
593 }
594 }
595
596 pub fn get_users(
597 &self,
598 user_ids: Vec<u64>,
599 cx: &ModelContext<Self>,
600 ) -> Task<Result<Vec<Arc<User>>>> {
601 let mut user_ids_to_fetch = user_ids.clone();
602 user_ids_to_fetch.retain(|id| !self.users.contains_key(id));
603
604 cx.spawn(|this, mut cx| async move {
605 if !user_ids_to_fetch.is_empty() {
606 this.update(&mut cx, |this, cx| {
607 this.load_users(
608 proto::GetUsers {
609 user_ids: user_ids_to_fetch,
610 },
611 cx,
612 )
613 })?
614 .await?;
615 }
616
617 this.update(&mut cx, |this, _| {
618 user_ids
619 .iter()
620 .map(|user_id| {
621 this.users
622 .get(user_id)
623 .cloned()
624 .ok_or_else(|| anyhow!("user {} not found", user_id))
625 })
626 .collect()
627 })?
628 })
629 }
630
631 pub fn fuzzy_search_users(
632 &self,
633 query: String,
634 cx: &ModelContext<Self>,
635 ) -> Task<Result<Vec<Arc<User>>>> {
636 self.load_users(proto::FuzzySearchUsers { query }, cx)
637 }
638
639 pub fn get_cached_user(&self, user_id: u64) -> Option<Arc<User>> {
640 self.users.get(&user_id).cloned()
641 }
642
643 pub fn get_user_optimistic(&self, user_id: u64, cx: &ModelContext<Self>) -> Option<Arc<User>> {
644 if let Some(user) = self.users.get(&user_id).cloned() {
645 return Some(user);
646 }
647
648 self.get_user(user_id, cx).detach_and_log_err(cx);
649 None
650 }
651
652 pub fn get_user(&self, user_id: u64, cx: &ModelContext<Self>) -> Task<Result<Arc<User>>> {
653 if let Some(user) = self.users.get(&user_id).cloned() {
654 return Task::ready(Ok(user));
655 }
656
657 let load_users = self.get_users(vec![user_id], cx);
658 cx.spawn(move |this, mut cx| async move {
659 load_users.await?;
660 this.update(&mut cx, |this, _| {
661 this.users
662 .get(&user_id)
663 .cloned()
664 .ok_or_else(|| anyhow!("server responded with no users"))
665 })?
666 })
667 }
668
669 pub fn cached_user_by_github_login(&self, github_login: &str) -> Option<Arc<User>> {
670 self.by_github_login
671 .get(github_login)
672 .and_then(|id| self.users.get(id).cloned())
673 }
674
675 pub fn current_user(&self) -> Option<Arc<User>> {
676 self.current_user.borrow().clone()
677 }
678
679 pub fn current_plan(&self) -> Option<proto::Plan> {
680 self.current_plan
681 }
682
683 pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
684 self.current_user.clone()
685 }
686
687 pub fn current_user_has_accepted_terms(&self) -> Option<bool> {
688 self.accepted_tos_at
689 .map(|accepted_tos_at| accepted_tos_at.is_some())
690 }
691
692 pub fn accept_terms_of_service(&self, cx: &ModelContext<Self>) -> Task<Result<()>> {
693 if self.current_user().is_none() {
694 return Task::ready(Err(anyhow!("no current user")));
695 };
696
697 let client = self.client.clone();
698 cx.spawn(move |this, mut cx| async move {
699 if let Some(client) = client.upgrade() {
700 let response = client
701 .request(proto::AcceptTermsOfService {})
702 .await
703 .context("error accepting tos")?;
704
705 this.update(&mut cx, |this, _| {
706 this.set_current_user_accepted_tos_at(Some(response.accepted_tos_at))
707 })
708 } else {
709 Err(anyhow!("client not found"))
710 }
711 })
712 }
713
714 fn set_current_user_accepted_tos_at(&mut self, accepted_tos_at: Option<u64>) {
715 self.accepted_tos_at = Some(
716 accepted_tos_at.and_then(|timestamp| DateTime::from_timestamp(timestamp as i64, 0)),
717 );
718 }
719
720 fn load_users(
721 &self,
722 request: impl RequestMessage<Response = UsersResponse>,
723 cx: &ModelContext<Self>,
724 ) -> Task<Result<Vec<Arc<User>>>> {
725 let client = self.client.clone();
726 cx.spawn(|this, mut cx| async move {
727 if let Some(rpc) = client.upgrade() {
728 let response = rpc.request(request).await.context("error loading users")?;
729 let users = response.users;
730
731 this.update(&mut cx, |this, _| this.insert(users))
732 } else {
733 Ok(Vec::new())
734 }
735 })
736 }
737
738 pub fn insert(&mut self, users: Vec<proto::User>) -> Vec<Arc<User>> {
739 let mut ret = Vec::with_capacity(users.len());
740 for user in users {
741 let user = User::new(user);
742 if let Some(old) = self.users.insert(user.id, user.clone()) {
743 if old.github_login != user.github_login {
744 self.by_github_login.remove(&old.github_login);
745 }
746 }
747 self.by_github_login
748 .insert(user.github_login.clone(), user.id);
749 ret.push(user)
750 }
751 ret
752 }
753
754 pub fn set_participant_indices(
755 &mut self,
756 participant_indices: HashMap<u64, ParticipantIndex>,
757 cx: &mut ModelContext<Self>,
758 ) {
759 if participant_indices != self.participant_indices {
760 self.participant_indices = participant_indices;
761 cx.emit(Event::ParticipantIndicesChanged);
762 }
763 }
764
765 pub fn participant_indices(&self) -> &HashMap<u64, ParticipantIndex> {
766 &self.participant_indices
767 }
768
769 pub fn participant_names(
770 &self,
771 user_ids: impl Iterator<Item = u64>,
772 cx: &AppContext,
773 ) -> HashMap<u64, SharedString> {
774 let mut ret = HashMap::default();
775 let mut missing_user_ids = Vec::new();
776 for id in user_ids {
777 if let Some(github_login) = self.get_cached_user(id).map(|u| u.github_login.clone()) {
778 ret.insert(id, github_login.into());
779 } else {
780 missing_user_ids.push(id)
781 }
782 }
783 if !missing_user_ids.is_empty() {
784 let this = self.weak_self.clone();
785 cx.spawn(|mut cx| async move {
786 this.update(&mut cx, |this, cx| this.get_users(missing_user_ids, cx))?
787 .await
788 })
789 .detach_and_log_err(cx);
790 }
791 ret
792 }
793}
794
795impl User {
796 fn new(message: proto::User) -> Arc<Self> {
797 Arc::new(User {
798 id: message.id,
799 github_login: message.github_login,
800 avatar_uri: message.avatar_url.into(),
801 })
802 }
803}
804
805impl Contact {
806 async fn from_proto(
807 contact: proto::Contact,
808 user_store: &Model<UserStore>,
809 cx: &mut AsyncAppContext,
810 ) -> Result<Self> {
811 let user = user_store
812 .update(cx, |user_store, cx| {
813 user_store.get_user(contact.user_id, cx)
814 })?
815 .await?;
816 Ok(Self {
817 user,
818 online: contact.online,
819 busy: contact.busy,
820 })
821 }
822}
823
824impl Collaborator {
825 pub fn from_proto(message: proto::Collaborator) -> Result<Self> {
826 Ok(Self {
827 peer_id: message.peer_id.ok_or_else(|| anyhow!("invalid peer id"))?,
828 replica_id: message.replica_id as ReplicaId,
829 user_id: message.user_id as UserId,
830 is_host: message.is_host,
831 })
832 }
833}