1use super::{Client, Status, TypedEnvelope, proto};
2use anyhow::{Context as _, Result, anyhow};
3use chrono::{DateTime, Utc};
4use collections::{HashMap, HashSet, hash_map::Entry};
5use feature_flags::FeatureFlagAppExt;
6use futures::{Future, StreamExt, channel::mpsc};
7use gpui::{
8 App, AsyncApp, Context, Entity, EventEmitter, SharedString, SharedUri, Task, WeakEntity,
9};
10use postage::{sink::Sink, watch};
11use rpc::proto::{RequestMessage, UsersResponse};
12use std::sync::{Arc, Weak};
13use text::ReplicaId;
14use util::TryFutureExt as _;
15
16pub type UserId = u64;
17
18#[derive(
19 Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, serde::Serialize, serde::Deserialize,
20)]
21pub struct ChannelId(pub u64);
22
23impl std::fmt::Display for ChannelId {
24 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
25 self.0.fmt(f)
26 }
27}
28
29#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)]
30pub struct ProjectId(pub u64);
31
32impl ProjectId {
33 pub fn to_proto(&self) -> u64 {
34 self.0
35 }
36}
37
38#[derive(
39 Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, serde::Serialize, serde::Deserialize,
40)]
41pub struct DevServerProjectId(pub u64);
42
43#[derive(Debug, Clone, Copy, PartialEq, Eq)]
44pub struct ParticipantIndex(pub u32);
45
46#[derive(Default, Debug)]
47pub struct User {
48 pub id: UserId,
49 pub github_login: String,
50 pub avatar_uri: SharedUri,
51 pub name: Option<String>,
52 pub email: Option<String>,
53}
54
55#[derive(Clone, Debug, PartialEq, Eq)]
56pub struct Collaborator {
57 pub peer_id: proto::PeerId,
58 pub replica_id: ReplicaId,
59 pub user_id: UserId,
60 pub is_host: bool,
61}
62
63impl PartialOrd for User {
64 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
65 Some(self.cmp(other))
66 }
67}
68
69impl Ord for User {
70 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
71 self.github_login.cmp(&other.github_login)
72 }
73}
74
75impl PartialEq for User {
76 fn eq(&self, other: &Self) -> bool {
77 self.id == other.id && self.github_login == other.github_login
78 }
79}
80
81impl Eq for User {}
82
83#[derive(Debug, PartialEq)]
84pub struct Contact {
85 pub user: Arc<User>,
86 pub online: bool,
87 pub busy: bool,
88}
89
90#[derive(Debug, Clone, Copy, PartialEq, Eq)]
91pub enum ContactRequestStatus {
92 None,
93 RequestSent,
94 RequestReceived,
95 RequestAccepted,
96}
97
98pub struct UserStore {
99 users: HashMap<u64, Arc<User>>,
100 by_github_login: HashMap<String, u64>,
101 participant_indices: HashMap<u64, ParticipantIndex>,
102 update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
103 current_plan: Option<proto::Plan>,
104 trial_started_at: Option<DateTime<Utc>>,
105 is_usage_based_billing_enabled: Option<bool>,
106 current_user: watch::Receiver<Option<Arc<User>>>,
107 accepted_tos_at: Option<Option<DateTime<Utc>>>,
108 contacts: Vec<Arc<Contact>>,
109 incoming_contact_requests: Vec<Arc<User>>,
110 outgoing_contact_requests: Vec<Arc<User>>,
111 pending_contact_requests: HashMap<u64, usize>,
112 invite_info: Option<InviteInfo>,
113 client: Weak<Client>,
114 _maintain_contacts: Task<()>,
115 _maintain_current_user: Task<Result<()>>,
116 weak_self: WeakEntity<Self>,
117}
118
119#[derive(Clone)]
120pub struct InviteInfo {
121 pub count: u32,
122 pub url: Arc<str>,
123}
124
125pub enum Event {
126 Contact {
127 user: Arc<User>,
128 kind: ContactEventKind,
129 },
130 ShowContacts,
131 ParticipantIndicesChanged,
132 PrivateUserInfoUpdated,
133}
134
135#[derive(Clone, Copy)]
136pub enum ContactEventKind {
137 Requested,
138 Accepted,
139 Cancelled,
140}
141
142impl EventEmitter<Event> for UserStore {}
143
144enum UpdateContacts {
145 Update(proto::UpdateContacts),
146 Wait(postage::barrier::Sender),
147 Clear(postage::barrier::Sender),
148}
149
150impl UserStore {
151 pub fn new(client: Arc<Client>, cx: &Context<Self>) -> Self {
152 let (mut current_user_tx, current_user_rx) = watch::channel();
153 let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
154 let rpc_subscriptions = vec![
155 client.add_message_handler(cx.weak_entity(), Self::handle_update_plan),
156 client.add_message_handler(cx.weak_entity(), Self::handle_update_contacts),
157 client.add_message_handler(cx.weak_entity(), Self::handle_update_invite_info),
158 client.add_message_handler(cx.weak_entity(), Self::handle_show_contacts),
159 ];
160 Self {
161 users: Default::default(),
162 by_github_login: Default::default(),
163 current_user: current_user_rx,
164 current_plan: None,
165 trial_started_at: None,
166 is_usage_based_billing_enabled: None,
167 accepted_tos_at: None,
168 contacts: Default::default(),
169 incoming_contact_requests: Default::default(),
170 participant_indices: Default::default(),
171 outgoing_contact_requests: Default::default(),
172 invite_info: None,
173 client: Arc::downgrade(&client),
174 update_contacts_tx,
175 _maintain_contacts: cx.spawn(async move |this, cx| {
176 let _subscriptions = rpc_subscriptions;
177 while let Some(message) = update_contacts_rx.next().await {
178 if let Ok(task) = this.update(cx, |this, cx| this.update_contacts(message, cx))
179 {
180 task.log_err().await;
181 } else {
182 break;
183 }
184 }
185 }),
186 _maintain_current_user: cx.spawn(async move |this, cx| {
187 let mut status = client.status();
188 let weak = Arc::downgrade(&client);
189 drop(client);
190 while let Some(status) = status.next().await {
191 // if the client is dropped, the app is shutting down.
192 let Some(client) = weak.upgrade() else {
193 return Ok(());
194 };
195 match status {
196 Status::Connected { .. } => {
197 if let Some(user_id) = client.user_id() {
198 let fetch_user = if let Ok(fetch_user) =
199 this.update(cx, |this, cx| this.get_user(user_id, cx).log_err())
200 {
201 fetch_user
202 } else {
203 break;
204 };
205 let fetch_private_user_info =
206 client.request(proto::GetPrivateUserInfo {}).log_err();
207 let (user, info) =
208 futures::join!(fetch_user, fetch_private_user_info);
209
210 cx.update(|cx| {
211 if let Some(info) = info {
212 let staff =
213 info.staff && !*feature_flags::ZED_DISABLE_STAFF;
214 cx.update_flags(staff, info.flags);
215 client.telemetry.set_authenticated_user_info(
216 Some(info.metrics_id.clone()),
217 staff,
218 );
219
220 this.update(cx, |this, cx| {
221 let accepted_tos_at = {
222 #[cfg(debug_assertions)]
223 if std::env::var("ZED_IGNORE_ACCEPTED_TOS").is_ok()
224 {
225 None
226 } else {
227 info.accepted_tos_at
228 }
229
230 #[cfg(not(debug_assertions))]
231 info.accepted_tos_at
232 };
233
234 this.set_current_user_accepted_tos_at(accepted_tos_at);
235 cx.emit(Event::PrivateUserInfoUpdated);
236 })
237 } else {
238 anyhow::Ok(())
239 }
240 })??;
241
242 current_user_tx.send(user).await.ok();
243
244 this.update(cx, |_, cx| cx.notify())?;
245 }
246 }
247 Status::SignedOut => {
248 current_user_tx.send(None).await.ok();
249 this.update(cx, |this, cx| {
250 this.accepted_tos_at = None;
251 cx.emit(Event::PrivateUserInfoUpdated);
252 cx.notify();
253 this.clear_contacts()
254 })?
255 .await;
256 }
257 Status::ConnectionLost => {
258 this.update(cx, |this, cx| {
259 cx.notify();
260 this.clear_contacts()
261 })?
262 .await;
263 }
264 _ => {}
265 }
266 }
267 Ok(())
268 }),
269 pending_contact_requests: Default::default(),
270 weak_self: cx.weak_entity(),
271 }
272 }
273
274 #[cfg(feature = "test-support")]
275 pub fn clear_cache(&mut self) {
276 self.users.clear();
277 self.by_github_login.clear();
278 }
279
280 async fn handle_update_invite_info(
281 this: Entity<Self>,
282 message: TypedEnvelope<proto::UpdateInviteInfo>,
283 mut cx: AsyncApp,
284 ) -> Result<()> {
285 this.update(&mut cx, |this, cx| {
286 this.invite_info = Some(InviteInfo {
287 url: Arc::from(message.payload.url),
288 count: message.payload.count,
289 });
290 cx.notify();
291 })?;
292 Ok(())
293 }
294
295 async fn handle_show_contacts(
296 this: Entity<Self>,
297 _: TypedEnvelope<proto::ShowContacts>,
298 mut cx: AsyncApp,
299 ) -> Result<()> {
300 this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts))?;
301 Ok(())
302 }
303
304 pub fn invite_info(&self) -> Option<&InviteInfo> {
305 self.invite_info.as_ref()
306 }
307
308 async fn handle_update_contacts(
309 this: Entity<Self>,
310 message: TypedEnvelope<proto::UpdateContacts>,
311 mut cx: AsyncApp,
312 ) -> Result<()> {
313 this.update(&mut cx, |this, _| {
314 this.update_contacts_tx
315 .unbounded_send(UpdateContacts::Update(message.payload))
316 .unwrap();
317 })?;
318 Ok(())
319 }
320
321 async fn handle_update_plan(
322 this: Entity<Self>,
323 message: TypedEnvelope<proto::UpdateUserPlan>,
324 mut cx: AsyncApp,
325 ) -> Result<()> {
326 this.update(&mut cx, |this, cx| {
327 this.current_plan = Some(message.payload.plan());
328 this.trial_started_at = message
329 .payload
330 .trial_started_at
331 .and_then(|trial_started_at| DateTime::from_timestamp(trial_started_at as i64, 0));
332 this.is_usage_based_billing_enabled = message.payload.is_usage_based_billing_enabled;
333 cx.notify();
334 })?;
335 Ok(())
336 }
337
338 fn update_contacts(&mut self, message: UpdateContacts, cx: &Context<Self>) -> Task<Result<()>> {
339 match message {
340 UpdateContacts::Wait(barrier) => {
341 drop(barrier);
342 Task::ready(Ok(()))
343 }
344 UpdateContacts::Clear(barrier) => {
345 self.contacts.clear();
346 self.incoming_contact_requests.clear();
347 self.outgoing_contact_requests.clear();
348 drop(barrier);
349 Task::ready(Ok(()))
350 }
351 UpdateContacts::Update(message) => {
352 let mut user_ids = HashSet::default();
353 for contact in &message.contacts {
354 user_ids.insert(contact.user_id);
355 }
356 user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
357 user_ids.extend(message.outgoing_requests.iter());
358
359 let load_users = self.get_users(user_ids.into_iter().collect(), cx);
360 cx.spawn(async move |this, cx| {
361 load_users.await?;
362
363 // Users are fetched in parallel above and cached in call to get_users
364 // No need to parallelize here
365 let mut updated_contacts = Vec::new();
366 let this = this
367 .upgrade()
368 .ok_or_else(|| anyhow!("can't upgrade user store handle"))?;
369 for contact in message.contacts {
370 updated_contacts
371 .push(Arc::new(Contact::from_proto(contact, &this, cx).await?));
372 }
373
374 let mut incoming_requests = Vec::new();
375 for request in message.incoming_requests {
376 incoming_requests.push({
377 this.update(cx, |this, cx| this.get_user(request.requester_id, cx))?
378 .await?
379 });
380 }
381
382 let mut outgoing_requests = Vec::new();
383 for requested_user_id in message.outgoing_requests {
384 outgoing_requests.push(
385 this.update(cx, |this, cx| this.get_user(requested_user_id, cx))?
386 .await?,
387 );
388 }
389
390 let removed_contacts =
391 HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
392 let removed_incoming_requests =
393 HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
394 let removed_outgoing_requests =
395 HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());
396
397 this.update(cx, |this, cx| {
398 // Remove contacts
399 this.contacts
400 .retain(|contact| !removed_contacts.contains(&contact.user.id));
401 // Update existing contacts and insert new ones
402 for updated_contact in updated_contacts {
403 match this.contacts.binary_search_by_key(
404 &&updated_contact.user.github_login,
405 |contact| &contact.user.github_login,
406 ) {
407 Ok(ix) => this.contacts[ix] = updated_contact,
408 Err(ix) => this.contacts.insert(ix, updated_contact),
409 }
410 }
411
412 // Remove incoming contact requests
413 this.incoming_contact_requests.retain(|user| {
414 if removed_incoming_requests.contains(&user.id) {
415 cx.emit(Event::Contact {
416 user: user.clone(),
417 kind: ContactEventKind::Cancelled,
418 });
419 false
420 } else {
421 true
422 }
423 });
424 // Update existing incoming requests and insert new ones
425 for user in incoming_requests {
426 match this
427 .incoming_contact_requests
428 .binary_search_by_key(&&user.github_login, |contact| {
429 &contact.github_login
430 }) {
431 Ok(ix) => this.incoming_contact_requests[ix] = user,
432 Err(ix) => this.incoming_contact_requests.insert(ix, user),
433 }
434 }
435
436 // Remove outgoing contact requests
437 this.outgoing_contact_requests
438 .retain(|user| !removed_outgoing_requests.contains(&user.id));
439 // Update existing incoming requests and insert new ones
440 for request in outgoing_requests {
441 match this
442 .outgoing_contact_requests
443 .binary_search_by_key(&&request.github_login, |contact| {
444 &contact.github_login
445 }) {
446 Ok(ix) => this.outgoing_contact_requests[ix] = request,
447 Err(ix) => this.outgoing_contact_requests.insert(ix, request),
448 }
449 }
450
451 cx.notify();
452 })?;
453
454 Ok(())
455 })
456 }
457 }
458 }
459
460 pub fn contacts(&self) -> &[Arc<Contact>] {
461 &self.contacts
462 }
463
464 pub fn has_contact(&self, user: &Arc<User>) -> bool {
465 self.contacts
466 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
467 .is_ok()
468 }
469
470 pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
471 &self.incoming_contact_requests
472 }
473
474 pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
475 &self.outgoing_contact_requests
476 }
477
478 pub fn is_contact_request_pending(&self, user: &User) -> bool {
479 self.pending_contact_requests.contains_key(&user.id)
480 }
481
482 pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
483 if self
484 .contacts
485 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
486 .is_ok()
487 {
488 ContactRequestStatus::RequestAccepted
489 } else if self
490 .outgoing_contact_requests
491 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
492 .is_ok()
493 {
494 ContactRequestStatus::RequestSent
495 } else if self
496 .incoming_contact_requests
497 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
498 .is_ok()
499 {
500 ContactRequestStatus::RequestReceived
501 } else {
502 ContactRequestStatus::None
503 }
504 }
505
506 pub fn request_contact(
507 &mut self,
508 responder_id: u64,
509 cx: &mut Context<Self>,
510 ) -> Task<Result<()>> {
511 self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
512 }
513
514 pub fn remove_contact(&mut self, user_id: u64, cx: &mut Context<Self>) -> Task<Result<()>> {
515 self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
516 }
517
518 pub fn has_incoming_contact_request(&self, user_id: u64) -> bool {
519 self.incoming_contact_requests
520 .iter()
521 .any(|user| user.id == user_id)
522 }
523
524 pub fn respond_to_contact_request(
525 &mut self,
526 requester_id: u64,
527 accept: bool,
528 cx: &mut Context<Self>,
529 ) -> Task<Result<()>> {
530 self.perform_contact_request(
531 requester_id,
532 proto::RespondToContactRequest {
533 requester_id,
534 response: if accept {
535 proto::ContactRequestResponse::Accept
536 } else {
537 proto::ContactRequestResponse::Decline
538 } as i32,
539 },
540 cx,
541 )
542 }
543
544 pub fn dismiss_contact_request(
545 &self,
546 requester_id: u64,
547 cx: &Context<Self>,
548 ) -> Task<Result<()>> {
549 let client = self.client.upgrade();
550 cx.spawn(async move |_, _| {
551 client
552 .ok_or_else(|| anyhow!("can't upgrade client reference"))?
553 .request(proto::RespondToContactRequest {
554 requester_id,
555 response: proto::ContactRequestResponse::Dismiss as i32,
556 })
557 .await?;
558 Ok(())
559 })
560 }
561
562 fn perform_contact_request<T: RequestMessage>(
563 &mut self,
564 user_id: u64,
565 request: T,
566 cx: &mut Context<Self>,
567 ) -> Task<Result<()>> {
568 let client = self.client.upgrade();
569 *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
570 cx.notify();
571
572 cx.spawn(async move |this, cx| {
573 let response = client
574 .ok_or_else(|| anyhow!("can't upgrade client reference"))?
575 .request(request)
576 .await;
577 this.update(cx, |this, cx| {
578 if let Entry::Occupied(mut request_count) =
579 this.pending_contact_requests.entry(user_id)
580 {
581 *request_count.get_mut() -= 1;
582 if *request_count.get() == 0 {
583 request_count.remove();
584 }
585 }
586 cx.notify();
587 })?;
588 response?;
589 Ok(())
590 })
591 }
592
593 pub fn clear_contacts(&self) -> impl Future<Output = ()> + use<> {
594 let (tx, mut rx) = postage::barrier::channel();
595 self.update_contacts_tx
596 .unbounded_send(UpdateContacts::Clear(tx))
597 .unwrap();
598 async move {
599 rx.next().await;
600 }
601 }
602
603 pub fn contact_updates_done(&self) -> impl Future<Output = ()> {
604 let (tx, mut rx) = postage::barrier::channel();
605 self.update_contacts_tx
606 .unbounded_send(UpdateContacts::Wait(tx))
607 .unwrap();
608 async move {
609 rx.next().await;
610 }
611 }
612
613 pub fn get_users(
614 &self,
615 user_ids: Vec<u64>,
616 cx: &Context<Self>,
617 ) -> Task<Result<Vec<Arc<User>>>> {
618 let mut user_ids_to_fetch = user_ids.clone();
619 user_ids_to_fetch.retain(|id| !self.users.contains_key(id));
620
621 cx.spawn(async move |this, cx| {
622 if !user_ids_to_fetch.is_empty() {
623 this.update(cx, |this, cx| {
624 this.load_users(
625 proto::GetUsers {
626 user_ids: user_ids_to_fetch,
627 },
628 cx,
629 )
630 })?
631 .await?;
632 }
633
634 this.update(cx, |this, _| {
635 user_ids
636 .iter()
637 .map(|user_id| {
638 this.users
639 .get(user_id)
640 .cloned()
641 .ok_or_else(|| anyhow!("user {} not found", user_id))
642 })
643 .collect()
644 })?
645 })
646 }
647
648 pub fn fuzzy_search_users(
649 &self,
650 query: String,
651 cx: &Context<Self>,
652 ) -> Task<Result<Vec<Arc<User>>>> {
653 self.load_users(proto::FuzzySearchUsers { query }, cx)
654 }
655
656 pub fn get_cached_user(&self, user_id: u64) -> Option<Arc<User>> {
657 self.users.get(&user_id).cloned()
658 }
659
660 pub fn get_user_optimistic(&self, user_id: u64, cx: &Context<Self>) -> Option<Arc<User>> {
661 if let Some(user) = self.users.get(&user_id).cloned() {
662 return Some(user);
663 }
664
665 self.get_user(user_id, cx).detach_and_log_err(cx);
666 None
667 }
668
669 pub fn get_user(&self, user_id: u64, cx: &Context<Self>) -> Task<Result<Arc<User>>> {
670 if let Some(user) = self.users.get(&user_id).cloned() {
671 return Task::ready(Ok(user));
672 }
673
674 let load_users = self.get_users(vec![user_id], cx);
675 cx.spawn(async move |this, cx| {
676 load_users.await?;
677 this.update(cx, |this, _| {
678 this.users
679 .get(&user_id)
680 .cloned()
681 .ok_or_else(|| anyhow!("server responded with no users"))
682 })?
683 })
684 }
685
686 pub fn cached_user_by_github_login(&self, github_login: &str) -> Option<Arc<User>> {
687 self.by_github_login
688 .get(github_login)
689 .and_then(|id| self.users.get(id).cloned())
690 }
691
692 pub fn current_user(&self) -> Option<Arc<User>> {
693 self.current_user.borrow().clone()
694 }
695
696 pub fn current_plan(&self) -> Option<proto::Plan> {
697 self.current_plan
698 }
699
700 pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
701 self.current_user.clone()
702 }
703
704 pub fn current_user_has_accepted_terms(&self) -> Option<bool> {
705 self.accepted_tos_at
706 .map(|accepted_tos_at| accepted_tos_at.is_some())
707 }
708
709 pub fn accept_terms_of_service(&self, cx: &Context<Self>) -> Task<Result<()>> {
710 if self.current_user().is_none() {
711 return Task::ready(Err(anyhow!("no current user")));
712 };
713
714 let client = self.client.clone();
715 cx.spawn(async move |this, cx| {
716 if let Some(client) = client.upgrade() {
717 let response = client
718 .request(proto::AcceptTermsOfService {})
719 .await
720 .context("error accepting tos")?;
721
722 this.update(cx, |this, cx| {
723 this.set_current_user_accepted_tos_at(Some(response.accepted_tos_at));
724 cx.emit(Event::PrivateUserInfoUpdated);
725 })
726 } else {
727 Err(anyhow!("client not found"))
728 }
729 })
730 }
731
732 fn set_current_user_accepted_tos_at(&mut self, accepted_tos_at: Option<u64>) {
733 self.accepted_tos_at = Some(
734 accepted_tos_at.and_then(|timestamp| DateTime::from_timestamp(timestamp as i64, 0)),
735 );
736 }
737
738 fn load_users(
739 &self,
740 request: impl RequestMessage<Response = UsersResponse>,
741 cx: &Context<Self>,
742 ) -> Task<Result<Vec<Arc<User>>>> {
743 let client = self.client.clone();
744 cx.spawn(async move |this, cx| {
745 if let Some(rpc) = client.upgrade() {
746 let response = rpc.request(request).await.context("error loading users")?;
747 let users = response.users;
748
749 this.update(cx, |this, _| this.insert(users))
750 } else {
751 Ok(Vec::new())
752 }
753 })
754 }
755
756 pub fn insert(&mut self, users: Vec<proto::User>) -> Vec<Arc<User>> {
757 let mut ret = Vec::with_capacity(users.len());
758 for user in users {
759 let user = User::new(user);
760 if let Some(old) = self.users.insert(user.id, user.clone()) {
761 if old.github_login != user.github_login {
762 self.by_github_login.remove(&old.github_login);
763 }
764 }
765 self.by_github_login
766 .insert(user.github_login.clone(), user.id);
767 ret.push(user)
768 }
769 ret
770 }
771
772 pub fn set_participant_indices(
773 &mut self,
774 participant_indices: HashMap<u64, ParticipantIndex>,
775 cx: &mut Context<Self>,
776 ) {
777 if participant_indices != self.participant_indices {
778 self.participant_indices = participant_indices;
779 cx.emit(Event::ParticipantIndicesChanged);
780 }
781 }
782
783 pub fn participant_indices(&self) -> &HashMap<u64, ParticipantIndex> {
784 &self.participant_indices
785 }
786
787 pub fn participant_names(
788 &self,
789 user_ids: impl Iterator<Item = u64>,
790 cx: &App,
791 ) -> HashMap<u64, SharedString> {
792 let mut ret = HashMap::default();
793 let mut missing_user_ids = Vec::new();
794 for id in user_ids {
795 if let Some(github_login) = self.get_cached_user(id).map(|u| u.github_login.clone()) {
796 ret.insert(id, github_login.into());
797 } else {
798 missing_user_ids.push(id)
799 }
800 }
801 if !missing_user_ids.is_empty() {
802 let this = self.weak_self.clone();
803 cx.spawn(async move |cx| {
804 this.update(cx, |this, cx| this.get_users(missing_user_ids, cx))?
805 .await
806 })
807 .detach_and_log_err(cx);
808 }
809 ret
810 }
811}
812
813impl User {
814 fn new(message: proto::User) -> Arc<Self> {
815 Arc::new(User {
816 id: message.id,
817 github_login: message.github_login,
818 avatar_uri: message.avatar_url.into(),
819 name: message.name,
820 email: message.email,
821 })
822 }
823}
824
825impl Contact {
826 async fn from_proto(
827 contact: proto::Contact,
828 user_store: &Entity<UserStore>,
829 cx: &mut AsyncApp,
830 ) -> Result<Self> {
831 let user = user_store
832 .update(cx, |user_store, cx| {
833 user_store.get_user(contact.user_id, cx)
834 })?
835 .await?;
836 Ok(Self {
837 user,
838 online: contact.online,
839 busy: contact.busy,
840 })
841 }
842}
843
844impl Collaborator {
845 pub fn from_proto(message: proto::Collaborator) -> Result<Self> {
846 Ok(Self {
847 peer_id: message.peer_id.ok_or_else(|| anyhow!("invalid peer id"))?,
848 replica_id: message.replica_id as ReplicaId,
849 user_id: message.user_id as UserId,
850 is_host: message.is_host,
851 })
852 }
853}