1use super::{Client, Status, TypedEnvelope, proto};
2use anyhow::{Context as _, Result, anyhow};
3use chrono::{DateTime, Utc};
4use collections::{HashMap, HashSet, hash_map::Entry};
5use feature_flags::FeatureFlagAppExt;
6use futures::{Future, StreamExt, channel::mpsc};
7use gpui::{
8 App, AsyncApp, Context, Entity, EventEmitter, SharedString, SharedUri, Task, WeakEntity,
9};
10use postage::{sink::Sink, watch};
11use rpc::proto::{RequestMessage, UsersResponse};
12use std::sync::{Arc, Weak};
13use text::ReplicaId;
14use util::TryFutureExt as _;
15
/// Numeric identifier of a user.
pub type UserId = u64;
17
/// Identifier of a channel, wrapping the raw `u64` id.
#[derive(
    Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, serde::Serialize, serde::Deserialize,
)]
pub struct ChannelId(pub u64);
22
23impl std::fmt::Display for ChannelId {
24 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
25 self.0.fmt(f)
26 }
27}
28
/// Identifier of a project, wrapping the raw `u64` id.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)]
pub struct ProjectId(pub u64);

impl ProjectId {
    /// Returns the wrapped `u64` value.
    pub fn to_proto(&self) -> u64 {
        let Self(raw_id) = *self;
        raw_id
    }
}
37
/// Identifier of a dev-server project, wrapping the raw `u64` id.
#[derive(
    Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, serde::Serialize, serde::Deserialize,
)]
pub struct DevServerProjectId(pub u64);
42
/// Index assigned to a participant, wrapping a raw `u32`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ParticipantIndex(pub u32);
45
/// A user record as cached by [`UserStore`].
///
/// The comparison traits (`PartialEq`, `Eq`, `PartialOrd`, `Ord`) are
/// implemented by hand for this type rather than derived.
#[derive(Default, Debug)]
pub struct User {
    /// Numeric user id.
    pub id: UserId,
    /// GitHub login; also used as the sort key for contact lists.
    pub github_login: String,
    /// URI of the user's avatar image.
    pub avatar_uri: SharedUri,
    /// Display name, if one is known.
    pub name: Option<String>,
    /// Email address, if one is known.
    pub email: Option<String>,
}
54
/// A collaborator, as decoded from `proto::Collaborator`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Collaborator {
    /// Peer id of the collaborator's connection.
    pub peer_id: proto::PeerId,
    /// Replica id assigned to the collaborator.
    pub replica_id: ReplicaId,
    /// Id of the collaborating user.
    pub user_id: UserId,
    /// Whether this collaborator is the host.
    pub is_host: bool,
}
62
63impl PartialOrd for User {
64 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
65 Some(self.cmp(other))
66 }
67}
68
69impl Ord for User {
70 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
71 self.github_login.cmp(&other.github_login)
72 }
73}
74
75impl PartialEq for User {
76 fn eq(&self, other: &Self) -> bool {
77 self.id == other.id && self.github_login == other.github_login
78 }
79}
80
81impl Eq for User {}
82
/// A contact of the current user together with its presence flags.
#[derive(Debug, PartialEq)]
pub struct Contact {
    /// The contact's user record.
    pub user: Arc<User>,
    /// Copied from the `online` field of `proto::Contact`.
    pub online: bool,
    /// Copied from the `busy` field of `proto::Contact`.
    pub busy: bool,
}
89
/// Relationship between the current user and another user, as reported by
/// `UserStore::contact_request_status`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ContactRequestStatus {
    /// The user is neither a contact nor part of any contact request.
    None,
    /// The user appears in the outgoing contact requests.
    RequestSent,
    /// The user appears in the incoming contact requests.
    RequestReceived,
    /// The user is already a contact.
    RequestAccepted,
}
97
/// Caches users and tracks the current user's contacts, contact requests,
/// plan/usage information and terms-of-service state.
pub struct UserStore {
    /// Cached users keyed by user id.
    users: HashMap<u64, Arc<User>>,
    /// Index from GitHub login to user id for the cached users.
    by_github_login: HashMap<String, u64>,
    /// Participant indices keyed by a `u64` id (presumably a user id — TODO confirm).
    participant_indices: HashMap<u64, ParticipantIndex>,
    /// Sender side of the queue drained by the `_maintain_contacts` task.
    update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
    /// Plan from the last `UpdateUserPlan` message, if any.
    current_plan: Option<proto::Plan>,
    /// Trial start time from the last `UpdateUserPlan` message, if any.
    trial_started_at: Option<DateTime<Utc>>,
    /// Model-request usage amount from the last plan update carrying usage.
    model_request_usage_amount: Option<u32>,
    /// Model-request usage limit from the last plan update carrying usage.
    model_request_usage_limit: Option<proto::UsageLimit>,
    /// Edit-prediction usage amount from the last plan update carrying usage.
    edit_predictions_usage_amount: Option<u32>,
    /// Edit-prediction usage limit from the last plan update carrying usage.
    edit_predictions_usage_limit: Option<proto::UsageLimit>,
    /// Usage-based-billing flag from the last `UpdateUserPlan` message.
    is_usage_based_billing_enabled: Option<bool>,
    /// Watch channel holding the current user (`None` when signed out).
    current_user: watch::Receiver<Option<Arc<User>>>,
    /// Outer `None` until the acceptance state has been set (and again after
    /// sign-out); the inner value is the acceptance time, if any.
    accepted_tos_at: Option<Option<DateTime<Utc>>>,
    /// Contacts, kept sorted by GitHub login so they can be binary-searched.
    contacts: Vec<Arc<Contact>>,
    /// Incoming contact requests, kept sorted by GitHub login.
    incoming_contact_requests: Vec<Arc<User>>,
    /// Outgoing contact requests, kept sorted by GitHub login.
    outgoing_contact_requests: Vec<Arc<User>>,
    /// Number of in-flight contact requests per user id.
    pending_contact_requests: HashMap<u64, usize>,
    /// Invite info from the last `UpdateInviteInfo` message, if any.
    invite_info: Option<InviteInfo>,
    /// Weak reference to the client used for requests.
    client: Weak<Client>,
    /// Task that applies queued `UpdateContacts` messages one at a time.
    _maintain_contacts: Task<()>,
    /// Task that reacts to client status changes.
    _maintain_current_user: Task<Result<()>>,
    /// Weak handle to this entity, used to spawn work from `&self` methods.
    weak_self: WeakEntity<Self>,
}
122
/// Invite information received via `proto::UpdateInviteInfo`.
#[derive(Clone)]
pub struct InviteInfo {
    /// The `count` field of the message.
    pub count: u32,
    /// The `url` field of the message.
    pub url: Arc<str>,
}
128
/// Events emitted by [`UserStore`].
pub enum Event {
    /// A contact-related change concerning `user`.
    Contact {
        user: Arc<User>,
        kind: ContactEventKind,
    },
    /// Emitted when a `proto::ShowContacts` message is received.
    ShowContacts,
    /// Emitted when the stored participant indices are replaced by a different map.
    ParticipantIndicesChanged,
    /// Emitted when the terms-of-service state is updated or reset.
    PrivateUserInfoUpdated,
}
138
/// The kind of change carried by [`Event::Contact`].
#[derive(Clone, Copy)]
pub enum ContactEventKind {
    Requested,
    Accepted,
    /// Emitted when an incoming contact request is removed.
    Cancelled,
}
145
// Allows `UserStore` to emit `Event` values via `cx.emit`.
impl EventEmitter<Event> for UserStore {}
147
/// Messages processed in order by `UserStore::update_contacts`.
enum UpdateContacts {
    /// Apply a contacts update received from the server.
    Update(proto::UpdateContacts),
    /// Do nothing except drop the barrier sender once this message is reached.
    Wait(postage::barrier::Sender),
    /// Clear contacts and both request lists, then drop the barrier sender.
    Clear(postage::barrier::Sender),
}
153
154impl UserStore {
    /// Creates the store, registers its RPC message handlers on `client`, and
    /// spawns the two background tasks that keep it up to date.
    ///
    /// * `_maintain_contacts` drains the `UpdateContacts` queue, applying one
    ///   message at a time and awaiting each resulting task before the next.
    /// * `_maintain_current_user` follows `client.status()` and refreshes or
    ///   clears the current user, terms-of-service state and contacts.
    pub fn new(client: Arc<Client>, cx: &Context<Self>) -> Self {
        let (mut current_user_tx, current_user_rx) = watch::channel();
        let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
        let rpc_subscriptions = vec![
            client.add_message_handler(cx.weak_entity(), Self::handle_update_plan),
            client.add_message_handler(cx.weak_entity(), Self::handle_update_contacts),
            client.add_message_handler(cx.weak_entity(), Self::handle_update_invite_info),
            client.add_message_handler(cx.weak_entity(), Self::handle_show_contacts),
        ];
        Self {
            users: Default::default(),
            by_github_login: Default::default(),
            current_user: current_user_rx,
            current_plan: None,
            trial_started_at: None,
            model_request_usage_amount: None,
            model_request_usage_limit: None,
            edit_predictions_usage_amount: None,
            edit_predictions_usage_limit: None,
            is_usage_based_billing_enabled: None,
            accepted_tos_at: None,
            contacts: Default::default(),
            incoming_contact_requests: Default::default(),
            participant_indices: Default::default(),
            outgoing_contact_requests: Default::default(),
            invite_info: None,
            client: Arc::downgrade(&client),
            update_contacts_tx,
            _maintain_contacts: cx.spawn(async move |this, cx| {
                // The subscriptions are moved into this task so they live as
                // long as the task does.
                let _subscriptions = rpc_subscriptions;
                while let Some(message) = update_contacts_rx.next().await {
                    if let Ok(task) = this.update(cx, |this, cx| this.update_contacts(message, cx))
                    {
                        task.log_err().await;
                    } else {
                        // The store can no longer be updated; stop processing.
                        break;
                    }
                }
            }),
            _maintain_current_user: cx.spawn(async move |this, cx| {
                let mut status = client.status();
                // Keep only a weak reference inside the loop (presumably so
                // this task does not keep the client alive).
                let weak = Arc::downgrade(&client);
                drop(client);
                while let Some(status) = status.next().await {
                    // if the client is dropped, the app is shutting down.
                    let Some(client) = weak.upgrade() else {
                        return Ok(());
                    };
                    match status {
                        Status::Connected { .. } => {
                            if let Some(user_id) = client.user_id() {
                                let fetch_user = if let Ok(fetch_user) =
                                    this.update(cx, |this, cx| this.get_user(user_id, cx).log_err())
                                {
                                    fetch_user
                                } else {
                                    break;
                                };
                                let fetch_private_user_info =
                                    client.request(proto::GetPrivateUserInfo {}).log_err();
                                // Run both requests concurrently; each result
                                // is an `Option` after `log_err()`.
                                let (user, info) =
                                    futures::join!(fetch_user, fetch_private_user_info);

                                cx.update(|cx| {
                                    if let Some(info) = info {
                                        let staff =
                                            info.staff && !*feature_flags::ZED_DISABLE_STAFF;
                                        cx.update_flags(staff, info.flags);
                                        client.telemetry.set_authenticated_user_info(
                                            Some(info.metrics_id.clone()),
                                            staff,
                                        );

                                        this.update(cx, |this, cx| {
                                            // In debug builds, setting
                                            // `ZED_IGNORE_ACCEPTED_TOS` makes the
                                            // terms count as not accepted.
                                            let accepted_tos_at = {
                                                #[cfg(debug_assertions)]
                                                if std::env::var("ZED_IGNORE_ACCEPTED_TOS").is_ok()
                                                {
                                                    None
                                                } else {
                                                    info.accepted_tos_at
                                                }

                                                #[cfg(not(debug_assertions))]
                                                info.accepted_tos_at
                                            };

                                            this.set_current_user_accepted_tos_at(accepted_tos_at);
                                            cx.emit(Event::PrivateUserInfoUpdated);
                                        })
                                    } else {
                                        anyhow::Ok(())
                                    }
                                })??;

                                // Publish the fetched user (or `None` if the
                                // fetch failed) to watchers.
                                current_user_tx.send(user).await.ok();

                                this.update(cx, |_, cx| cx.notify())?;
                            }
                        }
                        Status::SignedOut => {
                            current_user_tx.send(None).await.ok();
                            this.update(cx, |this, cx| {
                                this.accepted_tos_at = None;
                                cx.emit(Event::PrivateUserInfoUpdated);
                                cx.notify();
                                this.clear_contacts()
                            })?
                            .await;
                        }
                        Status::ConnectionLost => {
                            this.update(cx, |this, cx| {
                                cx.notify();
                                this.clear_contacts()
                            })?
                            .await;
                        }
                        _ => {}
                    }
                }
                Ok(())
            }),
            pending_contact_requests: Default::default(),
            weak_self: cx.weak_entity(),
        }
    }
281
282 #[cfg(feature = "test-support")]
283 pub fn clear_cache(&mut self) {
284 self.users.clear();
285 self.by_github_login.clear();
286 }
287
288 async fn handle_update_invite_info(
289 this: Entity<Self>,
290 message: TypedEnvelope<proto::UpdateInviteInfo>,
291 mut cx: AsyncApp,
292 ) -> Result<()> {
293 this.update(&mut cx, |this, cx| {
294 this.invite_info = Some(InviteInfo {
295 url: Arc::from(message.payload.url),
296 count: message.payload.count,
297 });
298 cx.notify();
299 })?;
300 Ok(())
301 }
302
303 async fn handle_show_contacts(
304 this: Entity<Self>,
305 _: TypedEnvelope<proto::ShowContacts>,
306 mut cx: AsyncApp,
307 ) -> Result<()> {
308 this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts))?;
309 Ok(())
310 }
311
    /// Returns the invite info from the last `UpdateInviteInfo` message, if any.
    pub fn invite_info(&self) -> Option<&InviteInfo> {
        self.invite_info.as_ref()
    }
315
316 async fn handle_update_contacts(
317 this: Entity<Self>,
318 message: TypedEnvelope<proto::UpdateContacts>,
319 mut cx: AsyncApp,
320 ) -> Result<()> {
321 this.update(&mut cx, |this, _| {
322 this.update_contacts_tx
323 .unbounded_send(UpdateContacts::Update(message.payload))
324 .unwrap();
325 })?;
326 Ok(())
327 }
328
329 async fn handle_update_plan(
330 this: Entity<Self>,
331 message: TypedEnvelope<proto::UpdateUserPlan>,
332 mut cx: AsyncApp,
333 ) -> Result<()> {
334 this.update(&mut cx, |this, cx| {
335 this.current_plan = Some(message.payload.plan());
336 this.trial_started_at = message
337 .payload
338 .trial_started_at
339 .and_then(|trial_started_at| DateTime::from_timestamp(trial_started_at as i64, 0));
340 this.is_usage_based_billing_enabled = message.payload.is_usage_based_billing_enabled;
341
342 if let Some(usage) = message.payload.usage {
343 this.model_request_usage_amount = Some(usage.model_requests_usage_amount);
344 this.model_request_usage_limit = usage.model_requests_usage_limit;
345 this.edit_predictions_usage_amount = Some(usage.edit_predictions_usage_amount);
346 this.edit_predictions_usage_limit = usage.edit_predictions_usage_limit;
347 }
348
349 cx.notify();
350 })?;
351 Ok(())
352 }
353
    /// Applies one queued [`UpdateContacts`] message.
    ///
    /// * `Wait` only drops its barrier sender.
    /// * `Clear` empties the contacts and both request lists, then drops its
    ///   barrier sender.
    /// * `Update` loads every referenced user, then removes, replaces and
    ///   inserts entries so that each list stays sorted by GitHub login.
    ///
    /// Returns a task that completes once the message has been applied.
    fn update_contacts(&mut self, message: UpdateContacts, cx: &Context<Self>) -> Task<Result<()>> {
        match message {
            UpdateContacts::Wait(barrier) => {
                drop(barrier);
                Task::ready(Ok(()))
            }
            UpdateContacts::Clear(barrier) => {
                self.contacts.clear();
                self.incoming_contact_requests.clear();
                self.outgoing_contact_requests.clear();
                drop(barrier);
                Task::ready(Ok(()))
            }
            UpdateContacts::Update(message) => {
                // Collect the ids of every user mentioned in the update so
                // they can be loaded with a single `get_users` call.
                let mut user_ids = HashSet::default();
                for contact in &message.contacts {
                    user_ids.insert(contact.user_id);
                }
                user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
                user_ids.extend(message.outgoing_requests.iter());

                let load_users = self.get_users(user_ids.into_iter().collect(), cx);
                cx.spawn(async move |this, cx| {
                    load_users.await?;

                    // The users were loaded by the single `get_users` call above and
                    // are now cached, so the per-user lookups below can run sequentially.
                    let mut updated_contacts = Vec::new();
                    let this = this
                        .upgrade()
                        .ok_or_else(|| anyhow!("can't upgrade user store handle"))?;
                    for contact in message.contacts {
                        updated_contacts
                            .push(Arc::new(Contact::from_proto(contact, &this, cx).await?));
                    }

                    let mut incoming_requests = Vec::new();
                    for request in message.incoming_requests {
                        incoming_requests.push({
                            this.update(cx, |this, cx| this.get_user(request.requester_id, cx))?
                                .await?
                        });
                    }

                    let mut outgoing_requests = Vec::new();
                    for requested_user_id in message.outgoing_requests {
                        outgoing_requests.push(
                            this.update(cx, |this, cx| this.get_user(requested_user_id, cx))?
                                .await?,
                        );
                    }

                    let removed_contacts =
                        HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
                    let removed_incoming_requests =
                        HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
                    let removed_outgoing_requests =
                        HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());

                    this.update(cx, |this, cx| {
                        // Remove contacts
                        this.contacts
                            .retain(|contact| !removed_contacts.contains(&contact.user.id));
                        // Update existing contacts and insert new ones
                        for updated_contact in updated_contacts {
                            match this.contacts.binary_search_by_key(
                                &&updated_contact.user.github_login,
                                |contact| &contact.user.github_login,
                            ) {
                                Ok(ix) => this.contacts[ix] = updated_contact,
                                Err(ix) => this.contacts.insert(ix, updated_contact),
                            }
                        }

                        // Remove incoming contact requests, emitting a
                        // `Cancelled` event for each one removed
                        this.incoming_contact_requests.retain(|user| {
                            if removed_incoming_requests.contains(&user.id) {
                                cx.emit(Event::Contact {
                                    user: user.clone(),
                                    kind: ContactEventKind::Cancelled,
                                });
                                false
                            } else {
                                true
                            }
                        });
                        // Update existing incoming requests and insert new ones
                        for user in incoming_requests {
                            match this
                                .incoming_contact_requests
                                .binary_search_by_key(&&user.github_login, |contact| {
                                    &contact.github_login
                                }) {
                                Ok(ix) => this.incoming_contact_requests[ix] = user,
                                Err(ix) => this.incoming_contact_requests.insert(ix, user),
                            }
                        }

                        // Remove outgoing contact requests
                        this.outgoing_contact_requests
                            .retain(|user| !removed_outgoing_requests.contains(&user.id));
                        // Update existing outgoing requests and insert new ones
                        for request in outgoing_requests {
                            match this
                                .outgoing_contact_requests
                                .binary_search_by_key(&&request.github_login, |contact| {
                                    &contact.github_login
                                }) {
                                Ok(ix) => this.outgoing_contact_requests[ix] = request,
                                Err(ix) => this.outgoing_contact_requests.insert(ix, request),
                            }
                        }

                        cx.notify();
                    })?;

                    Ok(())
                })
            }
        }
    }
475
    /// Returns the current contacts, which `update_contacts` keeps sorted by
    /// GitHub login.
    pub fn contacts(&self) -> &[Arc<Contact>] {
        &self.contacts
    }
479
480 pub fn has_contact(&self, user: &Arc<User>) -> bool {
481 self.contacts
482 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
483 .is_ok()
484 }
485
    /// Returns the users who have sent the current user a contact request.
    pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
        &self.incoming_contact_requests
    }
489
    /// Returns the users to whom the current user has sent a contact request.
    pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
        &self.outgoing_contact_requests
    }
493
    /// Returns whether at least one contact request involving `user` is
    /// still in flight (see `perform_contact_request`).
    pub fn is_contact_request_pending(&self, user: &User) -> bool {
        self.pending_contact_requests.contains_key(&user.id)
    }
497
498 pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
499 if self
500 .contacts
501 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
502 .is_ok()
503 {
504 ContactRequestStatus::RequestAccepted
505 } else if self
506 .outgoing_contact_requests
507 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
508 .is_ok()
509 {
510 ContactRequestStatus::RequestSent
511 } else if self
512 .incoming_contact_requests
513 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
514 .is_ok()
515 {
516 ContactRequestStatus::RequestReceived
517 } else {
518 ContactRequestStatus::None
519 }
520 }
521
    /// Sends a `proto::RequestContact` for `responder_id`, tracking it as a
    /// pending contact request while it is in flight.
    pub fn request_contact(
        &mut self,
        responder_id: u64,
        cx: &mut Context<Self>,
    ) -> Task<Result<()>> {
        self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
    }
529
    /// Sends a `proto::RemoveContact` for `user_id`, tracking it as a pending
    /// contact request while it is in flight.
    pub fn remove_contact(&mut self, user_id: u64, cx: &mut Context<Self>) -> Task<Result<()>> {
        self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
    }
533
534 pub fn has_incoming_contact_request(&self, user_id: u64) -> bool {
535 self.incoming_contact_requests
536 .iter()
537 .any(|user| user.id == user_id)
538 }
539
540 pub fn respond_to_contact_request(
541 &mut self,
542 requester_id: u64,
543 accept: bool,
544 cx: &mut Context<Self>,
545 ) -> Task<Result<()>> {
546 self.perform_contact_request(
547 requester_id,
548 proto::RespondToContactRequest {
549 requester_id,
550 response: if accept {
551 proto::ContactRequestResponse::Accept
552 } else {
553 proto::ContactRequestResponse::Decline
554 } as i32,
555 },
556 cx,
557 )
558 }
559
560 pub fn dismiss_contact_request(
561 &self,
562 requester_id: u64,
563 cx: &Context<Self>,
564 ) -> Task<Result<()>> {
565 let client = self.client.upgrade();
566 cx.spawn(async move |_, _| {
567 client
568 .ok_or_else(|| anyhow!("can't upgrade client reference"))?
569 .request(proto::RespondToContactRequest {
570 requester_id,
571 response: proto::ContactRequestResponse::Dismiss as i32,
572 })
573 .await?;
574 Ok(())
575 })
576 }
577
578 fn perform_contact_request<T: RequestMessage>(
579 &mut self,
580 user_id: u64,
581 request: T,
582 cx: &mut Context<Self>,
583 ) -> Task<Result<()>> {
584 let client = self.client.upgrade();
585 *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
586 cx.notify();
587
588 cx.spawn(async move |this, cx| {
589 let response = client
590 .ok_or_else(|| anyhow!("can't upgrade client reference"))?
591 .request(request)
592 .await;
593 this.update(cx, |this, cx| {
594 if let Entry::Occupied(mut request_count) =
595 this.pending_contact_requests.entry(user_id)
596 {
597 *request_count.get_mut() -= 1;
598 if *request_count.get() == 0 {
599 request_count.remove();
600 }
601 }
602 cx.notify();
603 })?;
604 response?;
605 Ok(())
606 })
607 }
608
609 pub fn clear_contacts(&self) -> impl Future<Output = ()> + use<> {
610 let (tx, mut rx) = postage::barrier::channel();
611 self.update_contacts_tx
612 .unbounded_send(UpdateContacts::Clear(tx))
613 .unwrap();
614 async move {
615 rx.next().await;
616 }
617 }
618
619 pub fn contact_updates_done(&self) -> impl Future<Output = ()> {
620 let (tx, mut rx) = postage::barrier::channel();
621 self.update_contacts_tx
622 .unbounded_send(UpdateContacts::Wait(tx))
623 .unwrap();
624 async move {
625 rx.next().await;
626 }
627 }
628
629 pub fn get_users(
630 &self,
631 user_ids: Vec<u64>,
632 cx: &Context<Self>,
633 ) -> Task<Result<Vec<Arc<User>>>> {
634 let mut user_ids_to_fetch = user_ids.clone();
635 user_ids_to_fetch.retain(|id| !self.users.contains_key(id));
636
637 cx.spawn(async move |this, cx| {
638 if !user_ids_to_fetch.is_empty() {
639 this.update(cx, |this, cx| {
640 this.load_users(
641 proto::GetUsers {
642 user_ids: user_ids_to_fetch,
643 },
644 cx,
645 )
646 })?
647 .await?;
648 }
649
650 this.update(cx, |this, _| {
651 user_ids
652 .iter()
653 .map(|user_id| {
654 this.users
655 .get(user_id)
656 .cloned()
657 .ok_or_else(|| anyhow!("user {} not found", user_id))
658 })
659 .collect()
660 })?
661 })
662 }
663
    /// Sends a `proto::FuzzySearchUsers` request for `query`; the returned
    /// users are inserted into the cache by `load_users`.
    pub fn fuzzy_search_users(
        &self,
        query: String,
        cx: &Context<Self>,
    ) -> Task<Result<Vec<Arc<User>>>> {
        self.load_users(proto::FuzzySearchUsers { query }, cx)
    }
671
    /// Returns the cached user with `user_id`, without issuing any request.
    pub fn get_cached_user(&self, user_id: u64) -> Option<Arc<User>> {
        self.users.get(&user_id).cloned()
    }
675
676 pub fn get_user_optimistic(&self, user_id: u64, cx: &Context<Self>) -> Option<Arc<User>> {
677 if let Some(user) = self.users.get(&user_id).cloned() {
678 return Some(user);
679 }
680
681 self.get_user(user_id, cx).detach_and_log_err(cx);
682 None
683 }
684
685 pub fn get_user(&self, user_id: u64, cx: &Context<Self>) -> Task<Result<Arc<User>>> {
686 if let Some(user) = self.users.get(&user_id).cloned() {
687 return Task::ready(Ok(user));
688 }
689
690 let load_users = self.get_users(vec![user_id], cx);
691 cx.spawn(async move |this, cx| {
692 load_users.await?;
693 this.update(cx, |this, _| {
694 this.users
695 .get(&user_id)
696 .cloned()
697 .ok_or_else(|| anyhow!("server responded with no users"))
698 })?
699 })
700 }
701
702 pub fn cached_user_by_github_login(&self, github_login: &str) -> Option<Arc<User>> {
703 self.by_github_login
704 .get(github_login)
705 .and_then(|id| self.users.get(id).cloned())
706 }
707
    /// Returns the latest value of the current-user watch channel.
    pub fn current_user(&self) -> Option<Arc<User>> {
        self.current_user.borrow().clone()
    }
711
    /// Returns the plan from the last `UpdateUserPlan` message, if any.
    pub fn current_plan(&self) -> Option<proto::Plan> {
        self.current_plan
    }
715
    /// Returns the trial start time from the last `UpdateUserPlan` message, if any.
    pub fn trial_started_at(&self) -> Option<DateTime<Utc>> {
        self.trial_started_at
    }
719
    /// Returns the usage-based-billing flag from the last `UpdateUserPlan`
    /// message, if any.
    pub fn usage_based_billing_enabled(&self) -> Option<bool> {
        self.is_usage_based_billing_enabled
    }
723
    /// Returns the most recently stored model-request usage amount, if any.
    pub fn model_request_usage_amount(&self) -> Option<u32> {
        self.model_request_usage_amount
    }
727
    /// Returns a copy of the most recently stored model-request usage limit, if any.
    pub fn model_request_usage_limit(&self) -> Option<proto::UsageLimit> {
        self.model_request_usage_limit.clone()
    }
731
    /// Returns the most recently stored edit-prediction usage amount, if any.
    pub fn edit_predictions_usage_amount(&self) -> Option<u32> {
        self.edit_predictions_usage_amount
    }
735
    /// Returns a copy of the most recently stored edit-prediction usage limit, if any.
    pub fn edit_predictions_usage_limit(&self) -> Option<proto::UsageLimit> {
        self.edit_predictions_usage_limit.clone()
    }
739
    /// Returns a new receiver for the current-user watch channel.
    pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
        self.current_user.clone()
    }
743
744 pub fn current_user_has_accepted_terms(&self) -> Option<bool> {
745 self.accepted_tos_at
746 .map(|accepted_tos_at| accepted_tos_at.is_some())
747 }
748
749 pub fn accept_terms_of_service(&self, cx: &Context<Self>) -> Task<Result<()>> {
750 if self.current_user().is_none() {
751 return Task::ready(Err(anyhow!("no current user")));
752 };
753
754 let client = self.client.clone();
755 cx.spawn(async move |this, cx| {
756 if let Some(client) = client.upgrade() {
757 let response = client
758 .request(proto::AcceptTermsOfService {})
759 .await
760 .context("error accepting tos")?;
761
762 this.update(cx, |this, cx| {
763 this.set_current_user_accepted_tos_at(Some(response.accepted_tos_at));
764 cx.emit(Event::PrivateUserInfoUpdated);
765 })
766 } else {
767 Err(anyhow!("client not found"))
768 }
769 })
770 }
771
772 fn set_current_user_accepted_tos_at(&mut self, accepted_tos_at: Option<u64>) {
773 self.accepted_tos_at = Some(
774 accepted_tos_at.and_then(|timestamp| DateTime::from_timestamp(timestamp as i64, 0)),
775 );
776 }
777
778 fn load_users(
779 &self,
780 request: impl RequestMessage<Response = UsersResponse>,
781 cx: &Context<Self>,
782 ) -> Task<Result<Vec<Arc<User>>>> {
783 let client = self.client.clone();
784 cx.spawn(async move |this, cx| {
785 if let Some(rpc) = client.upgrade() {
786 let response = rpc.request(request).await.context("error loading users")?;
787 let users = response.users;
788
789 this.update(cx, |this, _| this.insert(users))
790 } else {
791 Ok(Vec::new())
792 }
793 })
794 }
795
796 pub fn insert(&mut self, users: Vec<proto::User>) -> Vec<Arc<User>> {
797 let mut ret = Vec::with_capacity(users.len());
798 for user in users {
799 let user = User::new(user);
800 if let Some(old) = self.users.insert(user.id, user.clone()) {
801 if old.github_login != user.github_login {
802 self.by_github_login.remove(&old.github_login);
803 }
804 }
805 self.by_github_login
806 .insert(user.github_login.clone(), user.id);
807 ret.push(user)
808 }
809 ret
810 }
811
812 pub fn set_participant_indices(
813 &mut self,
814 participant_indices: HashMap<u64, ParticipantIndex>,
815 cx: &mut Context<Self>,
816 ) {
817 if participant_indices != self.participant_indices {
818 self.participant_indices = participant_indices;
819 cx.emit(Event::ParticipantIndicesChanged);
820 }
821 }
822
    /// Returns the participant indices last set via `set_participant_indices`.
    pub fn participant_indices(&self) -> &HashMap<u64, ParticipantIndex> {
        &self.participant_indices
    }
826
827 pub fn participant_names(
828 &self,
829 user_ids: impl Iterator<Item = u64>,
830 cx: &App,
831 ) -> HashMap<u64, SharedString> {
832 let mut ret = HashMap::default();
833 let mut missing_user_ids = Vec::new();
834 for id in user_ids {
835 if let Some(github_login) = self.get_cached_user(id).map(|u| u.github_login.clone()) {
836 ret.insert(id, github_login.into());
837 } else {
838 missing_user_ids.push(id)
839 }
840 }
841 if !missing_user_ids.is_empty() {
842 let this = self.weak_self.clone();
843 cx.spawn(async move |cx| {
844 this.update(cx, |this, cx| this.get_users(missing_user_ids, cx))?
845 .await
846 })
847 .detach_and_log_err(cx);
848 }
849 ret
850 }
851}
852
853impl User {
854 fn new(message: proto::User) -> Arc<Self> {
855 Arc::new(User {
856 id: message.id,
857 github_login: message.github_login,
858 avatar_uri: message.avatar_url.into(),
859 name: message.name,
860 email: message.email,
861 })
862 }
863}
864
865impl Contact {
866 async fn from_proto(
867 contact: proto::Contact,
868 user_store: &Entity<UserStore>,
869 cx: &mut AsyncApp,
870 ) -> Result<Self> {
871 let user = user_store
872 .update(cx, |user_store, cx| {
873 user_store.get_user(contact.user_id, cx)
874 })?
875 .await?;
876 Ok(Self {
877 user,
878 online: contact.online,
879 busy: contact.busy,
880 })
881 }
882}
883
884impl Collaborator {
885 pub fn from_proto(message: proto::Collaborator) -> Result<Self> {
886 Ok(Self {
887 peer_id: message.peer_id.ok_or_else(|| anyhow!("invalid peer id"))?,
888 replica_id: message.replica_id as ReplicaId,
889 user_id: message.user_id as UserId,
890 is_host: message.is_host,
891 })
892 }
893}