1use super::{Client, Status, TypedEnvelope, proto};
2use anyhow::{Context as _, Result, anyhow};
3use chrono::{DateTime, Utc};
4use collections::{HashMap, HashSet, hash_map::Entry};
5use feature_flags::FeatureFlagAppExt;
6use futures::{Future, StreamExt, channel::mpsc};
7use gpui::{
8 App, AsyncApp, Context, Entity, EventEmitter, SharedString, SharedUri, Task, WeakEntity,
9};
10use postage::{sink::Sink, watch};
11use rpc::proto::{RequestMessage, UsersResponse};
12use std::sync::{Arc, Weak};
13use text::ReplicaId;
14use util::{TryFutureExt as _, maybe};
15
/// Numeric identifier of a user (the type of [`User::id`]).
pub type UserId = u64;
17
/// Newtype around the raw `u64` identifier of a channel.
#[derive(
    Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, serde::Serialize, serde::Deserialize,
)]
pub struct ChannelId(pub u64);
22
23impl std::fmt::Display for ChannelId {
24 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
25 self.0.fmt(f)
26 }
27}
28
/// Newtype around the raw `u64` identifier of a project.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)]
pub struct ProjectId(pub u64);
31
32impl ProjectId {
33 pub fn to_proto(&self) -> u64 {
34 self.0
35 }
36}
37
/// Newtype around the raw `u64` identifier of a dev-server project.
#[derive(
    Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, serde::Serialize, serde::Deserialize,
)]
pub struct DevServerProjectId(pub u64);
42
/// Newtype around a `u32` index assigned to a participant
/// (stored per user in `UserStore::participant_indices`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ParticipantIndex(pub u32);
45
/// A user's profile as cached by [`UserStore`].
#[derive(Default, Debug)]
pub struct User {
    /// Numeric id of the user; the key of the store's user cache.
    pub id: UserId,
    /// GitHub login; used for ordering users and as the key of the by-login index.
    pub github_login: String,
    /// Avatar URI, converted from the `avatar_url` field of the protobuf user.
    pub avatar_uri: SharedUri,
    /// Optional name, copied from the protobuf user.
    pub name: Option<String>,
}
53
/// A collaborator, converted from `proto::Collaborator` by [`Collaborator::from_proto`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Collaborator {
    /// Peer id of the collaborator's connection (required; conversion fails without it).
    pub peer_id: proto::PeerId,
    /// Replica id, cast from the protobuf value.
    pub replica_id: ReplicaId,
    /// Id of the collaborating user.
    pub user_id: UserId,
    /// The `is_host` flag from the protobuf message.
    pub is_host: bool,
    /// Optional committer name from the protobuf message.
    pub committer_name: Option<String>,
    /// Optional committer email from the protobuf message.
    pub committer_email: Option<String>,
}
63
64impl PartialOrd for User {
65 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
66 Some(self.cmp(other))
67 }
68}
69
70impl Ord for User {
71 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
72 self.github_login.cmp(&other.github_login)
73 }
74}
75
76impl PartialEq for User {
77 fn eq(&self, other: &Self) -> bool {
78 self.id == other.id && self.github_login == other.github_login
79 }
80}
81
82impl Eq for User {}
83
/// A contact of the current user, built from `proto::Contact` by `Contact::from_proto`.
#[derive(Debug, PartialEq)]
pub struct Contact {
    /// The contact's cached user record.
    pub user: Arc<User>,
    /// The `online` flag copied from the protobuf contact.
    pub online: bool,
    /// The `busy` flag copied from the protobuf contact.
    pub busy: bool,
}
90
/// Relationship between the current user and another user, as reported by
/// `UserStore::contact_request_status`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ContactRequestStatus {
    /// The user is neither a contact nor part of any known request.
    None,
    /// The user is in the list of outgoing contact requests.
    RequestSent,
    /// The user is in the list of incoming contact requests.
    RequestReceived,
    /// The user is already in the contact list.
    RequestAccepted,
}
98
/// Caches user records and tracks the current user's contacts, plan/usage information
/// and terms-of-service state, fed by messages from the [`Client`].
pub struct UserStore {
    /// Cache of known users, keyed by user id.
    users: HashMap<u64, Arc<User>>,
    /// Index from GitHub login to user id for the cached users.
    by_github_login: HashMap<String, u64>,
    /// Participant index per user id; replaced wholesale by `set_participant_indices`.
    participant_indices: HashMap<u64, ParticipantIndex>,
    /// Sending side of the queue drained by the `_maintain_contacts` task.
    update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
    /// Plan from the latest `UpdateUserPlan` message, if any was received.
    current_plan: Option<proto::Plan>,
    /// Start and end of the subscription period from the latest plan update.
    subscription_period: Option<(DateTime<Utc>, DateTime<Utc>)>,
    /// Trial start time from the latest plan update.
    trial_started_at: Option<DateTime<Utc>>,
    /// Model-request usage amount from the latest plan update that carried usage data.
    model_request_usage_amount: Option<u32>,
    /// Model-request usage limit from the latest plan update that carried usage data.
    model_request_usage_limit: Option<proto::UsageLimit>,
    /// Edit-prediction usage amount from the latest plan update that carried usage data.
    edit_predictions_usage_amount: Option<u32>,
    /// Edit-prediction usage limit from the latest plan update that carried usage data.
    edit_predictions_usage_limit: Option<proto::UsageLimit>,
    /// `is_usage_based_billing_enabled` from the latest plan update.
    is_usage_based_billing_enabled: Option<bool>,
    /// `account_too_young` from the latest plan update.
    account_too_young: Option<bool>,
    /// `has_overdue_invoices` from the latest plan update.
    has_overdue_invoices: Option<bool>,
    /// Watch channel holding the currently signed-in user, if any.
    current_user: watch::Receiver<Option<Arc<User>>>,
    /// Outer `None`: acceptance state not known (initially and after sign-out).
    /// Inner value: when the terms of service were accepted, if they were.
    accepted_tos_at: Option<Option<DateTime<Utc>>>,
    /// Contacts, kept sorted by GitHub login (lookups use binary search).
    contacts: Vec<Arc<Contact>>,
    /// Users who sent us a contact request, kept sorted by GitHub login.
    incoming_contact_requests: Vec<Arc<User>>,
    /// Users we sent a contact request to, kept sorted by GitHub login.
    outgoing_contact_requests: Vec<Arc<User>>,
    /// Number of in-flight contact requests per user id (see `perform_contact_request`).
    pending_contact_requests: HashMap<u64, usize>,
    /// Latest invite info received via `UpdateInviteInfo`, if any.
    invite_info: Option<InviteInfo>,
    /// Weak handle to the client used for requests.
    client: Weak<Client>,
    /// Task that applies queued [`UpdateContacts`] messages; held so it stays alive.
    _maintain_contacts: Task<()>,
    /// Task that reacts to client status changes; held so it stays alive.
    _maintain_current_user: Task<Result<()>>,
    /// Weak handle to this entity, used to schedule work from `&self` methods.
    weak_self: WeakEntity<Self>,
}
126
/// Invite information taken from a `proto::UpdateInviteInfo` message.
#[derive(Clone)]
pub struct InviteInfo {
    /// The `count` field of the message.
    pub count: u32,
    /// The `url` field of the message.
    pub url: Arc<str>,
}
132
/// Events emitted by [`UserStore`].
pub enum Event {
    /// A contact-related change concerning `user`. In this file it is emitted with
    /// `ContactEventKind::Cancelled` when an incoming request is removed.
    Contact {
        user: Arc<User>,
        kind: ContactEventKind,
    },
    /// Emitted when a `proto::ShowContacts` message is received.
    ShowContacts,
    /// Emitted by `set_participant_indices` when the index map actually changed.
    ParticipantIndicesChanged,
    /// Emitted when the terms-of-service acceptance state is set or reset.
    PrivateUserInfoUpdated,
}
142
/// The kind of change carried by [`Event::Contact`].
#[derive(Clone, Copy)]
pub enum ContactEventKind {
    Requested,
    Accepted,
    /// Used when an incoming contact request is removed from the store.
    Cancelled,
}
149
// Declares that `UserStore` emits `Event` values (see the `cx.emit` calls in its methods).
impl EventEmitter<Event> for UserStore {}
151
/// Messages processed in order by the store's contact-maintenance task.
enum UpdateContacts {
    /// Apply a contacts update received from the server.
    Update(proto::UpdateContacts),
    /// Do nothing except drop the barrier once every earlier message has been processed.
    Wait(postage::barrier::Sender),
    /// Clear contacts and both request lists, then drop the barrier.
    Clear(postage::barrier::Sender),
}
157
158impl UserStore {
/// Creates the store, registers its message handlers on `client`, and spawns the two
/// background tasks that keep contacts and the current user up to date.
///
/// The store itself keeps only a `Weak<Client>`; the status-watching task likewise
/// downgrades its handle and returns once the client can no longer be upgraded.
pub fn new(client: Arc<Client>, cx: &Context<Self>) -> Self {
    let (mut current_user_tx, current_user_rx) = watch::channel();
    let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
    let rpc_subscriptions = vec![
        client.add_message_handler(cx.weak_entity(), Self::handle_update_plan),
        client.add_message_handler(cx.weak_entity(), Self::handle_update_contacts),
        client.add_message_handler(cx.weak_entity(), Self::handle_update_invite_info),
        client.add_message_handler(cx.weak_entity(), Self::handle_show_contacts),
    ];
    Self {
        users: Default::default(),
        by_github_login: Default::default(),
        current_user: current_user_rx,
        current_plan: None,
        subscription_period: None,
        trial_started_at: None,
        model_request_usage_amount: None,
        model_request_usage_limit: None,
        edit_predictions_usage_amount: None,
        edit_predictions_usage_limit: None,
        is_usage_based_billing_enabled: None,
        account_too_young: None,
        has_overdue_invoices: None,
        accepted_tos_at: None,
        contacts: Default::default(),
        incoming_contact_requests: Default::default(),
        participant_indices: Default::default(),
        outgoing_contact_requests: Default::default(),
        invite_info: None,
        client: Arc::downgrade(&client),
        update_contacts_tx,
        // Applies queued contact updates strictly one after another: each update's task
        // is awaited before the next message is taken from the queue.
        _maintain_contacts: cx.spawn(async move |this, cx| {
            // Moved into the task so the handler registrations live as long as it does.
            let _subscriptions = rpc_subscriptions;
            while let Some(message) = update_contacts_rx.next().await {
                if let Ok(task) = this.update(cx, |this, cx| this.update_contacts(message, cx))
                {
                    task.log_err().await;
                } else {
                    // The store could not be updated any more; stop processing.
                    break;
                }
            }
        }),
        // Reacts to every client status change.
        _maintain_current_user: cx.spawn(async move |this, cx| {
            let mut status = client.status();
            // Hold only a weak handle while waiting so this task does not keep the
            // client alive.
            let weak = Arc::downgrade(&client);
            drop(client);
            while let Some(status) = status.next().await {
                // If the client is dropped, the app is shutting down.
                let Some(client) = weak.upgrade() else {
                    return Ok(());
                };
                match status {
                    Status::Connected { .. } => {
                        if let Some(user_id) = client.user_id() {
                            let fetch_user = if let Ok(fetch_user) =
                                this.update(cx, |this, cx| this.get_user(user_id, cx).log_err())
                            {
                                fetch_user
                            } else {
                                break;
                            };
                            let fetch_private_user_info =
                                client.request(proto::GetPrivateUserInfo {}).log_err();
                            // Fetch the user record and the private info concurrently.
                            let (user, info) =
                                futures::join!(fetch_user, fetch_private_user_info);

                            cx.update(|cx| {
                                if let Some(info) = info {
                                    let staff =
                                        info.staff && !*feature_flags::ZED_DISABLE_STAFF;
                                    cx.update_flags(staff, info.flags);
                                    client.telemetry.set_authenticated_user_info(
                                        Some(info.metrics_id.clone()),
                                        staff,
                                    );

                                    this.update(cx, |this, cx| {
                                        // In debug builds, setting the
                                        // `ZED_IGNORE_ACCEPTED_TOS` environment variable
                                        // makes the store treat the terms as not accepted.
                                        let accepted_tos_at = {
                                            #[cfg(debug_assertions)]
                                            if std::env::var("ZED_IGNORE_ACCEPTED_TOS").is_ok()
                                            {
                                                None
                                            } else {
                                                info.accepted_tos_at
                                            }

                                            #[cfg(not(debug_assertions))]
                                            info.accepted_tos_at
                                        };

                                        this.set_current_user_accepted_tos_at(accepted_tos_at);
                                        cx.emit(Event::PrivateUserInfoUpdated);
                                    })
                                } else {
                                    // The private-info request failed (already logged).
                                    anyhow::Ok(())
                                }
                            })??;

                            // `user` is `None` when fetching the user failed (already logged).
                            current_user_tx.send(user).await.ok();

                            this.update(cx, |_, cx| cx.notify())?;
                        }
                    }
                    Status::SignedOut => {
                        current_user_tx.send(None).await.ok();
                        this.update(cx, |this, cx| {
                            // Acceptance state is unknown again until the next sign-in.
                            this.accepted_tos_at = None;
                            cx.emit(Event::PrivateUserInfoUpdated);
                            cx.notify();
                            this.clear_contacts()
                        })?
                        .await;
                    }
                    Status::ConnectionLost => {
                        this.update(cx, |this, cx| {
                            cx.notify();
                            this.clear_contacts()
                        })?
                        .await;
                    }
                    _ => {}
                }
            }
            Ok(())
        }),
        pending_contact_requests: Default::default(),
        weak_self: cx.weak_entity(),
    }
}
288
/// Empties both user caches: the by-id map and the by-login index.
#[cfg(feature = "test-support")]
pub fn clear_cache(&mut self) {
    self.by_github_login.clear();
    self.users.clear();
}
294
295 async fn handle_update_invite_info(
296 this: Entity<Self>,
297 message: TypedEnvelope<proto::UpdateInviteInfo>,
298 mut cx: AsyncApp,
299 ) -> Result<()> {
300 this.update(&mut cx, |this, cx| {
301 this.invite_info = Some(InviteInfo {
302 url: Arc::from(message.payload.url),
303 count: message.payload.count,
304 });
305 cx.notify();
306 })?;
307 Ok(())
308 }
309
310 async fn handle_show_contacts(
311 this: Entity<Self>,
312 _: TypedEnvelope<proto::ShowContacts>,
313 mut cx: AsyncApp,
314 ) -> Result<()> {
315 this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts))?;
316 Ok(())
317 }
318
/// Returns the most recently received invite info, or `None` if none was received yet.
pub fn invite_info(&self) -> Option<&InviteInfo> {
    self.invite_info.as_ref()
}
322
323 async fn handle_update_contacts(
324 this: Entity<Self>,
325 message: TypedEnvelope<proto::UpdateContacts>,
326 mut cx: AsyncApp,
327 ) -> Result<()> {
328 this.read_with(&mut cx, |this, _| {
329 this.update_contacts_tx
330 .unbounded_send(UpdateContacts::Update(message.payload))
331 .unwrap();
332 })?;
333 Ok(())
334 }
335
336 async fn handle_update_plan(
337 this: Entity<Self>,
338 message: TypedEnvelope<proto::UpdateUserPlan>,
339 mut cx: AsyncApp,
340 ) -> Result<()> {
341 this.update(&mut cx, |this, cx| {
342 this.current_plan = Some(message.payload.plan());
343 this.subscription_period = maybe!({
344 let period = message.payload.subscription_period?;
345 let started_at = DateTime::from_timestamp(period.started_at as i64, 0)?;
346 let ended_at = DateTime::from_timestamp(period.ended_at as i64, 0)?;
347
348 Some((started_at, ended_at))
349 });
350 this.trial_started_at = message
351 .payload
352 .trial_started_at
353 .and_then(|trial_started_at| DateTime::from_timestamp(trial_started_at as i64, 0));
354 this.is_usage_based_billing_enabled = message.payload.is_usage_based_billing_enabled;
355 this.account_too_young = message.payload.account_too_young;
356 this.has_overdue_invoices = message.payload.has_overdue_invoices;
357
358 if let Some(usage) = message.payload.usage {
359 this.model_request_usage_amount = Some(usage.model_requests_usage_amount);
360 this.model_request_usage_limit = usage.model_requests_usage_limit;
361 this.edit_predictions_usage_amount = Some(usage.edit_predictions_usage_amount);
362 this.edit_predictions_usage_limit = usage.edit_predictions_usage_limit;
363 }
364
365 cx.notify();
366 })?;
367 Ok(())
368 }
369
/// Applies one queued [`UpdateContacts`] message.
///
/// * `Wait` only drops its barrier.
/// * `Clear` empties the contact list and both request lists, then drops its barrier.
/// * `Update` loads every referenced user, then removes, replaces and inserts entries
///   so that each list stays sorted by GitHub login, and finally notifies observers.
///
/// The returned task resolves once the message has been fully applied.
fn update_contacts(&mut self, message: UpdateContacts, cx: &Context<Self>) -> Task<Result<()>> {
    match message {
        UpdateContacts::Wait(barrier) => {
            drop(barrier);
            Task::ready(Ok(()))
        }
        UpdateContacts::Clear(barrier) => {
            self.contacts.clear();
            self.incoming_contact_requests.clear();
            self.outgoing_contact_requests.clear();
            drop(barrier);
            Task::ready(Ok(()))
        }
        UpdateContacts::Update(message) => {
            // Collect every user id the update mentions so they can be loaded in one go.
            let mut user_ids = HashSet::default();
            for contact in &message.contacts {
                user_ids.insert(contact.user_id);
            }
            user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
            user_ids.extend(message.outgoing_requests.iter());

            let load_users = self.get_users(user_ids.into_iter().collect(), cx);
            cx.spawn(async move |this, cx| {
                load_users.await?;

                // Users are fetched in parallel above and cached in call to get_users
                // No need to parallelize here
                let mut updated_contacts = Vec::new();
                let this = this.upgrade().context("can't upgrade user store handle")?;
                for contact in message.contacts {
                    updated_contacts
                        .push(Arc::new(Contact::from_proto(contact, &this, cx).await?));
                }

                let mut incoming_requests = Vec::new();
                for request in message.incoming_requests {
                    incoming_requests.push({
                        this.update(cx, |this, cx| this.get_user(request.requester_id, cx))?
                            .await?
                    });
                }

                let mut outgoing_requests = Vec::new();
                for requested_user_id in message.outgoing_requests {
                    outgoing_requests.push(
                        this.update(cx, |this, cx| this.get_user(requested_user_id, cx))?
                            .await?,
                    );
                }

                let removed_contacts =
                    HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
                let removed_incoming_requests =
                    HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
                let removed_outgoing_requests =
                    HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());

                this.update(cx, |this, cx| {
                    // Remove contacts
                    this.contacts
                        .retain(|contact| !removed_contacts.contains(&contact.user.id));
                    // Update existing contacts and insert new ones
                    // (inserting at the binary-search position keeps the list sorted by login)
                    for updated_contact in updated_contacts {
                        match this.contacts.binary_search_by_key(
                            &&updated_contact.user.github_login,
                            |contact| &contact.user.github_login,
                        ) {
                            Ok(ix) => this.contacts[ix] = updated_contact,
                            Err(ix) => this.contacts.insert(ix, updated_contact),
                        }
                    }

                    // Remove incoming contact requests, emitting a `Cancelled` event for each
                    this.incoming_contact_requests.retain(|user| {
                        if removed_incoming_requests.contains(&user.id) {
                            cx.emit(Event::Contact {
                                user: user.clone(),
                                kind: ContactEventKind::Cancelled,
                            });
                            false
                        } else {
                            true
                        }
                    });
                    // Update existing incoming requests and insert new ones
                    for user in incoming_requests {
                        match this
                            .incoming_contact_requests
                            .binary_search_by_key(&&user.github_login, |contact| {
                                &contact.github_login
                            }) {
                            Ok(ix) => this.incoming_contact_requests[ix] = user,
                            Err(ix) => this.incoming_contact_requests.insert(ix, user),
                        }
                    }

                    // Remove outgoing contact requests
                    this.outgoing_contact_requests
                        .retain(|user| !removed_outgoing_requests.contains(&user.id));
                    // Update existing outgoing requests and insert new ones
                    for request in outgoing_requests {
                        match this
                            .outgoing_contact_requests
                            .binary_search_by_key(&&request.github_login, |contact| {
                                &contact.github_login
                            }) {
                            Ok(ix) => this.outgoing_contact_requests[ix] = request,
                            Err(ix) => this.outgoing_contact_requests.insert(ix, request),
                        }
                    }

                    cx.notify();
                })?;

                Ok(())
            })
        }
    }
}
489
490 pub fn contacts(&self) -> &[Arc<Contact>] {
491 &self.contacts
492 }
493
494 pub fn has_contact(&self, user: &Arc<User>) -> bool {
495 self.contacts
496 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
497 .is_ok()
498 }
499
500 pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
501 &self.incoming_contact_requests
502 }
503
504 pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
505 &self.outgoing_contact_requests
506 }
507
508 pub fn is_contact_request_pending(&self, user: &User) -> bool {
509 self.pending_contact_requests.contains_key(&user.id)
510 }
511
512 pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
513 if self
514 .contacts
515 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
516 .is_ok()
517 {
518 ContactRequestStatus::RequestAccepted
519 } else if self
520 .outgoing_contact_requests
521 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
522 .is_ok()
523 {
524 ContactRequestStatus::RequestSent
525 } else if self
526 .incoming_contact_requests
527 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
528 .is_ok()
529 {
530 ContactRequestStatus::RequestReceived
531 } else {
532 ContactRequestStatus::None
533 }
534 }
535
536 pub fn request_contact(
537 &mut self,
538 responder_id: u64,
539 cx: &mut Context<Self>,
540 ) -> Task<Result<()>> {
541 self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
542 }
543
544 pub fn remove_contact(&mut self, user_id: u64, cx: &mut Context<Self>) -> Task<Result<()>> {
545 self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
546 }
547
548 pub fn has_incoming_contact_request(&self, user_id: u64) -> bool {
549 self.incoming_contact_requests
550 .iter()
551 .any(|user| user.id == user_id)
552 }
553
554 pub fn respond_to_contact_request(
555 &mut self,
556 requester_id: u64,
557 accept: bool,
558 cx: &mut Context<Self>,
559 ) -> Task<Result<()>> {
560 self.perform_contact_request(
561 requester_id,
562 proto::RespondToContactRequest {
563 requester_id,
564 response: if accept {
565 proto::ContactRequestResponse::Accept
566 } else {
567 proto::ContactRequestResponse::Decline
568 } as i32,
569 },
570 cx,
571 )
572 }
573
574 pub fn dismiss_contact_request(
575 &self,
576 requester_id: u64,
577 cx: &Context<Self>,
578 ) -> Task<Result<()>> {
579 let client = self.client.upgrade();
580 cx.spawn(async move |_, _| {
581 client
582 .context("can't upgrade client reference")?
583 .request(proto::RespondToContactRequest {
584 requester_id,
585 response: proto::ContactRequestResponse::Dismiss as i32,
586 })
587 .await?;
588 Ok(())
589 })
590 }
591
592 fn perform_contact_request<T: RequestMessage>(
593 &mut self,
594 user_id: u64,
595 request: T,
596 cx: &mut Context<Self>,
597 ) -> Task<Result<()>> {
598 let client = self.client.upgrade();
599 *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
600 cx.notify();
601
602 cx.spawn(async move |this, cx| {
603 let response = client
604 .context("can't upgrade client reference")?
605 .request(request)
606 .await;
607 this.update(cx, |this, cx| {
608 if let Entry::Occupied(mut request_count) =
609 this.pending_contact_requests.entry(user_id)
610 {
611 *request_count.get_mut() -= 1;
612 if *request_count.get() == 0 {
613 request_count.remove();
614 }
615 }
616 cx.notify();
617 })?;
618 response?;
619 Ok(())
620 })
621 }
622
623 pub fn clear_contacts(&self) -> impl Future<Output = ()> + use<> {
624 let (tx, mut rx) = postage::barrier::channel();
625 self.update_contacts_tx
626 .unbounded_send(UpdateContacts::Clear(tx))
627 .unwrap();
628 async move {
629 rx.next().await;
630 }
631 }
632
633 pub fn contact_updates_done(&self) -> impl Future<Output = ()> {
634 let (tx, mut rx) = postage::barrier::channel();
635 self.update_contacts_tx
636 .unbounded_send(UpdateContacts::Wait(tx))
637 .unwrap();
638 async move {
639 rx.next().await;
640 }
641 }
642
643 pub fn get_users(
644 &self,
645 user_ids: Vec<u64>,
646 cx: &Context<Self>,
647 ) -> Task<Result<Vec<Arc<User>>>> {
648 let mut user_ids_to_fetch = user_ids.clone();
649 user_ids_to_fetch.retain(|id| !self.users.contains_key(id));
650
651 cx.spawn(async move |this, cx| {
652 if !user_ids_to_fetch.is_empty() {
653 this.update(cx, |this, cx| {
654 this.load_users(
655 proto::GetUsers {
656 user_ids: user_ids_to_fetch,
657 },
658 cx,
659 )
660 })?
661 .await?;
662 }
663
664 this.read_with(cx, |this, _| {
665 user_ids
666 .iter()
667 .map(|user_id| {
668 this.users
669 .get(user_id)
670 .cloned()
671 .with_context(|| format!("user {user_id} not found"))
672 })
673 .collect()
674 })?
675 })
676 }
677
678 pub fn fuzzy_search_users(
679 &self,
680 query: String,
681 cx: &Context<Self>,
682 ) -> Task<Result<Vec<Arc<User>>>> {
683 self.load_users(proto::FuzzySearchUsers { query }, cx)
684 }
685
686 pub fn get_cached_user(&self, user_id: u64) -> Option<Arc<User>> {
687 self.users.get(&user_id).cloned()
688 }
689
690 pub fn get_user_optimistic(&self, user_id: u64, cx: &Context<Self>) -> Option<Arc<User>> {
691 if let Some(user) = self.users.get(&user_id).cloned() {
692 return Some(user);
693 }
694
695 self.get_user(user_id, cx).detach_and_log_err(cx);
696 None
697 }
698
699 pub fn get_user(&self, user_id: u64, cx: &Context<Self>) -> Task<Result<Arc<User>>> {
700 if let Some(user) = self.users.get(&user_id).cloned() {
701 return Task::ready(Ok(user));
702 }
703
704 let load_users = self.get_users(vec![user_id], cx);
705 cx.spawn(async move |this, cx| {
706 load_users.await?;
707 this.read_with(cx, |this, _| {
708 this.users
709 .get(&user_id)
710 .cloned()
711 .context("server responded with no users")
712 })?
713 })
714 }
715
716 pub fn cached_user_by_github_login(&self, github_login: &str) -> Option<Arc<User>> {
717 self.by_github_login
718 .get(github_login)
719 .and_then(|id| self.users.get(id).cloned())
720 }
721
/// Returns a clone of the value currently held by the current-user watch channel
/// (`None` when no user is set, e.g. after sign-out).
pub fn current_user(&self) -> Option<Arc<User>> {
    self.current_user.borrow().clone()
}
725
/// Returns the plan from the latest plan update, or `None` if none was received yet.
pub fn current_plan(&self) -> Option<proto::Plan> {
    self.current_plan
}
729
/// Returns the `(started_at, ended_at)` subscription period from the latest plan update,
/// if it carried one with valid timestamps.
pub fn subscription_period(&self) -> Option<(DateTime<Utc>, DateTime<Utc>)> {
    self.subscription_period
}
733
/// Returns the trial start time from the latest plan update, if it carried a valid one.
pub fn trial_started_at(&self) -> Option<DateTime<Utc>> {
    self.trial_started_at
}
737
/// Returns the `is_usage_based_billing_enabled` value from the latest plan update, if any.
pub fn usage_based_billing_enabled(&self) -> Option<bool> {
    self.is_usage_based_billing_enabled
}
741
/// Returns the model-request usage amount from the latest plan update that carried
/// usage data, if any.
pub fn model_request_usage_amount(&self) -> Option<u32> {
    self.model_request_usage_amount
}
745
/// Returns a clone of the model-request usage limit from the latest plan update that
/// carried usage data, if any.
pub fn model_request_usage_limit(&self) -> Option<proto::UsageLimit> {
    self.model_request_usage_limit.clone()
}
749
/// Returns the edit-prediction usage amount from the latest plan update that carried
/// usage data, if any.
pub fn edit_predictions_usage_amount(&self) -> Option<u32> {
    self.edit_predictions_usage_amount
}
753
/// Returns a clone of the edit-prediction usage limit from the latest plan update that
/// carried usage data, if any.
pub fn edit_predictions_usage_limit(&self) -> Option<proto::UsageLimit> {
    self.edit_predictions_usage_limit.clone()
}
757
/// Returns a clone of the watch receiver for the current user, so callers can observe
/// changes themselves.
pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
    self.current_user.clone()
}
761
762 /// Returns whether the user's account is too new to use the service.
763 pub fn account_too_young(&self) -> bool {
764 self.account_too_young.unwrap_or(false)
765 }
766
767 /// Returns whether the current user has overdue invoices and usage should be blocked.
768 pub fn has_overdue_invoices(&self) -> bool {
769 self.has_overdue_invoices.unwrap_or(false)
770 }
771
772 pub fn current_user_has_accepted_terms(&self) -> Option<bool> {
773 self.accepted_tos_at
774 .map(|accepted_tos_at| accepted_tos_at.is_some())
775 }
776
777 pub fn accept_terms_of_service(&self, cx: &Context<Self>) -> Task<Result<()>> {
778 if self.current_user().is_none() {
779 return Task::ready(Err(anyhow!("no current user")));
780 };
781
782 let client = self.client.clone();
783 cx.spawn(async move |this, cx| -> anyhow::Result<()> {
784 let client = client.upgrade().context("client not found")?;
785 let response = client
786 .request(proto::AcceptTermsOfService {})
787 .await
788 .context("error accepting tos")?;
789 this.update(cx, |this, cx| {
790 this.set_current_user_accepted_tos_at(Some(response.accepted_tos_at));
791 cx.emit(Event::PrivateUserInfoUpdated);
792 })?;
793 Ok(())
794 })
795 }
796
797 fn set_current_user_accepted_tos_at(&mut self, accepted_tos_at: Option<u64>) {
798 self.accepted_tos_at = Some(
799 accepted_tos_at.and_then(|timestamp| DateTime::from_timestamp(timestamp as i64, 0)),
800 );
801 }
802
803 fn load_users(
804 &self,
805 request: impl RequestMessage<Response = UsersResponse>,
806 cx: &Context<Self>,
807 ) -> Task<Result<Vec<Arc<User>>>> {
808 let client = self.client.clone();
809 cx.spawn(async move |this, cx| {
810 if let Some(rpc) = client.upgrade() {
811 let response = rpc.request(request).await.context("error loading users")?;
812 let users = response.users;
813
814 this.update(cx, |this, _| this.insert(users))
815 } else {
816 Ok(Vec::new())
817 }
818 })
819 }
820
821 pub fn insert(&mut self, users: Vec<proto::User>) -> Vec<Arc<User>> {
822 let mut ret = Vec::with_capacity(users.len());
823 for user in users {
824 let user = User::new(user);
825 if let Some(old) = self.users.insert(user.id, user.clone()) {
826 if old.github_login != user.github_login {
827 self.by_github_login.remove(&old.github_login);
828 }
829 }
830 self.by_github_login
831 .insert(user.github_login.clone(), user.id);
832 ret.push(user)
833 }
834 ret
835 }
836
837 pub fn set_participant_indices(
838 &mut self,
839 participant_indices: HashMap<u64, ParticipantIndex>,
840 cx: &mut Context<Self>,
841 ) {
842 if participant_indices != self.participant_indices {
843 self.participant_indices = participant_indices;
844 cx.emit(Event::ParticipantIndicesChanged);
845 }
846 }
847
/// Returns the participant-index map most recently passed to `set_participant_indices`.
pub fn participant_indices(&self) -> &HashMap<u64, ParticipantIndex> {
    &self.participant_indices
}
851
852 pub fn participant_names(
853 &self,
854 user_ids: impl Iterator<Item = u64>,
855 cx: &App,
856 ) -> HashMap<u64, SharedString> {
857 let mut ret = HashMap::default();
858 let mut missing_user_ids = Vec::new();
859 for id in user_ids {
860 if let Some(github_login) = self.get_cached_user(id).map(|u| u.github_login.clone()) {
861 ret.insert(id, github_login.into());
862 } else {
863 missing_user_ids.push(id)
864 }
865 }
866 if !missing_user_ids.is_empty() {
867 let this = self.weak_self.clone();
868 cx.spawn(async move |cx| {
869 this.update(cx, |this, cx| this.get_users(missing_user_ids, cx))?
870 .await
871 })
872 .detach_and_log_err(cx);
873 }
874 ret
875 }
876}
877
878impl User {
879 fn new(message: proto::User) -> Arc<Self> {
880 Arc::new(User {
881 id: message.id,
882 github_login: message.github_login,
883 avatar_uri: message.avatar_url.into(),
884 name: message.name,
885 })
886 }
887}
888
889impl Contact {
890 async fn from_proto(
891 contact: proto::Contact,
892 user_store: &Entity<UserStore>,
893 cx: &mut AsyncApp,
894 ) -> Result<Self> {
895 let user = user_store
896 .update(cx, |user_store, cx| {
897 user_store.get_user(contact.user_id, cx)
898 })?
899 .await?;
900 Ok(Self {
901 user,
902 online: contact.online,
903 busy: contact.busy,
904 })
905 }
906}
907
908impl Collaborator {
909 pub fn from_proto(message: proto::Collaborator) -> Result<Self> {
910 Ok(Self {
911 peer_id: message.peer_id.context("invalid peer id")?,
912 replica_id: message.replica_id as ReplicaId,
913 user_id: message.user_id as UserId,
914 is_host: message.is_host,
915 committer_name: message.committer_name,
916 committer_email: message.committer_email,
917 })
918 }
919}