1use super::{proto, Client, Status, TypedEnvelope};
2use anyhow::{anyhow, Context, Result};
3use chrono::{DateTime, Utc};
4use collections::{hash_map::Entry, HashMap, HashSet};
5use feature_flags::FeatureFlagAppExt;
6use futures::{channel::mpsc, Future, StreamExt};
7use gpui::{
8 AppContext, AsyncAppContext, EventEmitter, Model, ModelContext, SharedString, SharedUri, Task,
9 WeakModel,
10};
11use postage::{sink::Sink, watch};
12use rpc::proto::{RequestMessage, UsersResponse};
13use std::sync::{Arc, Weak};
14use text::ReplicaId;
15use util::TryFutureExt as _;
16
17pub type UserId = u64;
18
19#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)]
20pub struct ChannelId(pub u64);
21
22impl std::fmt::Display for ChannelId {
23 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
24 self.0.fmt(f)
25 }
26}
27
28#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)]
29pub struct ProjectId(pub u64);
30
31#[derive(
32 Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, serde::Serialize, serde::Deserialize,
33)]
34pub struct DevServerProjectId(pub u64);
35
36#[derive(Debug, Clone, Copy, PartialEq, Eq)]
37pub struct ParticipantIndex(pub u32);
38
39#[derive(Default, Debug)]
40pub struct User {
41 pub id: UserId,
42 pub github_login: String,
43 pub avatar_uri: SharedUri,
44}
45
46#[derive(Clone, Debug, PartialEq, Eq)]
47pub struct Collaborator {
48 pub peer_id: proto::PeerId,
49 pub replica_id: ReplicaId,
50 pub user_id: UserId,
51}
52
53impl PartialOrd for User {
54 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
55 Some(self.cmp(other))
56 }
57}
58
59impl Ord for User {
60 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
61 self.github_login.cmp(&other.github_login)
62 }
63}
64
65impl PartialEq for User {
66 fn eq(&self, other: &Self) -> bool {
67 self.id == other.id && self.github_login == other.github_login
68 }
69}
70
71impl Eq for User {}
72
73#[derive(Debug, PartialEq)]
74pub struct Contact {
75 pub user: Arc<User>,
76 pub online: bool,
77 pub busy: bool,
78}
79
80#[derive(Debug, Clone, Copy, PartialEq, Eq)]
81pub enum ContactRequestStatus {
82 None,
83 RequestSent,
84 RequestReceived,
85 RequestAccepted,
86}
87
88pub struct UserStore {
89 users: HashMap<u64, Arc<User>>,
90 by_github_login: HashMap<String, u64>,
91 participant_indices: HashMap<u64, ParticipantIndex>,
92 update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
93 current_plan: Option<proto::Plan>,
94 current_user: watch::Receiver<Option<Arc<User>>>,
95 accepted_tos_at: Option<Option<DateTime<Utc>>>,
96 contacts: Vec<Arc<Contact>>,
97 incoming_contact_requests: Vec<Arc<User>>,
98 outgoing_contact_requests: Vec<Arc<User>>,
99 pending_contact_requests: HashMap<u64, usize>,
100 invite_info: Option<InviteInfo>,
101 client: Weak<Client>,
102 _maintain_contacts: Task<()>,
103 _maintain_current_user: Task<Result<()>>,
104 weak_self: WeakModel<Self>,
105}
106
107#[derive(Clone)]
108pub struct InviteInfo {
109 pub count: u32,
110 pub url: Arc<str>,
111}
112
113pub enum Event {
114 Contact {
115 user: Arc<User>,
116 kind: ContactEventKind,
117 },
118 ShowContacts,
119 ParticipantIndicesChanged,
120}
121
122#[derive(Clone, Copy)]
123pub enum ContactEventKind {
124 Requested,
125 Accepted,
126 Cancelled,
127}
128
129impl EventEmitter<Event> for UserStore {}
130
131enum UpdateContacts {
132 Update(proto::UpdateContacts),
133 Wait(postage::barrier::Sender),
134 Clear(postage::barrier::Sender),
135}
136
137impl UserStore {
138 pub fn new(client: Arc<Client>, cx: &ModelContext<Self>) -> Self {
139 let (mut current_user_tx, current_user_rx) = watch::channel();
140 let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
141 let rpc_subscriptions = vec![
142 client.add_message_handler(cx.weak_model(), Self::handle_update_plan),
143 client.add_message_handler(cx.weak_model(), Self::handle_update_contacts),
144 client.add_message_handler(cx.weak_model(), Self::handle_update_invite_info),
145 client.add_message_handler(cx.weak_model(), Self::handle_show_contacts),
146 ];
147 Self {
148 users: Default::default(),
149 by_github_login: Default::default(),
150 current_user: current_user_rx,
151 current_plan: None,
152 accepted_tos_at: None,
153 contacts: Default::default(),
154 incoming_contact_requests: Default::default(),
155 participant_indices: Default::default(),
156 outgoing_contact_requests: Default::default(),
157 invite_info: None,
158 client: Arc::downgrade(&client),
159 update_contacts_tx,
160 _maintain_contacts: cx.spawn(|this, mut cx| async move {
161 let _subscriptions = rpc_subscriptions;
162 while let Some(message) = update_contacts_rx.next().await {
163 if let Ok(task) =
164 this.update(&mut cx, |this, cx| this.update_contacts(message, cx))
165 {
166 task.log_err().await;
167 } else {
168 break;
169 }
170 }
171 }),
172 _maintain_current_user: cx.spawn(|this, mut cx| async move {
173 let mut status = client.status();
174 let weak = Arc::downgrade(&client);
175 drop(client);
176 while let Some(status) = status.next().await {
177 // if the client is dropped, the app is shutting down.
178 let Some(client) = weak.upgrade() else {
179 return Ok(());
180 };
181 match status {
182 Status::Connected { .. } => {
183 if let Some(user_id) = client.user_id() {
184 let fetch_user = if let Ok(fetch_user) = this
185 .update(&mut cx, |this, cx| {
186 this.get_user(user_id, cx).log_err()
187 }) {
188 fetch_user
189 } else {
190 break;
191 };
192 let fetch_private_user_info =
193 client.request(proto::GetPrivateUserInfo {}).log_err();
194 let (user, info) =
195 futures::join!(fetch_user, fetch_private_user_info);
196
197 cx.update(|cx| {
198 if let Some(info) = info {
199 let disable_staff = std::env::var("ZED_DISABLE_STAFF")
200 .map_or(false, |v| !v.is_empty() && v != "0");
201 let staff = info.staff && !disable_staff;
202 cx.update_flags(staff, info.flags);
203 client.telemetry.set_authenticated_user_info(
204 Some(info.metrics_id.clone()),
205 staff,
206 );
207
208 this.update(cx, |this, _| {
209 this.set_current_user_accepted_tos_at(
210 info.accepted_tos_at,
211 );
212 })
213 } else {
214 anyhow::Ok(())
215 }
216 })??;
217
218 current_user_tx.send(user).await.ok();
219
220 this.update(&mut cx, |_, cx| cx.notify())?;
221 }
222 }
223 Status::SignedOut => {
224 current_user_tx.send(None).await.ok();
225 this.update(&mut cx, |this, cx| {
226 cx.notify();
227 this.clear_contacts()
228 })?
229 .await;
230 }
231 Status::ConnectionLost => {
232 this.update(&mut cx, |this, cx| {
233 cx.notify();
234 this.clear_contacts()
235 })?
236 .await;
237 }
238 _ => {}
239 }
240 }
241 Ok(())
242 }),
243 pending_contact_requests: Default::default(),
244 weak_self: cx.weak_model(),
245 }
246 }
247
248 #[cfg(feature = "test-support")]
249 pub fn clear_cache(&mut self) {
250 self.users.clear();
251 self.by_github_login.clear();
252 }
253
254 async fn handle_update_invite_info(
255 this: Model<Self>,
256 message: TypedEnvelope<proto::UpdateInviteInfo>,
257 mut cx: AsyncAppContext,
258 ) -> Result<()> {
259 this.update(&mut cx, |this, cx| {
260 this.invite_info = Some(InviteInfo {
261 url: Arc::from(message.payload.url),
262 count: message.payload.count,
263 });
264 cx.notify();
265 })?;
266 Ok(())
267 }
268
269 async fn handle_show_contacts(
270 this: Model<Self>,
271 _: TypedEnvelope<proto::ShowContacts>,
272 mut cx: AsyncAppContext,
273 ) -> Result<()> {
274 this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts))?;
275 Ok(())
276 }
277
278 pub fn invite_info(&self) -> Option<&InviteInfo> {
279 self.invite_info.as_ref()
280 }
281
282 async fn handle_update_contacts(
283 this: Model<Self>,
284 message: TypedEnvelope<proto::UpdateContacts>,
285 mut cx: AsyncAppContext,
286 ) -> Result<()> {
287 this.update(&mut cx, |this, _| {
288 this.update_contacts_tx
289 .unbounded_send(UpdateContacts::Update(message.payload))
290 .unwrap();
291 })?;
292 Ok(())
293 }
294
295 async fn handle_update_plan(
296 this: Model<Self>,
297 message: TypedEnvelope<proto::UpdateUserPlan>,
298 mut cx: AsyncAppContext,
299 ) -> Result<()> {
300 this.update(&mut cx, |this, cx| {
301 this.current_plan = Some(message.payload.plan());
302 cx.notify();
303 })?;
304 Ok(())
305 }
306
307 fn update_contacts(
308 &mut self,
309 message: UpdateContacts,
310 cx: &ModelContext<Self>,
311 ) -> Task<Result<()>> {
312 match message {
313 UpdateContacts::Wait(barrier) => {
314 drop(barrier);
315 Task::ready(Ok(()))
316 }
317 UpdateContacts::Clear(barrier) => {
318 self.contacts.clear();
319 self.incoming_contact_requests.clear();
320 self.outgoing_contact_requests.clear();
321 drop(barrier);
322 Task::ready(Ok(()))
323 }
324 UpdateContacts::Update(message) => {
325 let mut user_ids = HashSet::default();
326 for contact in &message.contacts {
327 user_ids.insert(contact.user_id);
328 }
329 user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
330 user_ids.extend(message.outgoing_requests.iter());
331
332 let load_users = self.get_users(user_ids.into_iter().collect(), cx);
333 cx.spawn(|this, mut cx| async move {
334 load_users.await?;
335
336 // Users are fetched in parallel above and cached in call to get_users
337 // No need to parallelize here
338 let mut updated_contacts = Vec::new();
339 let this = this
340 .upgrade()
341 .ok_or_else(|| anyhow!("can't upgrade user store handle"))?;
342 for contact in message.contacts {
343 updated_contacts.push(Arc::new(
344 Contact::from_proto(contact, &this, &mut cx).await?,
345 ));
346 }
347
348 let mut incoming_requests = Vec::new();
349 for request in message.incoming_requests {
350 incoming_requests.push({
351 this.update(&mut cx, |this, cx| {
352 this.get_user(request.requester_id, cx)
353 })?
354 .await?
355 });
356 }
357
358 let mut outgoing_requests = Vec::new();
359 for requested_user_id in message.outgoing_requests {
360 outgoing_requests.push(
361 this.update(&mut cx, |this, cx| this.get_user(requested_user_id, cx))?
362 .await?,
363 );
364 }
365
366 let removed_contacts =
367 HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
368 let removed_incoming_requests =
369 HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
370 let removed_outgoing_requests =
371 HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());
372
373 this.update(&mut cx, |this, cx| {
374 // Remove contacts
375 this.contacts
376 .retain(|contact| !removed_contacts.contains(&contact.user.id));
377 // Update existing contacts and insert new ones
378 for updated_contact in updated_contacts {
379 match this.contacts.binary_search_by_key(
380 &&updated_contact.user.github_login,
381 |contact| &contact.user.github_login,
382 ) {
383 Ok(ix) => this.contacts[ix] = updated_contact,
384 Err(ix) => this.contacts.insert(ix, updated_contact),
385 }
386 }
387
388 // Remove incoming contact requests
389 this.incoming_contact_requests.retain(|user| {
390 if removed_incoming_requests.contains(&user.id) {
391 cx.emit(Event::Contact {
392 user: user.clone(),
393 kind: ContactEventKind::Cancelled,
394 });
395 false
396 } else {
397 true
398 }
399 });
400 // Update existing incoming requests and insert new ones
401 for user in incoming_requests {
402 match this
403 .incoming_contact_requests
404 .binary_search_by_key(&&user.github_login, |contact| {
405 &contact.github_login
406 }) {
407 Ok(ix) => this.incoming_contact_requests[ix] = user,
408 Err(ix) => this.incoming_contact_requests.insert(ix, user),
409 }
410 }
411
412 // Remove outgoing contact requests
413 this.outgoing_contact_requests
414 .retain(|user| !removed_outgoing_requests.contains(&user.id));
415 // Update existing incoming requests and insert new ones
416 for request in outgoing_requests {
417 match this
418 .outgoing_contact_requests
419 .binary_search_by_key(&&request.github_login, |contact| {
420 &contact.github_login
421 }) {
422 Ok(ix) => this.outgoing_contact_requests[ix] = request,
423 Err(ix) => this.outgoing_contact_requests.insert(ix, request),
424 }
425 }
426
427 cx.notify();
428 })?;
429
430 Ok(())
431 })
432 }
433 }
434 }
435
436 pub fn contacts(&self) -> &[Arc<Contact>] {
437 &self.contacts
438 }
439
440 pub fn has_contact(&self, user: &Arc<User>) -> bool {
441 self.contacts
442 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
443 .is_ok()
444 }
445
446 pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
447 &self.incoming_contact_requests
448 }
449
450 pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
451 &self.outgoing_contact_requests
452 }
453
454 pub fn is_contact_request_pending(&self, user: &User) -> bool {
455 self.pending_contact_requests.contains_key(&user.id)
456 }
457
458 pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
459 if self
460 .contacts
461 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
462 .is_ok()
463 {
464 ContactRequestStatus::RequestAccepted
465 } else if self
466 .outgoing_contact_requests
467 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
468 .is_ok()
469 {
470 ContactRequestStatus::RequestSent
471 } else if self
472 .incoming_contact_requests
473 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
474 .is_ok()
475 {
476 ContactRequestStatus::RequestReceived
477 } else {
478 ContactRequestStatus::None
479 }
480 }
481
482 pub fn request_contact(
483 &mut self,
484 responder_id: u64,
485 cx: &mut ModelContext<Self>,
486 ) -> Task<Result<()>> {
487 self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
488 }
489
490 pub fn remove_contact(
491 &mut self,
492 user_id: u64,
493 cx: &mut ModelContext<Self>,
494 ) -> Task<Result<()>> {
495 self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
496 }
497
498 pub fn has_incoming_contact_request(&self, user_id: u64) -> bool {
499 self.incoming_contact_requests
500 .iter()
501 .any(|user| user.id == user_id)
502 }
503
504 pub fn respond_to_contact_request(
505 &mut self,
506 requester_id: u64,
507 accept: bool,
508 cx: &mut ModelContext<Self>,
509 ) -> Task<Result<()>> {
510 self.perform_contact_request(
511 requester_id,
512 proto::RespondToContactRequest {
513 requester_id,
514 response: if accept {
515 proto::ContactRequestResponse::Accept
516 } else {
517 proto::ContactRequestResponse::Decline
518 } as i32,
519 },
520 cx,
521 )
522 }
523
524 pub fn dismiss_contact_request(
525 &self,
526 requester_id: u64,
527 cx: &ModelContext<Self>,
528 ) -> Task<Result<()>> {
529 let client = self.client.upgrade();
530 cx.spawn(move |_, _| async move {
531 client
532 .ok_or_else(|| anyhow!("can't upgrade client reference"))?
533 .request(proto::RespondToContactRequest {
534 requester_id,
535 response: proto::ContactRequestResponse::Dismiss as i32,
536 })
537 .await?;
538 Ok(())
539 })
540 }
541
542 fn perform_contact_request<T: RequestMessage>(
543 &mut self,
544 user_id: u64,
545 request: T,
546 cx: &mut ModelContext<Self>,
547 ) -> Task<Result<()>> {
548 let client = self.client.upgrade();
549 *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
550 cx.notify();
551
552 cx.spawn(move |this, mut cx| async move {
553 let response = client
554 .ok_or_else(|| anyhow!("can't upgrade client reference"))?
555 .request(request)
556 .await;
557 this.update(&mut cx, |this, cx| {
558 if let Entry::Occupied(mut request_count) =
559 this.pending_contact_requests.entry(user_id)
560 {
561 *request_count.get_mut() -= 1;
562 if *request_count.get() == 0 {
563 request_count.remove();
564 }
565 }
566 cx.notify();
567 })?;
568 response?;
569 Ok(())
570 })
571 }
572
573 pub fn clear_contacts(&self) -> impl Future<Output = ()> {
574 let (tx, mut rx) = postage::barrier::channel();
575 self.update_contacts_tx
576 .unbounded_send(UpdateContacts::Clear(tx))
577 .unwrap();
578 async move {
579 rx.next().await;
580 }
581 }
582
583 pub fn contact_updates_done(&self) -> impl Future<Output = ()> {
584 let (tx, mut rx) = postage::barrier::channel();
585 self.update_contacts_tx
586 .unbounded_send(UpdateContacts::Wait(tx))
587 .unwrap();
588 async move {
589 rx.next().await;
590 }
591 }
592
593 pub fn get_users(
594 &self,
595 user_ids: Vec<u64>,
596 cx: &ModelContext<Self>,
597 ) -> Task<Result<Vec<Arc<User>>>> {
598 let mut user_ids_to_fetch = user_ids.clone();
599 user_ids_to_fetch.retain(|id| !self.users.contains_key(id));
600
601 cx.spawn(|this, mut cx| async move {
602 if !user_ids_to_fetch.is_empty() {
603 this.update(&mut cx, |this, cx| {
604 this.load_users(
605 proto::GetUsers {
606 user_ids: user_ids_to_fetch,
607 },
608 cx,
609 )
610 })?
611 .await?;
612 }
613
614 this.update(&mut cx, |this, _| {
615 user_ids
616 .iter()
617 .map(|user_id| {
618 this.users
619 .get(user_id)
620 .cloned()
621 .ok_or_else(|| anyhow!("user {} not found", user_id))
622 })
623 .collect()
624 })?
625 })
626 }
627
628 pub fn fuzzy_search_users(
629 &self,
630 query: String,
631 cx: &ModelContext<Self>,
632 ) -> Task<Result<Vec<Arc<User>>>> {
633 self.load_users(proto::FuzzySearchUsers { query }, cx)
634 }
635
636 pub fn get_cached_user(&self, user_id: u64) -> Option<Arc<User>> {
637 self.users.get(&user_id).cloned()
638 }
639
640 pub fn get_user_optimistic(&self, user_id: u64, cx: &ModelContext<Self>) -> Option<Arc<User>> {
641 if let Some(user) = self.users.get(&user_id).cloned() {
642 return Some(user);
643 }
644
645 self.get_user(user_id, cx).detach_and_log_err(cx);
646 None
647 }
648
649 pub fn get_user(&self, user_id: u64, cx: &ModelContext<Self>) -> Task<Result<Arc<User>>> {
650 if let Some(user) = self.users.get(&user_id).cloned() {
651 return Task::ready(Ok(user));
652 }
653
654 let load_users = self.get_users(vec![user_id], cx);
655 cx.spawn(move |this, mut cx| async move {
656 load_users.await?;
657 this.update(&mut cx, |this, _| {
658 this.users
659 .get(&user_id)
660 .cloned()
661 .ok_or_else(|| anyhow!("server responded with no users"))
662 })?
663 })
664 }
665
666 pub fn cached_user_by_github_login(&self, github_login: &str) -> Option<Arc<User>> {
667 self.by_github_login
668 .get(github_login)
669 .and_then(|id| self.users.get(id).cloned())
670 }
671
672 pub fn current_user(&self) -> Option<Arc<User>> {
673 self.current_user.borrow().clone()
674 }
675
676 pub fn current_plan(&self) -> Option<proto::Plan> {
677 self.current_plan
678 }
679
680 pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
681 self.current_user.clone()
682 }
683
684 pub fn current_user_has_accepted_terms(&self) -> Option<bool> {
685 self.accepted_tos_at
686 .map(|accepted_tos_at| accepted_tos_at.is_some())
687 }
688
689 pub fn accept_terms_of_service(&self, cx: &ModelContext<Self>) -> Task<Result<()>> {
690 if self.current_user().is_none() {
691 return Task::ready(Err(anyhow!("no current user")));
692 };
693
694 let client = self.client.clone();
695 cx.spawn(move |this, mut cx| async move {
696 if let Some(client) = client.upgrade() {
697 let response = client
698 .request(proto::AcceptTermsOfService {})
699 .await
700 .context("error accepting tos")?;
701
702 this.update(&mut cx, |this, _| {
703 this.set_current_user_accepted_tos_at(Some(response.accepted_tos_at))
704 })
705 } else {
706 Err(anyhow!("client not found"))
707 }
708 })
709 }
710
711 fn set_current_user_accepted_tos_at(&mut self, accepted_tos_at: Option<u64>) {
712 self.accepted_tos_at = Some(
713 accepted_tos_at.and_then(|timestamp| DateTime::from_timestamp(timestamp as i64, 0)),
714 );
715 }
716
717 fn load_users(
718 &self,
719 request: impl RequestMessage<Response = UsersResponse>,
720 cx: &ModelContext<Self>,
721 ) -> Task<Result<Vec<Arc<User>>>> {
722 let client = self.client.clone();
723 cx.spawn(|this, mut cx| async move {
724 if let Some(rpc) = client.upgrade() {
725 let response = rpc.request(request).await.context("error loading users")?;
726 let users = response.users;
727
728 this.update(&mut cx, |this, _| this.insert(users))
729 } else {
730 Ok(Vec::new())
731 }
732 })
733 }
734
735 pub fn insert(&mut self, users: Vec<proto::User>) -> Vec<Arc<User>> {
736 let mut ret = Vec::with_capacity(users.len());
737 for user in users {
738 let user = User::new(user);
739 if let Some(old) = self.users.insert(user.id, user.clone()) {
740 if old.github_login != user.github_login {
741 self.by_github_login.remove(&old.github_login);
742 }
743 }
744 self.by_github_login
745 .insert(user.github_login.clone(), user.id);
746 ret.push(user)
747 }
748 ret
749 }
750
751 pub fn set_participant_indices(
752 &mut self,
753 participant_indices: HashMap<u64, ParticipantIndex>,
754 cx: &mut ModelContext<Self>,
755 ) {
756 if participant_indices != self.participant_indices {
757 self.participant_indices = participant_indices;
758 cx.emit(Event::ParticipantIndicesChanged);
759 }
760 }
761
762 pub fn participant_indices(&self) -> &HashMap<u64, ParticipantIndex> {
763 &self.participant_indices
764 }
765
766 pub fn participant_names(
767 &self,
768 user_ids: impl Iterator<Item = u64>,
769 cx: &AppContext,
770 ) -> HashMap<u64, SharedString> {
771 let mut ret = HashMap::default();
772 let mut missing_user_ids = Vec::new();
773 for id in user_ids {
774 if let Some(github_login) = self.get_cached_user(id).map(|u| u.github_login.clone()) {
775 ret.insert(id, github_login.into());
776 } else {
777 missing_user_ids.push(id)
778 }
779 }
780 if !missing_user_ids.is_empty() {
781 let this = self.weak_self.clone();
782 cx.spawn(|mut cx| async move {
783 this.update(&mut cx, |this, cx| this.get_users(missing_user_ids, cx))?
784 .await
785 })
786 .detach_and_log_err(cx);
787 }
788 ret
789 }
790}
791
792impl User {
793 fn new(message: proto::User) -> Arc<Self> {
794 Arc::new(User {
795 id: message.id,
796 github_login: message.github_login,
797 avatar_uri: message.avatar_url.into(),
798 })
799 }
800}
801
802impl Contact {
803 async fn from_proto(
804 contact: proto::Contact,
805 user_store: &Model<UserStore>,
806 cx: &mut AsyncAppContext,
807 ) -> Result<Self> {
808 let user = user_store
809 .update(cx, |user_store, cx| {
810 user_store.get_user(contact.user_id, cx)
811 })?
812 .await?;
813 Ok(Self {
814 user,
815 online: contact.online,
816 busy: contact.busy,
817 })
818 }
819}
820
821impl Collaborator {
822 pub fn from_proto(message: proto::Collaborator) -> Result<Self> {
823 Ok(Self {
824 peer_id: message.peer_id.ok_or_else(|| anyhow!("invalid peer id"))?,
825 replica_id: message.replica_id as ReplicaId,
826 user_id: message.user_id as UserId,
827 })
828 }
829}