1use super::{Client, Status, TypedEnvelope, proto};
2use anyhow::{Context as _, Result, anyhow};
3use chrono::{DateTime, Utc};
4use collections::{HashMap, HashSet, hash_map::Entry};
5use feature_flags::FeatureFlagAppExt;
6use futures::{Future, StreamExt, channel::mpsc};
7use gpui::{
8 App, AsyncApp, Context, Entity, EventEmitter, SharedString, SharedUri, Task, WeakEntity,
9};
10use postage::{sink::Sink, watch};
11use rpc::proto::{RequestMessage, UsersResponse};
12use std::sync::{Arc, Weak};
13use text::ReplicaId;
14use util::{TryFutureExt as _, maybe};
15
16pub type UserId = u64;
17
18#[derive(
19 Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, serde::Serialize, serde::Deserialize,
20)]
21pub struct ChannelId(pub u64);
22
23impl std::fmt::Display for ChannelId {
24 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
25 self.0.fmt(f)
26 }
27}
28
/// Identifier of a project, wrapping the raw `u64` value.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)]
pub struct ProjectId(pub u64);

impl ProjectId {
    /// Returns the wrapped `u64` value.
    pub fn to_proto(&self) -> u64 {
        let Self(raw) = *self;
        raw
    }
}
37
/// Identifier of a dev-server project, wrapping the raw `u64` value.
#[derive(
    Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, serde::Serialize, serde::Deserialize,
)]
pub struct DevServerProjectId(pub u64);
42
/// Index assigned to a participant, wrapping a raw `u32`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ParticipantIndex(pub u32);
45
/// A user record as cached by [`UserStore`]; built from a `proto::User` message.
#[derive(Default, Debug)]
pub struct User {
    /// Numeric user id; the key under which the user is cached.
    pub id: UserId,
    /// GitHub login; contact lists are kept sorted by this field.
    pub github_login: String,
    /// Avatar location, converted from the message's `avatar_url`.
    pub avatar_uri: SharedUri,
    /// Optional name, copied from the message.
    pub name: Option<String>,
    /// Optional email address, copied from the message.
    pub email: Option<String>,
}
54
/// A collaborator decoded from a `proto::Collaborator` message.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Collaborator {
    /// Peer id taken from the message (required when decoding).
    pub peer_id: proto::PeerId,
    /// Replica id taken from the message.
    pub replica_id: ReplicaId,
    /// Id of the user this collaborator represents.
    pub user_id: UserId,
    /// Host flag copied from the message.
    pub is_host: bool,
}
62
63impl PartialOrd for User {
64 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
65 Some(self.cmp(other))
66 }
67}
68
69impl Ord for User {
70 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
71 self.github_login.cmp(&other.github_login)
72 }
73}
74
75impl PartialEq for User {
76 fn eq(&self, other: &Self) -> bool {
77 self.id == other.id && self.github_login == other.github_login
78 }
79}
80
81impl Eq for User {}
82
/// A contact entry: the user plus the status flags from a `proto::Contact`.
#[derive(Debug, PartialEq)]
pub struct Contact {
    /// The user this contact refers to.
    pub user: Arc<User>,
    /// `online` flag copied from the protocol message.
    pub online: bool,
    /// `busy` flag copied from the protocol message.
    pub busy: bool,
}
89
/// Relationship between the current user and another user, as reported by
/// `UserStore::contact_request_status`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ContactRequestStatus {
    /// The user is in none of the contact or request lists.
    None,
    /// The user is in the outgoing contact-request list.
    RequestSent,
    /// The user is in the incoming contact-request list.
    RequestReceived,
    /// The user is in the contact list.
    RequestAccepted,
}
97
/// Caches users, contacts, and plan/usage information for the current
/// session and keeps them up to date from server messages.
pub struct UserStore {
    // Cached users keyed by user id.
    users: HashMap<u64, Arc<User>>,
    // GitHub login -> user id index over `users`.
    by_github_login: HashMap<String, u64>,
    participant_indices: HashMap<u64, ParticipantIndex>,
    // Queue feeding the `_maintain_contacts` task.
    update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
    // The plan/usage fields below are written by `handle_update_plan`.
    current_plan: Option<proto::Plan>,
    subscription_period: Option<(DateTime<Utc>, DateTime<Utc>)>,
    trial_started_at: Option<DateTime<Utc>>,
    model_request_usage_amount: Option<u32>,
    model_request_usage_limit: Option<proto::UsageLimit>,
    edit_predictions_usage_amount: Option<u32>,
    edit_predictions_usage_limit: Option<proto::UsageLimit>,
    is_usage_based_billing_enabled: Option<bool>,
    account_too_young: Option<bool>,
    current_user: watch::Receiver<Option<Arc<User>>>,
    // Outer `None`: not set yet (or reset on sign-out); inner `None`: set, but
    // no acceptance timestamp is available.
    accepted_tos_at: Option<Option<DateTime<Utc>>>,
    // The three lists below are kept sorted by GitHub login (binary-searched).
    contacts: Vec<Arc<Contact>>,
    incoming_contact_requests: Vec<Arc<User>>,
    outgoing_contact_requests: Vec<Arc<User>>,
    // Number of in-flight contact requests per user id.
    pending_contact_requests: HashMap<u64, usize>,
    invite_info: Option<InviteInfo>,
    client: Weak<Client>,
    _maintain_contacts: Task<()>,
    _maintain_current_user: Task<Result<()>>,
    weak_self: WeakEntity<Self>,
}
124
/// Invite information received via `proto::UpdateInviteInfo`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct InviteInfo {
    /// Invite count reported by the server.
    pub count: u32,
    /// Invite URL reported by the server.
    pub url: Arc<str>,
}
130
131pub enum Event {
132 Contact {
133 user: Arc<User>,
134 kind: ContactEventKind,
135 },
136 ShowContacts,
137 ParticipantIndicesChanged,
138 PrivateUserInfoUpdated,
139}
140
141#[derive(Clone, Copy)]
142pub enum ContactEventKind {
143 Requested,
144 Accepted,
145 Cancelled,
146}
147
148impl EventEmitter<Event> for UserStore {}
149
/// Work items processed, in order, by the contact-maintenance task.
enum UpdateContacts {
    /// Apply a contact update received from the server.
    Update(proto::UpdateContacts),
    /// Do nothing except drop the barrier once this item is reached.
    Wait(postage::barrier::Sender),
    /// Clear contacts and both request lists, then drop the barrier.
    Clear(postage::barrier::Sender),
}
155
156impl UserStore {
157 pub fn new(client: Arc<Client>, cx: &Context<Self>) -> Self {
158 let (mut current_user_tx, current_user_rx) = watch::channel();
159 let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
160 let rpc_subscriptions = vec![
161 client.add_message_handler(cx.weak_entity(), Self::handle_update_plan),
162 client.add_message_handler(cx.weak_entity(), Self::handle_update_contacts),
163 client.add_message_handler(cx.weak_entity(), Self::handle_update_invite_info),
164 client.add_message_handler(cx.weak_entity(), Self::handle_show_contacts),
165 ];
166 Self {
167 users: Default::default(),
168 by_github_login: Default::default(),
169 current_user: current_user_rx,
170 current_plan: None,
171 subscription_period: None,
172 trial_started_at: None,
173 model_request_usage_amount: None,
174 model_request_usage_limit: None,
175 edit_predictions_usage_amount: None,
176 edit_predictions_usage_limit: None,
177 is_usage_based_billing_enabled: None,
178 account_too_young: None,
179 accepted_tos_at: None,
180 contacts: Default::default(),
181 incoming_contact_requests: Default::default(),
182 participant_indices: Default::default(),
183 outgoing_contact_requests: Default::default(),
184 invite_info: None,
185 client: Arc::downgrade(&client),
186 update_contacts_tx,
187 _maintain_contacts: cx.spawn(async move |this, cx| {
188 let _subscriptions = rpc_subscriptions;
189 while let Some(message) = update_contacts_rx.next().await {
190 if let Ok(task) = this.update(cx, |this, cx| this.update_contacts(message, cx))
191 {
192 task.log_err().await;
193 } else {
194 break;
195 }
196 }
197 }),
198 _maintain_current_user: cx.spawn(async move |this, cx| {
199 let mut status = client.status();
200 let weak = Arc::downgrade(&client);
201 drop(client);
202 while let Some(status) = status.next().await {
203 // if the client is dropped, the app is shutting down.
204 let Some(client) = weak.upgrade() else {
205 return Ok(());
206 };
207 match status {
208 Status::Connected { .. } => {
209 if let Some(user_id) = client.user_id() {
210 let fetch_user = if let Ok(fetch_user) =
211 this.update(cx, |this, cx| this.get_user(user_id, cx).log_err())
212 {
213 fetch_user
214 } else {
215 break;
216 };
217 let fetch_private_user_info =
218 client.request(proto::GetPrivateUserInfo {}).log_err();
219 let (user, info) =
220 futures::join!(fetch_user, fetch_private_user_info);
221
222 cx.update(|cx| {
223 if let Some(info) = info {
224 let staff =
225 info.staff && !*feature_flags::ZED_DISABLE_STAFF;
226 cx.update_flags(staff, info.flags);
227 client.telemetry.set_authenticated_user_info(
228 Some(info.metrics_id.clone()),
229 staff,
230 );
231
232 this.update(cx, |this, cx| {
233 let accepted_tos_at = {
234 #[cfg(debug_assertions)]
235 if std::env::var("ZED_IGNORE_ACCEPTED_TOS").is_ok()
236 {
237 None
238 } else {
239 info.accepted_tos_at
240 }
241
242 #[cfg(not(debug_assertions))]
243 info.accepted_tos_at
244 };
245
246 this.set_current_user_accepted_tos_at(accepted_tos_at);
247 cx.emit(Event::PrivateUserInfoUpdated);
248 })
249 } else {
250 anyhow::Ok(())
251 }
252 })??;
253
254 current_user_tx.send(user).await.ok();
255
256 this.update(cx, |_, cx| cx.notify())?;
257 }
258 }
259 Status::SignedOut => {
260 current_user_tx.send(None).await.ok();
261 this.update(cx, |this, cx| {
262 this.accepted_tos_at = None;
263 cx.emit(Event::PrivateUserInfoUpdated);
264 cx.notify();
265 this.clear_contacts()
266 })?
267 .await;
268 }
269 Status::ConnectionLost => {
270 this.update(cx, |this, cx| {
271 cx.notify();
272 this.clear_contacts()
273 })?
274 .await;
275 }
276 _ => {}
277 }
278 }
279 Ok(())
280 }),
281 pending_contact_requests: Default::default(),
282 weak_self: cx.weak_entity(),
283 }
284 }
285
286 #[cfg(feature = "test-support")]
287 pub fn clear_cache(&mut self) {
288 self.users.clear();
289 self.by_github_login.clear();
290 }
291
292 async fn handle_update_invite_info(
293 this: Entity<Self>,
294 message: TypedEnvelope<proto::UpdateInviteInfo>,
295 mut cx: AsyncApp,
296 ) -> Result<()> {
297 this.update(&mut cx, |this, cx| {
298 this.invite_info = Some(InviteInfo {
299 url: Arc::from(message.payload.url),
300 count: message.payload.count,
301 });
302 cx.notify();
303 })?;
304 Ok(())
305 }
306
307 async fn handle_show_contacts(
308 this: Entity<Self>,
309 _: TypedEnvelope<proto::ShowContacts>,
310 mut cx: AsyncApp,
311 ) -> Result<()> {
312 this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts))?;
313 Ok(())
314 }
315
    /// Returns the most recently received invite info, if any.
    pub fn invite_info(&self) -> Option<&InviteInfo> {
        self.invite_info.as_ref()
    }
319
320 async fn handle_update_contacts(
321 this: Entity<Self>,
322 message: TypedEnvelope<proto::UpdateContacts>,
323 mut cx: AsyncApp,
324 ) -> Result<()> {
325 this.update(&mut cx, |this, _| {
326 this.update_contacts_tx
327 .unbounded_send(UpdateContacts::Update(message.payload))
328 .unwrap();
329 })?;
330 Ok(())
331 }
332
333 async fn handle_update_plan(
334 this: Entity<Self>,
335 message: TypedEnvelope<proto::UpdateUserPlan>,
336 mut cx: AsyncApp,
337 ) -> Result<()> {
338 this.update(&mut cx, |this, cx| {
339 this.current_plan = Some(message.payload.plan());
340 this.subscription_period = maybe!({
341 let period = message.payload.subscription_period?;
342 let started_at = DateTime::from_timestamp(period.started_at as i64, 0)?;
343 let ended_at = DateTime::from_timestamp(period.ended_at as i64, 0)?;
344
345 Some((started_at, ended_at))
346 });
347 this.trial_started_at = message
348 .payload
349 .trial_started_at
350 .and_then(|trial_started_at| DateTime::from_timestamp(trial_started_at as i64, 0));
351 this.is_usage_based_billing_enabled = message.payload.is_usage_based_billing_enabled;
352 this.account_too_young = message.payload.account_too_young;
353
354 if let Some(usage) = message.payload.usage {
355 this.model_request_usage_amount = Some(usage.model_requests_usage_amount);
356 this.model_request_usage_limit = usage.model_requests_usage_limit;
357 this.edit_predictions_usage_amount = Some(usage.edit_predictions_usage_amount);
358 this.edit_predictions_usage_limit = usage.edit_predictions_usage_limit;
359 }
360
361 cx.notify();
362 })?;
363 Ok(())
364 }
365
366 fn update_contacts(&mut self, message: UpdateContacts, cx: &Context<Self>) -> Task<Result<()>> {
367 match message {
368 UpdateContacts::Wait(barrier) => {
369 drop(barrier);
370 Task::ready(Ok(()))
371 }
372 UpdateContacts::Clear(barrier) => {
373 self.contacts.clear();
374 self.incoming_contact_requests.clear();
375 self.outgoing_contact_requests.clear();
376 drop(barrier);
377 Task::ready(Ok(()))
378 }
379 UpdateContacts::Update(message) => {
380 let mut user_ids = HashSet::default();
381 for contact in &message.contacts {
382 user_ids.insert(contact.user_id);
383 }
384 user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
385 user_ids.extend(message.outgoing_requests.iter());
386
387 let load_users = self.get_users(user_ids.into_iter().collect(), cx);
388 cx.spawn(async move |this, cx| {
389 load_users.await?;
390
391 // Users are fetched in parallel above and cached in call to get_users
392 // No need to parallelize here
393 let mut updated_contacts = Vec::new();
394 let this = this.upgrade().context("can't upgrade user store handle")?;
395 for contact in message.contacts {
396 updated_contacts
397 .push(Arc::new(Contact::from_proto(contact, &this, cx).await?));
398 }
399
400 let mut incoming_requests = Vec::new();
401 for request in message.incoming_requests {
402 incoming_requests.push({
403 this.update(cx, |this, cx| this.get_user(request.requester_id, cx))?
404 .await?
405 });
406 }
407
408 let mut outgoing_requests = Vec::new();
409 for requested_user_id in message.outgoing_requests {
410 outgoing_requests.push(
411 this.update(cx, |this, cx| this.get_user(requested_user_id, cx))?
412 .await?,
413 );
414 }
415
416 let removed_contacts =
417 HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
418 let removed_incoming_requests =
419 HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
420 let removed_outgoing_requests =
421 HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());
422
423 this.update(cx, |this, cx| {
424 // Remove contacts
425 this.contacts
426 .retain(|contact| !removed_contacts.contains(&contact.user.id));
427 // Update existing contacts and insert new ones
428 for updated_contact in updated_contacts {
429 match this.contacts.binary_search_by_key(
430 &&updated_contact.user.github_login,
431 |contact| &contact.user.github_login,
432 ) {
433 Ok(ix) => this.contacts[ix] = updated_contact,
434 Err(ix) => this.contacts.insert(ix, updated_contact),
435 }
436 }
437
438 // Remove incoming contact requests
439 this.incoming_contact_requests.retain(|user| {
440 if removed_incoming_requests.contains(&user.id) {
441 cx.emit(Event::Contact {
442 user: user.clone(),
443 kind: ContactEventKind::Cancelled,
444 });
445 false
446 } else {
447 true
448 }
449 });
450 // Update existing incoming requests and insert new ones
451 for user in incoming_requests {
452 match this
453 .incoming_contact_requests
454 .binary_search_by_key(&&user.github_login, |contact| {
455 &contact.github_login
456 }) {
457 Ok(ix) => this.incoming_contact_requests[ix] = user,
458 Err(ix) => this.incoming_contact_requests.insert(ix, user),
459 }
460 }
461
462 // Remove outgoing contact requests
463 this.outgoing_contact_requests
464 .retain(|user| !removed_outgoing_requests.contains(&user.id));
465 // Update existing incoming requests and insert new ones
466 for request in outgoing_requests {
467 match this
468 .outgoing_contact_requests
469 .binary_search_by_key(&&request.github_login, |contact| {
470 &contact.github_login
471 }) {
472 Ok(ix) => this.outgoing_contact_requests[ix] = request,
473 Err(ix) => this.outgoing_contact_requests.insert(ix, request),
474 }
475 }
476
477 cx.notify();
478 })?;
479
480 Ok(())
481 })
482 }
483 }
484 }
485
486 pub fn contacts(&self) -> &[Arc<Contact>] {
487 &self.contacts
488 }
489
490 pub fn has_contact(&self, user: &Arc<User>) -> bool {
491 self.contacts
492 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
493 .is_ok()
494 }
495
496 pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
497 &self.incoming_contact_requests
498 }
499
500 pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
501 &self.outgoing_contact_requests
502 }
503
    /// Reports whether at least one contact request involving `user` is
    /// still in flight (sent to the server but not yet answered).
    pub fn is_contact_request_pending(&self, user: &User) -> bool {
        self.pending_contact_requests.contains_key(&user.id)
    }
507
508 pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
509 if self
510 .contacts
511 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
512 .is_ok()
513 {
514 ContactRequestStatus::RequestAccepted
515 } else if self
516 .outgoing_contact_requests
517 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
518 .is_ok()
519 {
520 ContactRequestStatus::RequestSent
521 } else if self
522 .incoming_contact_requests
523 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
524 .is_ok()
525 {
526 ContactRequestStatus::RequestReceived
527 } else {
528 ContactRequestStatus::None
529 }
530 }
531
532 pub fn request_contact(
533 &mut self,
534 responder_id: u64,
535 cx: &mut Context<Self>,
536 ) -> Task<Result<()>> {
537 self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
538 }
539
540 pub fn remove_contact(&mut self, user_id: u64, cx: &mut Context<Self>) -> Task<Result<()>> {
541 self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
542 }
543
544 pub fn has_incoming_contact_request(&self, user_id: u64) -> bool {
545 self.incoming_contact_requests
546 .iter()
547 .any(|user| user.id == user_id)
548 }
549
550 pub fn respond_to_contact_request(
551 &mut self,
552 requester_id: u64,
553 accept: bool,
554 cx: &mut Context<Self>,
555 ) -> Task<Result<()>> {
556 self.perform_contact_request(
557 requester_id,
558 proto::RespondToContactRequest {
559 requester_id,
560 response: if accept {
561 proto::ContactRequestResponse::Accept
562 } else {
563 proto::ContactRequestResponse::Decline
564 } as i32,
565 },
566 cx,
567 )
568 }
569
570 pub fn dismiss_contact_request(
571 &self,
572 requester_id: u64,
573 cx: &Context<Self>,
574 ) -> Task<Result<()>> {
575 let client = self.client.upgrade();
576 cx.spawn(async move |_, _| {
577 client
578 .context("can't upgrade client reference")?
579 .request(proto::RespondToContactRequest {
580 requester_id,
581 response: proto::ContactRequestResponse::Dismiss as i32,
582 })
583 .await?;
584 Ok(())
585 })
586 }
587
588 fn perform_contact_request<T: RequestMessage>(
589 &mut self,
590 user_id: u64,
591 request: T,
592 cx: &mut Context<Self>,
593 ) -> Task<Result<()>> {
594 let client = self.client.upgrade();
595 *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
596 cx.notify();
597
598 cx.spawn(async move |this, cx| {
599 let response = client
600 .context("can't upgrade client reference")?
601 .request(request)
602 .await;
603 this.update(cx, |this, cx| {
604 if let Entry::Occupied(mut request_count) =
605 this.pending_contact_requests.entry(user_id)
606 {
607 *request_count.get_mut() -= 1;
608 if *request_count.get() == 0 {
609 request_count.remove();
610 }
611 }
612 cx.notify();
613 })?;
614 response?;
615 Ok(())
616 })
617 }
618
619 pub fn clear_contacts(&self) -> impl Future<Output = ()> + use<> {
620 let (tx, mut rx) = postage::barrier::channel();
621 self.update_contacts_tx
622 .unbounded_send(UpdateContacts::Clear(tx))
623 .unwrap();
624 async move {
625 rx.next().await;
626 }
627 }
628
629 pub fn contact_updates_done(&self) -> impl Future<Output = ()> {
630 let (tx, mut rx) = postage::barrier::channel();
631 self.update_contacts_tx
632 .unbounded_send(UpdateContacts::Wait(tx))
633 .unwrap();
634 async move {
635 rx.next().await;
636 }
637 }
638
639 pub fn get_users(
640 &self,
641 user_ids: Vec<u64>,
642 cx: &Context<Self>,
643 ) -> Task<Result<Vec<Arc<User>>>> {
644 let mut user_ids_to_fetch = user_ids.clone();
645 user_ids_to_fetch.retain(|id| !self.users.contains_key(id));
646
647 cx.spawn(async move |this, cx| {
648 if !user_ids_to_fetch.is_empty() {
649 this.update(cx, |this, cx| {
650 this.load_users(
651 proto::GetUsers {
652 user_ids: user_ids_to_fetch,
653 },
654 cx,
655 )
656 })?
657 .await?;
658 }
659
660 this.update(cx, |this, _| {
661 user_ids
662 .iter()
663 .map(|user_id| {
664 this.users
665 .get(user_id)
666 .cloned()
667 .with_context(|| format!("user {user_id} not found"))
668 })
669 .collect()
670 })?
671 })
672 }
673
674 pub fn fuzzy_search_users(
675 &self,
676 query: String,
677 cx: &Context<Self>,
678 ) -> Task<Result<Vec<Arc<User>>>> {
679 self.load_users(proto::FuzzySearchUsers { query }, cx)
680 }
681
682 pub fn get_cached_user(&self, user_id: u64) -> Option<Arc<User>> {
683 self.users.get(&user_id).cloned()
684 }
685
686 pub fn get_user_optimistic(&self, user_id: u64, cx: &Context<Self>) -> Option<Arc<User>> {
687 if let Some(user) = self.users.get(&user_id).cloned() {
688 return Some(user);
689 }
690
691 self.get_user(user_id, cx).detach_and_log_err(cx);
692 None
693 }
694
695 pub fn get_user(&self, user_id: u64, cx: &Context<Self>) -> Task<Result<Arc<User>>> {
696 if let Some(user) = self.users.get(&user_id).cloned() {
697 return Task::ready(Ok(user));
698 }
699
700 let load_users = self.get_users(vec![user_id], cx);
701 cx.spawn(async move |this, cx| {
702 load_users.await?;
703 this.update(cx, |this, _| {
704 this.users
705 .get(&user_id)
706 .cloned()
707 .context("server responded with no users")
708 })?
709 })
710 }
711
712 pub fn cached_user_by_github_login(&self, github_login: &str) -> Option<Arc<User>> {
713 self.by_github_login
714 .get(github_login)
715 .and_then(|id| self.users.get(id).cloned())
716 }
717
    /// Returns the latest value of the current-user watch channel
    /// (`None` while signed out or before the user has been loaded).
    pub fn current_user(&self) -> Option<Arc<User>> {
        self.current_user.borrow().clone()
    }
721
    /// Returns the plan from the most recent plan update, if one was received.
    pub fn current_plan(&self) -> Option<proto::Plan> {
        self.current_plan
    }
725
    /// Returns the `(started_at, ended_at)` subscription period from the most
    /// recent plan update, if it carried one.
    pub fn subscription_period(&self) -> Option<(DateTime<Utc>, DateTime<Utc>)> {
        self.subscription_period
    }
729
    /// Returns the trial start time from the most recent plan update, if any.
    pub fn trial_started_at(&self) -> Option<DateTime<Utc>> {
        self.trial_started_at
    }
733
    /// Returns the usage-based-billing flag from the most recent plan update;
    /// `None` if no value has been received.
    pub fn usage_based_billing_enabled(&self) -> Option<bool> {
        self.is_usage_based_billing_enabled
    }
737
    /// Returns the last reported model-request usage amount, if any.
    pub fn model_request_usage_amount(&self) -> Option<u32> {
        self.model_request_usage_amount
    }
741
    /// Returns a copy of the last reported model-request usage limit, if any.
    pub fn model_request_usage_limit(&self) -> Option<proto::UsageLimit> {
        self.model_request_usage_limit.clone()
    }
745
    /// Returns the last reported edit-prediction usage amount, if any.
    pub fn edit_predictions_usage_amount(&self) -> Option<u32> {
        self.edit_predictions_usage_amount
    }
749
    /// Returns a copy of the last reported edit-prediction usage limit, if any.
    pub fn edit_predictions_usage_limit(&self) -> Option<proto::UsageLimit> {
        self.edit_predictions_usage_limit.clone()
    }
753
    /// Returns a clone of the watch receiver that tracks the current user.
    pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
        self.current_user.clone()
    }
757
758 /// Check if the current user's account is too new to use the service
759 pub fn current_user_account_too_young(&self) -> bool {
760 self.account_too_young.unwrap_or(false)
761 }
762
763 pub fn current_user_has_accepted_terms(&self) -> Option<bool> {
764 self.accepted_tos_at
765 .map(|accepted_tos_at| accepted_tos_at.is_some())
766 }
767
768 pub fn accept_terms_of_service(&self, cx: &Context<Self>) -> Task<Result<()>> {
769 if self.current_user().is_none() {
770 return Task::ready(Err(anyhow!("no current user")));
771 };
772
773 let client = self.client.clone();
774 cx.spawn(async move |this, cx| -> anyhow::Result<()> {
775 let client = client.upgrade().context("client not found")?;
776 let response = client
777 .request(proto::AcceptTermsOfService {})
778 .await
779 .context("error accepting tos")?;
780 this.update(cx, |this, cx| {
781 this.set_current_user_accepted_tos_at(Some(response.accepted_tos_at));
782 cx.emit(Event::PrivateUserInfoUpdated);
783 })?;
784 Ok(())
785 })
786 }
787
788 fn set_current_user_accepted_tos_at(&mut self, accepted_tos_at: Option<u64>) {
789 self.accepted_tos_at = Some(
790 accepted_tos_at.and_then(|timestamp| DateTime::from_timestamp(timestamp as i64, 0)),
791 );
792 }
793
794 fn load_users(
795 &self,
796 request: impl RequestMessage<Response = UsersResponse>,
797 cx: &Context<Self>,
798 ) -> Task<Result<Vec<Arc<User>>>> {
799 let client = self.client.clone();
800 cx.spawn(async move |this, cx| {
801 if let Some(rpc) = client.upgrade() {
802 let response = rpc.request(request).await.context("error loading users")?;
803 let users = response.users;
804
805 this.update(cx, |this, _| this.insert(users))
806 } else {
807 Ok(Vec::new())
808 }
809 })
810 }
811
812 pub fn insert(&mut self, users: Vec<proto::User>) -> Vec<Arc<User>> {
813 let mut ret = Vec::with_capacity(users.len());
814 for user in users {
815 let user = User::new(user);
816 if let Some(old) = self.users.insert(user.id, user.clone()) {
817 if old.github_login != user.github_login {
818 self.by_github_login.remove(&old.github_login);
819 }
820 }
821 self.by_github_login
822 .insert(user.github_login.clone(), user.id);
823 ret.push(user)
824 }
825 ret
826 }
827
828 pub fn set_participant_indices(
829 &mut self,
830 participant_indices: HashMap<u64, ParticipantIndex>,
831 cx: &mut Context<Self>,
832 ) {
833 if participant_indices != self.participant_indices {
834 self.participant_indices = participant_indices;
835 cx.emit(Event::ParticipantIndicesChanged);
836 }
837 }
838
    /// Returns the stored map from user id to participant index.
    pub fn participant_indices(&self) -> &HashMap<u64, ParticipantIndex> {
        &self.participant_indices
    }
842
843 pub fn participant_names(
844 &self,
845 user_ids: impl Iterator<Item = u64>,
846 cx: &App,
847 ) -> HashMap<u64, SharedString> {
848 let mut ret = HashMap::default();
849 let mut missing_user_ids = Vec::new();
850 for id in user_ids {
851 if let Some(github_login) = self.get_cached_user(id).map(|u| u.github_login.clone()) {
852 ret.insert(id, github_login.into());
853 } else {
854 missing_user_ids.push(id)
855 }
856 }
857 if !missing_user_ids.is_empty() {
858 let this = self.weak_self.clone();
859 cx.spawn(async move |cx| {
860 this.update(cx, |this, cx| this.get_users(missing_user_ids, cx))?
861 .await
862 })
863 .detach_and_log_err(cx);
864 }
865 ret
866 }
867}
868
869impl User {
870 fn new(message: proto::User) -> Arc<Self> {
871 Arc::new(User {
872 id: message.id,
873 github_login: message.github_login,
874 avatar_uri: message.avatar_url.into(),
875 name: message.name,
876 email: message.email,
877 })
878 }
879}
880
881impl Contact {
882 async fn from_proto(
883 contact: proto::Contact,
884 user_store: &Entity<UserStore>,
885 cx: &mut AsyncApp,
886 ) -> Result<Self> {
887 let user = user_store
888 .update(cx, |user_store, cx| {
889 user_store.get_user(contact.user_id, cx)
890 })?
891 .await?;
892 Ok(Self {
893 user,
894 online: contact.online,
895 busy: contact.busy,
896 })
897 }
898}
899
900impl Collaborator {
901 pub fn from_proto(message: proto::Collaborator) -> Result<Self> {
902 Ok(Self {
903 peer_id: message.peer_id.context("invalid peer id")?,
904 replica_id: message.replica_id as ReplicaId,
905 user_id: message.user_id as UserId,
906 is_host: message.is_host,
907 })
908 }
909}