1use super::{proto, Client, Status, TypedEnvelope};
2use anyhow::{anyhow, Context, Result};
3use chrono::{DateTime, Utc};
4use collections::{hash_map::Entry, HashMap, HashSet};
5use feature_flags::FeatureFlagAppExt;
6use futures::{channel::mpsc, Future, StreamExt};
7use gpui::{
8 AppContext, AsyncAppContext, EventEmitter, Model, ModelContext, SharedString, SharedUri, Task,
9 WeakModel,
10};
11use postage::{sink::Sink, watch};
12use rpc::proto::{RequestMessage, UsersResponse};
13use std::sync::{Arc, Weak};
14use text::ReplicaId;
15use util::TryFutureExt as _;
16
17pub type UserId = u64;
18
19#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)]
20pub struct ChannelId(pub u64);
21
22impl std::fmt::Display for ChannelId {
23 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
24 self.0.fmt(f)
25 }
26}
27
28#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)]
29pub struct ProjectId(pub u64);
30
31#[derive(
32 Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, serde::Serialize, serde::Deserialize,
33)]
34pub struct DevServerProjectId(pub u64);
35
36#[derive(Debug, Clone, Copy, PartialEq, Eq)]
37pub struct ParticipantIndex(pub u32);
38
39#[derive(Default, Debug)]
40pub struct User {
41 pub id: UserId,
42 pub github_login: String,
43 pub avatar_uri: SharedUri,
44}
45
46#[derive(Clone, Debug, PartialEq, Eq)]
47pub struct Collaborator {
48 pub peer_id: proto::PeerId,
49 pub replica_id: ReplicaId,
50 pub user_id: UserId,
51 pub is_host: bool,
52}
53
54impl PartialOrd for User {
55 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
56 Some(self.cmp(other))
57 }
58}
59
60impl Ord for User {
61 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
62 self.github_login.cmp(&other.github_login)
63 }
64}
65
66impl PartialEq for User {
67 fn eq(&self, other: &Self) -> bool {
68 self.id == other.id && self.github_login == other.github_login
69 }
70}
71
72impl Eq for User {}
73
74#[derive(Debug, PartialEq)]
75pub struct Contact {
76 pub user: Arc<User>,
77 pub online: bool,
78 pub busy: bool,
79}
80
81#[derive(Debug, Clone, Copy, PartialEq, Eq)]
82pub enum ContactRequestStatus {
83 None,
84 RequestSent,
85 RequestReceived,
86 RequestAccepted,
87}
88
89pub struct UserStore {
90 users: HashMap<u64, Arc<User>>,
91 by_github_login: HashMap<String, u64>,
92 participant_indices: HashMap<u64, ParticipantIndex>,
93 update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
94 current_plan: Option<proto::Plan>,
95 current_user: watch::Receiver<Option<Arc<User>>>,
96 accepted_tos_at: Option<Option<DateTime<Utc>>>,
97 contacts: Vec<Arc<Contact>>,
98 incoming_contact_requests: Vec<Arc<User>>,
99 outgoing_contact_requests: Vec<Arc<User>>,
100 pending_contact_requests: HashMap<u64, usize>,
101 invite_info: Option<InviteInfo>,
102 client: Weak<Client>,
103 _maintain_contacts: Task<()>,
104 _maintain_current_user: Task<Result<()>>,
105 weak_self: WeakModel<Self>,
106}
107
108#[derive(Clone)]
109pub struct InviteInfo {
110 pub count: u32,
111 pub url: Arc<str>,
112}
113
114pub enum Event {
115 Contact {
116 user: Arc<User>,
117 kind: ContactEventKind,
118 },
119 ShowContacts,
120 ParticipantIndicesChanged,
121}
122
123#[derive(Clone, Copy)]
124pub enum ContactEventKind {
125 Requested,
126 Accepted,
127 Cancelled,
128}
129
130impl EventEmitter<Event> for UserStore {}
131
132enum UpdateContacts {
133 Update(proto::UpdateContacts),
134 Wait(postage::barrier::Sender),
135 Clear(postage::barrier::Sender),
136}
137
138impl UserStore {
139 pub fn new(client: Arc<Client>, cx: &ModelContext<Self>) -> Self {
140 let (mut current_user_tx, current_user_rx) = watch::channel();
141 let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
142 let rpc_subscriptions = vec![
143 client.add_message_handler(cx.weak_model(), Self::handle_update_plan),
144 client.add_message_handler(cx.weak_model(), Self::handle_update_contacts),
145 client.add_message_handler(cx.weak_model(), Self::handle_update_invite_info),
146 client.add_message_handler(cx.weak_model(), Self::handle_show_contacts),
147 ];
148 Self {
149 users: Default::default(),
150 by_github_login: Default::default(),
151 current_user: current_user_rx,
152 current_plan: None,
153 accepted_tos_at: None,
154 contacts: Default::default(),
155 incoming_contact_requests: Default::default(),
156 participant_indices: Default::default(),
157 outgoing_contact_requests: Default::default(),
158 invite_info: None,
159 client: Arc::downgrade(&client),
160 update_contacts_tx,
161 _maintain_contacts: cx.spawn(|this, mut cx| async move {
162 let _subscriptions = rpc_subscriptions;
163 while let Some(message) = update_contacts_rx.next().await {
164 if let Ok(task) =
165 this.update(&mut cx, |this, cx| this.update_contacts(message, cx))
166 {
167 task.log_err().await;
168 } else {
169 break;
170 }
171 }
172 }),
173 _maintain_current_user: cx.spawn(|this, mut cx| async move {
174 let mut status = client.status();
175 let weak = Arc::downgrade(&client);
176 drop(client);
177 while let Some(status) = status.next().await {
178 // if the client is dropped, the app is shutting down.
179 let Some(client) = weak.upgrade() else {
180 return Ok(());
181 };
182 match status {
183 Status::Connected { .. } => {
184 if let Some(user_id) = client.user_id() {
185 let fetch_user = if let Ok(fetch_user) = this
186 .update(&mut cx, |this, cx| {
187 this.get_user(user_id, cx).log_err()
188 }) {
189 fetch_user
190 } else {
191 break;
192 };
193 let fetch_private_user_info =
194 client.request(proto::GetPrivateUserInfo {}).log_err();
195 let (user, info) =
196 futures::join!(fetch_user, fetch_private_user_info);
197
198 cx.update(|cx| {
199 if let Some(info) = info {
200 let disable_staff = std::env::var("ZED_DISABLE_STAFF")
201 .map_or(false, |v| !v.is_empty() && v != "0");
202 let staff = info.staff && !disable_staff;
203 cx.update_flags(staff, info.flags);
204 client.telemetry.set_authenticated_user_info(
205 Some(info.metrics_id.clone()),
206 staff,
207 );
208
209 this.update(cx, |this, _| {
210 this.set_current_user_accepted_tos_at(
211 info.accepted_tos_at,
212 );
213 })
214 } else {
215 anyhow::Ok(())
216 }
217 })??;
218
219 current_user_tx.send(user).await.ok();
220
221 this.update(&mut cx, |_, cx| cx.notify())?;
222 }
223 }
224 Status::SignedOut => {
225 current_user_tx.send(None).await.ok();
226 this.update(&mut cx, |this, cx| {
227 cx.notify();
228 this.clear_contacts()
229 })?
230 .await;
231 }
232 Status::ConnectionLost => {
233 this.update(&mut cx, |this, cx| {
234 cx.notify();
235 this.clear_contacts()
236 })?
237 .await;
238 }
239 _ => {}
240 }
241 }
242 Ok(())
243 }),
244 pending_contact_requests: Default::default(),
245 weak_self: cx.weak_model(),
246 }
247 }
248
249 #[cfg(feature = "test-support")]
250 pub fn clear_cache(&mut self) {
251 self.users.clear();
252 self.by_github_login.clear();
253 }
254
255 async fn handle_update_invite_info(
256 this: Model<Self>,
257 message: TypedEnvelope<proto::UpdateInviteInfo>,
258 mut cx: AsyncAppContext,
259 ) -> Result<()> {
260 this.update(&mut cx, |this, cx| {
261 this.invite_info = Some(InviteInfo {
262 url: Arc::from(message.payload.url),
263 count: message.payload.count,
264 });
265 cx.notify();
266 })?;
267 Ok(())
268 }
269
270 async fn handle_show_contacts(
271 this: Model<Self>,
272 _: TypedEnvelope<proto::ShowContacts>,
273 mut cx: AsyncAppContext,
274 ) -> Result<()> {
275 this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts))?;
276 Ok(())
277 }
278
279 pub fn invite_info(&self) -> Option<&InviteInfo> {
280 self.invite_info.as_ref()
281 }
282
283 async fn handle_update_contacts(
284 this: Model<Self>,
285 message: TypedEnvelope<proto::UpdateContacts>,
286 mut cx: AsyncAppContext,
287 ) -> Result<()> {
288 this.update(&mut cx, |this, _| {
289 this.update_contacts_tx
290 .unbounded_send(UpdateContacts::Update(message.payload))
291 .unwrap();
292 })?;
293 Ok(())
294 }
295
296 async fn handle_update_plan(
297 this: Model<Self>,
298 message: TypedEnvelope<proto::UpdateUserPlan>,
299 mut cx: AsyncAppContext,
300 ) -> Result<()> {
301 this.update(&mut cx, |this, cx| {
302 this.current_plan = Some(message.payload.plan());
303 cx.notify();
304 })?;
305 Ok(())
306 }
307
308 fn update_contacts(
309 &mut self,
310 message: UpdateContacts,
311 cx: &ModelContext<Self>,
312 ) -> Task<Result<()>> {
313 match message {
314 UpdateContacts::Wait(barrier) => {
315 drop(barrier);
316 Task::ready(Ok(()))
317 }
318 UpdateContacts::Clear(barrier) => {
319 self.contacts.clear();
320 self.incoming_contact_requests.clear();
321 self.outgoing_contact_requests.clear();
322 drop(barrier);
323 Task::ready(Ok(()))
324 }
325 UpdateContacts::Update(message) => {
326 let mut user_ids = HashSet::default();
327 for contact in &message.contacts {
328 user_ids.insert(contact.user_id);
329 }
330 user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
331 user_ids.extend(message.outgoing_requests.iter());
332
333 let load_users = self.get_users(user_ids.into_iter().collect(), cx);
334 cx.spawn(|this, mut cx| async move {
335 load_users.await?;
336
337 // Users are fetched in parallel above and cached in call to get_users
338 // No need to parallelize here
339 let mut updated_contacts = Vec::new();
340 let this = this
341 .upgrade()
342 .ok_or_else(|| anyhow!("can't upgrade user store handle"))?;
343 for contact in message.contacts {
344 updated_contacts.push(Arc::new(
345 Contact::from_proto(contact, &this, &mut cx).await?,
346 ));
347 }
348
349 let mut incoming_requests = Vec::new();
350 for request in message.incoming_requests {
351 incoming_requests.push({
352 this.update(&mut cx, |this, cx| {
353 this.get_user(request.requester_id, cx)
354 })?
355 .await?
356 });
357 }
358
359 let mut outgoing_requests = Vec::new();
360 for requested_user_id in message.outgoing_requests {
361 outgoing_requests.push(
362 this.update(&mut cx, |this, cx| this.get_user(requested_user_id, cx))?
363 .await?,
364 );
365 }
366
367 let removed_contacts =
368 HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
369 let removed_incoming_requests =
370 HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
371 let removed_outgoing_requests =
372 HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());
373
374 this.update(&mut cx, |this, cx| {
375 // Remove contacts
376 this.contacts
377 .retain(|contact| !removed_contacts.contains(&contact.user.id));
378 // Update existing contacts and insert new ones
379 for updated_contact in updated_contacts {
380 match this.contacts.binary_search_by_key(
381 &&updated_contact.user.github_login,
382 |contact| &contact.user.github_login,
383 ) {
384 Ok(ix) => this.contacts[ix] = updated_contact,
385 Err(ix) => this.contacts.insert(ix, updated_contact),
386 }
387 }
388
389 // Remove incoming contact requests
390 this.incoming_contact_requests.retain(|user| {
391 if removed_incoming_requests.contains(&user.id) {
392 cx.emit(Event::Contact {
393 user: user.clone(),
394 kind: ContactEventKind::Cancelled,
395 });
396 false
397 } else {
398 true
399 }
400 });
401 // Update existing incoming requests and insert new ones
402 for user in incoming_requests {
403 match this
404 .incoming_contact_requests
405 .binary_search_by_key(&&user.github_login, |contact| {
406 &contact.github_login
407 }) {
408 Ok(ix) => this.incoming_contact_requests[ix] = user,
409 Err(ix) => this.incoming_contact_requests.insert(ix, user),
410 }
411 }
412
413 // Remove outgoing contact requests
414 this.outgoing_contact_requests
415 .retain(|user| !removed_outgoing_requests.contains(&user.id));
416 // Update existing incoming requests and insert new ones
417 for request in outgoing_requests {
418 match this
419 .outgoing_contact_requests
420 .binary_search_by_key(&&request.github_login, |contact| {
421 &contact.github_login
422 }) {
423 Ok(ix) => this.outgoing_contact_requests[ix] = request,
424 Err(ix) => this.outgoing_contact_requests.insert(ix, request),
425 }
426 }
427
428 cx.notify();
429 })?;
430
431 Ok(())
432 })
433 }
434 }
435 }
436
437 pub fn contacts(&self) -> &[Arc<Contact>] {
438 &self.contacts
439 }
440
441 pub fn has_contact(&self, user: &Arc<User>) -> bool {
442 self.contacts
443 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
444 .is_ok()
445 }
446
447 pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
448 &self.incoming_contact_requests
449 }
450
451 pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
452 &self.outgoing_contact_requests
453 }
454
455 pub fn is_contact_request_pending(&self, user: &User) -> bool {
456 self.pending_contact_requests.contains_key(&user.id)
457 }
458
459 pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
460 if self
461 .contacts
462 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
463 .is_ok()
464 {
465 ContactRequestStatus::RequestAccepted
466 } else if self
467 .outgoing_contact_requests
468 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
469 .is_ok()
470 {
471 ContactRequestStatus::RequestSent
472 } else if self
473 .incoming_contact_requests
474 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
475 .is_ok()
476 {
477 ContactRequestStatus::RequestReceived
478 } else {
479 ContactRequestStatus::None
480 }
481 }
482
483 pub fn request_contact(
484 &mut self,
485 responder_id: u64,
486 cx: &mut ModelContext<Self>,
487 ) -> Task<Result<()>> {
488 self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
489 }
490
491 pub fn remove_contact(
492 &mut self,
493 user_id: u64,
494 cx: &mut ModelContext<Self>,
495 ) -> Task<Result<()>> {
496 self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
497 }
498
499 pub fn has_incoming_contact_request(&self, user_id: u64) -> bool {
500 self.incoming_contact_requests
501 .iter()
502 .any(|user| user.id == user_id)
503 }
504
505 pub fn respond_to_contact_request(
506 &mut self,
507 requester_id: u64,
508 accept: bool,
509 cx: &mut ModelContext<Self>,
510 ) -> Task<Result<()>> {
511 self.perform_contact_request(
512 requester_id,
513 proto::RespondToContactRequest {
514 requester_id,
515 response: if accept {
516 proto::ContactRequestResponse::Accept
517 } else {
518 proto::ContactRequestResponse::Decline
519 } as i32,
520 },
521 cx,
522 )
523 }
524
525 pub fn dismiss_contact_request(
526 &self,
527 requester_id: u64,
528 cx: &ModelContext<Self>,
529 ) -> Task<Result<()>> {
530 let client = self.client.upgrade();
531 cx.spawn(move |_, _| async move {
532 client
533 .ok_or_else(|| anyhow!("can't upgrade client reference"))?
534 .request(proto::RespondToContactRequest {
535 requester_id,
536 response: proto::ContactRequestResponse::Dismiss as i32,
537 })
538 .await?;
539 Ok(())
540 })
541 }
542
543 fn perform_contact_request<T: RequestMessage>(
544 &mut self,
545 user_id: u64,
546 request: T,
547 cx: &mut ModelContext<Self>,
548 ) -> Task<Result<()>> {
549 let client = self.client.upgrade();
550 *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
551 cx.notify();
552
553 cx.spawn(move |this, mut cx| async move {
554 let response = client
555 .ok_or_else(|| anyhow!("can't upgrade client reference"))?
556 .request(request)
557 .await;
558 this.update(&mut cx, |this, cx| {
559 if let Entry::Occupied(mut request_count) =
560 this.pending_contact_requests.entry(user_id)
561 {
562 *request_count.get_mut() -= 1;
563 if *request_count.get() == 0 {
564 request_count.remove();
565 }
566 }
567 cx.notify();
568 })?;
569 response?;
570 Ok(())
571 })
572 }
573
574 pub fn clear_contacts(&self) -> impl Future<Output = ()> {
575 let (tx, mut rx) = postage::barrier::channel();
576 self.update_contacts_tx
577 .unbounded_send(UpdateContacts::Clear(tx))
578 .unwrap();
579 async move {
580 rx.next().await;
581 }
582 }
583
584 pub fn contact_updates_done(&self) -> impl Future<Output = ()> {
585 let (tx, mut rx) = postage::barrier::channel();
586 self.update_contacts_tx
587 .unbounded_send(UpdateContacts::Wait(tx))
588 .unwrap();
589 async move {
590 rx.next().await;
591 }
592 }
593
594 pub fn get_users(
595 &self,
596 user_ids: Vec<u64>,
597 cx: &ModelContext<Self>,
598 ) -> Task<Result<Vec<Arc<User>>>> {
599 let mut user_ids_to_fetch = user_ids.clone();
600 user_ids_to_fetch.retain(|id| !self.users.contains_key(id));
601
602 cx.spawn(|this, mut cx| async move {
603 if !user_ids_to_fetch.is_empty() {
604 this.update(&mut cx, |this, cx| {
605 this.load_users(
606 proto::GetUsers {
607 user_ids: user_ids_to_fetch,
608 },
609 cx,
610 )
611 })?
612 .await?;
613 }
614
615 this.update(&mut cx, |this, _| {
616 user_ids
617 .iter()
618 .map(|user_id| {
619 this.users
620 .get(user_id)
621 .cloned()
622 .ok_or_else(|| anyhow!("user {} not found", user_id))
623 })
624 .collect()
625 })?
626 })
627 }
628
629 pub fn fuzzy_search_users(
630 &self,
631 query: String,
632 cx: &ModelContext<Self>,
633 ) -> Task<Result<Vec<Arc<User>>>> {
634 self.load_users(proto::FuzzySearchUsers { query }, cx)
635 }
636
637 pub fn get_cached_user(&self, user_id: u64) -> Option<Arc<User>> {
638 self.users.get(&user_id).cloned()
639 }
640
641 pub fn get_user_optimistic(&self, user_id: u64, cx: &ModelContext<Self>) -> Option<Arc<User>> {
642 if let Some(user) = self.users.get(&user_id).cloned() {
643 return Some(user);
644 }
645
646 self.get_user(user_id, cx).detach_and_log_err(cx);
647 None
648 }
649
650 pub fn get_user(&self, user_id: u64, cx: &ModelContext<Self>) -> Task<Result<Arc<User>>> {
651 if let Some(user) = self.users.get(&user_id).cloned() {
652 return Task::ready(Ok(user));
653 }
654
655 let load_users = self.get_users(vec![user_id], cx);
656 cx.spawn(move |this, mut cx| async move {
657 load_users.await?;
658 this.update(&mut cx, |this, _| {
659 this.users
660 .get(&user_id)
661 .cloned()
662 .ok_or_else(|| anyhow!("server responded with no users"))
663 })?
664 })
665 }
666
667 pub fn cached_user_by_github_login(&self, github_login: &str) -> Option<Arc<User>> {
668 self.by_github_login
669 .get(github_login)
670 .and_then(|id| self.users.get(id).cloned())
671 }
672
673 pub fn current_user(&self) -> Option<Arc<User>> {
674 self.current_user.borrow().clone()
675 }
676
677 pub fn current_plan(&self) -> Option<proto::Plan> {
678 self.current_plan
679 }
680
681 pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
682 self.current_user.clone()
683 }
684
685 pub fn current_user_has_accepted_terms(&self) -> Option<bool> {
686 self.accepted_tos_at
687 .map(|accepted_tos_at| accepted_tos_at.is_some())
688 }
689
690 pub fn accept_terms_of_service(&self, cx: &ModelContext<Self>) -> Task<Result<()>> {
691 if self.current_user().is_none() {
692 return Task::ready(Err(anyhow!("no current user")));
693 };
694
695 let client = self.client.clone();
696 cx.spawn(move |this, mut cx| async move {
697 if let Some(client) = client.upgrade() {
698 let response = client
699 .request(proto::AcceptTermsOfService {})
700 .await
701 .context("error accepting tos")?;
702
703 this.update(&mut cx, |this, _| {
704 this.set_current_user_accepted_tos_at(Some(response.accepted_tos_at))
705 })
706 } else {
707 Err(anyhow!("client not found"))
708 }
709 })
710 }
711
712 fn set_current_user_accepted_tos_at(&mut self, accepted_tos_at: Option<u64>) {
713 self.accepted_tos_at = Some(
714 accepted_tos_at.and_then(|timestamp| DateTime::from_timestamp(timestamp as i64, 0)),
715 );
716 }
717
718 fn load_users(
719 &self,
720 request: impl RequestMessage<Response = UsersResponse>,
721 cx: &ModelContext<Self>,
722 ) -> Task<Result<Vec<Arc<User>>>> {
723 let client = self.client.clone();
724 cx.spawn(|this, mut cx| async move {
725 if let Some(rpc) = client.upgrade() {
726 let response = rpc.request(request).await.context("error loading users")?;
727 let users = response.users;
728
729 this.update(&mut cx, |this, _| this.insert(users))
730 } else {
731 Ok(Vec::new())
732 }
733 })
734 }
735
736 pub fn insert(&mut self, users: Vec<proto::User>) -> Vec<Arc<User>> {
737 let mut ret = Vec::with_capacity(users.len());
738 for user in users {
739 let user = User::new(user);
740 if let Some(old) = self.users.insert(user.id, user.clone()) {
741 if old.github_login != user.github_login {
742 self.by_github_login.remove(&old.github_login);
743 }
744 }
745 self.by_github_login
746 .insert(user.github_login.clone(), user.id);
747 ret.push(user)
748 }
749 ret
750 }
751
752 pub fn set_participant_indices(
753 &mut self,
754 participant_indices: HashMap<u64, ParticipantIndex>,
755 cx: &mut ModelContext<Self>,
756 ) {
757 if participant_indices != self.participant_indices {
758 self.participant_indices = participant_indices;
759 cx.emit(Event::ParticipantIndicesChanged);
760 }
761 }
762
763 pub fn participant_indices(&self) -> &HashMap<u64, ParticipantIndex> {
764 &self.participant_indices
765 }
766
767 pub fn participant_names(
768 &self,
769 user_ids: impl Iterator<Item = u64>,
770 cx: &AppContext,
771 ) -> HashMap<u64, SharedString> {
772 let mut ret = HashMap::default();
773 let mut missing_user_ids = Vec::new();
774 for id in user_ids {
775 if let Some(github_login) = self.get_cached_user(id).map(|u| u.github_login.clone()) {
776 ret.insert(id, github_login.into());
777 } else {
778 missing_user_ids.push(id)
779 }
780 }
781 if !missing_user_ids.is_empty() {
782 let this = self.weak_self.clone();
783 cx.spawn(|mut cx| async move {
784 this.update(&mut cx, |this, cx| this.get_users(missing_user_ids, cx))?
785 .await
786 })
787 .detach_and_log_err(cx);
788 }
789 ret
790 }
791}
792
793impl User {
794 fn new(message: proto::User) -> Arc<Self> {
795 Arc::new(User {
796 id: message.id,
797 github_login: message.github_login,
798 avatar_uri: message.avatar_url.into(),
799 })
800 }
801}
802
803impl Contact {
804 async fn from_proto(
805 contact: proto::Contact,
806 user_store: &Model<UserStore>,
807 cx: &mut AsyncAppContext,
808 ) -> Result<Self> {
809 let user = user_store
810 .update(cx, |user_store, cx| {
811 user_store.get_user(contact.user_id, cx)
812 })?
813 .await?;
814 Ok(Self {
815 user,
816 online: contact.online,
817 busy: contact.busy,
818 })
819 }
820}
821
822impl Collaborator {
823 pub fn from_proto(message: proto::Collaborator) -> Result<Self> {
824 Ok(Self {
825 peer_id: message.peer_id.ok_or_else(|| anyhow!("invalid peer id"))?,
826 replica_id: message.replica_id as ReplicaId,
827 user_id: message.user_id as UserId,
828 is_host: message.is_host,
829 })
830 }
831}