1use super::{Client, Status, TypedEnvelope, proto};
2use anyhow::{Context as _, Result, anyhow};
3use chrono::{DateTime, Utc};
4use collections::{HashMap, HashSet, hash_map::Entry};
5use feature_flags::FeatureFlagAppExt;
6use futures::{Future, StreamExt, channel::mpsc};
7use gpui::{
8 App, AsyncApp, Context, Entity, EventEmitter, SharedString, SharedUri, Task, WeakEntity,
9};
10use postage::{sink::Sink, watch};
11use rpc::proto::{RequestMessage, UsersResponse};
12use std::sync::{Arc, Weak};
13use text::ReplicaId;
14use util::{TryFutureExt as _, maybe};
15
/// Numeric identifier of a user; a plain alias for `u64`.
pub type UserId = u64;
17
18#[derive(
19 Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, serde::Serialize, serde::Deserialize,
20)]
21pub struct ChannelId(pub u64);
22
23impl std::fmt::Display for ChannelId {
24 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
25 self.0.fmt(f)
26 }
27}
28
/// Newtype wrapper around the raw `u64` identifier of a project.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)]
pub struct ProjectId(pub u64);

impl ProjectId {
    /// Returns the wrapped `u64` value.
    pub fn to_proto(&self) -> u64 {
        let Self(raw) = *self;
        raw
    }
}
37
/// Newtype wrapper around the raw `u64` identifier of a dev-server project.
#[derive(
    Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, serde::Serialize, serde::Deserialize,
)]
pub struct DevServerProjectId(pub u64);
42
/// Newtype wrapper around a `u32` index assigned to a participant.
///
/// `UserStore` keeps one of these per user id in its `participant_indices` map.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ParticipantIndex(pub u32);
45
/// A user as known to the client, built from a `proto::User` by `User::new`.
///
/// Equality and ordering are implemented by hand rather than derived.
#[derive(Default, Debug)]
pub struct User {
    /// Numeric id of the user.
    pub id: UserId,
    /// GitHub login; the contact lists in `UserStore` are sorted and searched by it.
    pub github_login: String,
    /// Avatar location, converted from the proto message's `avatar_url`.
    pub avatar_uri: SharedUri,
    /// Optional name, copied verbatim from the proto message.
    pub name: Option<String>,
    /// Optional email, copied verbatim from the proto message.
    pub email: Option<String>,
}
54
/// A collaborator, decoded from a `proto::Collaborator` by `Collaborator::from_proto`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Collaborator {
    /// Peer id of the collaborator's connection.
    pub peer_id: proto::PeerId,
    /// Replica id, cast from the proto message's `replica_id`.
    pub replica_id: ReplicaId,
    /// Id of the user behind this collaborator.
    pub user_id: UserId,
    /// Copied from the proto message's `is_host` flag.
    pub is_host: bool,
}
62
63impl PartialOrd for User {
64 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
65 Some(self.cmp(other))
66 }
67}
68
69impl Ord for User {
70 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
71 self.github_login.cmp(&other.github_login)
72 }
73}
74
75impl PartialEq for User {
76 fn eq(&self, other: &Self) -> bool {
77 self.id == other.id && self.github_login == other.github_login
78 }
79}
80
81impl Eq for User {}
82
/// A contact of the current user, built from a `proto::Contact` by `Contact::from_proto`.
#[derive(Debug, PartialEq)]
pub struct Contact {
    /// The user this contact refers to.
    pub user: Arc<User>,
    /// Copied from the proto message's `online` flag.
    pub online: bool,
    /// Copied from the proto message's `busy` flag.
    pub busy: bool,
}
89
/// Relationship between the current user and another user, as reported by
/// `UserStore::contact_request_status`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ContactRequestStatus {
    /// The user is in none of the store's contact lists.
    None,
    /// The user is in the store's outgoing contact requests.
    RequestSent,
    /// The user is in the store's incoming contact requests.
    RequestReceived,
    /// The user is already in the store's contacts.
    RequestAccepted,
}
97
/// Client-side cache of users, contacts, and plan/billing state for the current
/// user, kept up to date by RPC message handlers and two background tasks.
pub struct UserStore {
    /// Cache of users by id, filled by `insert`.
    users: HashMap<u64, Arc<User>>,
    /// Index from GitHub login to user id, maintained alongside `users` by `insert`.
    by_github_login: HashMap<String, u64>,
    /// Participant index per user id, replaced wholesale by `set_participant_indices`.
    participant_indices: HashMap<u64, ParticipantIndex>,
    /// Sender for the queue consumed by the `_maintain_contacts` task.
    update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
    /// Plan from the most recent `UpdateUserPlan` message, if any.
    current_plan: Option<proto::Plan>,
    /// `(started_at, ended_at)` of the subscription period from the last plan update.
    subscription_period: Option<(DateTime<Utc>, DateTime<Utc>)>,
    /// Trial start time from the last plan update.
    trial_started_at: Option<DateTime<Utc>>,
    /// Model request usage amount from the last plan update that carried usage data.
    model_request_usage_amount: Option<u32>,
    /// Model request usage limit from the last plan update that carried usage data.
    model_request_usage_limit: Option<proto::UsageLimit>,
    /// Edit prediction usage amount from the last plan update that carried usage data.
    edit_predictions_usage_amount: Option<u32>,
    /// Edit prediction usage limit from the last plan update that carried usage data.
    edit_predictions_usage_limit: Option<proto::UsageLimit>,
    /// Copied from the last plan update.
    is_usage_based_billing_enabled: Option<bool>,
    /// Copied from the last plan update; read through `account_too_young()`.
    account_too_young: Option<bool>,
    /// Copied from the last plan update; read through `has_overdue_invoices()`.
    has_overdue_invoices: Option<bool>,
    /// Receiving half of the current-user watch channel; the sending half lives
    /// in the `_maintain_current_user` task.
    current_user: watch::Receiver<Option<Arc<User>>>,
    /// Outer `None`: acceptance state not loaded (or reset on sign-out).
    /// Inner value: when the terms were accepted, or `None` if they were not.
    accepted_tos_at: Option<Option<DateTime<Utc>>>,
    /// Contacts, kept sorted by GitHub login (inserted via binary search).
    contacts: Vec<Arc<Contact>>,
    /// Users who sent the current user a contact request, sorted by GitHub login.
    incoming_contact_requests: Vec<Arc<User>>,
    /// Users the current user sent a contact request to, sorted by GitHub login.
    outgoing_contact_requests: Vec<Arc<User>>,
    /// Number of in-flight contact requests per user id; entries are removed at zero.
    pending_contact_requests: HashMap<u64, usize>,
    /// Invite info from the most recent `UpdateInviteInfo` message, if any.
    invite_info: Option<InviteInfo>,
    /// Weak handle to the RPC client, so the store does not keep it alive.
    client: Weak<Client>,
    /// Task that applies queued `UpdateContacts` messages; it also holds the
    /// RPC handler subscriptions created in `new`.
    _maintain_contacts: Task<()>,
    /// Task that reacts to client status changes.
    _maintain_current_user: Task<Result<()>>,
    /// Weak handle to this entity, used by `participant_names` to schedule fetches.
    weak_self: WeakEntity<Self>,
}
125
/// Invite information taken from a `proto::UpdateInviteInfo` message.
#[derive(Clone)]
pub struct InviteInfo {
    /// The message's `count` value.
    pub count: u32,
    /// The message's `url` value.
    pub url: Arc<str>,
}
131
/// Events emitted by `UserStore`.
pub enum Event {
    /// Something happened to a contact request involving `user`; in this file it
    /// is emitted with `ContactEventKind::Cancelled` when an incoming request is removed.
    Contact {
        user: Arc<User>,
        kind: ContactEventKind,
    },
    /// Emitted when a `proto::ShowContacts` message is received.
    ShowContacts,
    /// Emitted by `set_participant_indices` when the indices actually changed.
    ParticipantIndicesChanged,
    /// Emitted when the accepted-terms state is set or reset.
    PrivateUserInfoUpdated,
}
141
/// The kind of change described by an `Event::Contact`.
#[derive(Clone, Copy)]
pub enum ContactEventKind {
    Requested,
    Accepted,
    /// Used when an incoming contact request is removed from the store.
    Cancelled,
}
148
// Allows `UserStore` to emit `Event` values through `cx.emit`.
impl EventEmitter<Event> for UserStore {}
150
/// Messages queued on `update_contacts_tx` and handled in order by `update_contacts`.
enum UpdateContacts {
    /// Apply a contacts update received from the server.
    Update(proto::UpdateContacts),
    /// Do nothing except drop the barrier sender once this message is reached.
    Wait(postage::barrier::Sender),
    /// Clear all contact lists, then drop the barrier sender.
    Clear(postage::barrier::Sender),
}
156
157impl UserStore {
 /// Creates the store for `client`.
 ///
 /// Registers the store's RPC message handlers on `client` and spawns two tasks
 /// owned by the returned value:
 ///
 /// - `_maintain_contacts` applies queued `UpdateContacts` messages one at a time.
 /// - `_maintain_current_user` follows `client.status()` and refreshes or clears
 ///   the current user, the accepted-terms state, and the contact lists.
 pub fn new(client: Arc<Client>, cx: &Context<Self>) -> Self {
     let (mut current_user_tx, current_user_rx) = watch::channel();
     let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
     let rpc_subscriptions = vec![
         client.add_message_handler(cx.weak_entity(), Self::handle_update_plan),
         client.add_message_handler(cx.weak_entity(), Self::handle_update_contacts),
         client.add_message_handler(cx.weak_entity(), Self::handle_update_invite_info),
         client.add_message_handler(cx.weak_entity(), Self::handle_show_contacts),
     ];
     Self {
         users: Default::default(),
         by_github_login: Default::default(),
         current_user: current_user_rx,
         current_plan: None,
         subscription_period: None,
         trial_started_at: None,
         model_request_usage_amount: None,
         model_request_usage_limit: None,
         edit_predictions_usage_amount: None,
         edit_predictions_usage_limit: None,
         is_usage_based_billing_enabled: None,
         account_too_young: None,
         has_overdue_invoices: None,
         accepted_tos_at: None,
         contacts: Default::default(),
         incoming_contact_requests: Default::default(),
         participant_indices: Default::default(),
         outgoing_contact_requests: Default::default(),
         invite_info: None,
         client: Arc::downgrade(&client),
         update_contacts_tx,
         _maintain_contacts: cx.spawn(async move |this, cx| {
             // Keep the handler subscriptions alive for as long as this task runs.
             // NOTE(review): presumably dropping them unregisters the handlers — confirm.
             let _subscriptions = rpc_subscriptions;
             // Messages are applied strictly in order: each update task is awaited
             // before the next message is taken from the queue.
             while let Some(message) = update_contacts_rx.next().await {
                 if let Ok(task) = this.update(cx, |this, cx| this.update_contacts(message, cx))
                 {
                     task.log_err().await;
                 } else {
                     // The store could not be updated (presumably it was dropped); stop.
                     break;
                 }
             }
         }),
         _maintain_current_user: cx.spawn(async move |this, cx| {
             let mut status = client.status();
             // Hold only a weak reference between status changes so this task
             // does not keep the client alive.
             let weak = Arc::downgrade(&client);
             drop(client);
             while let Some(status) = status.next().await {
                 // If the client is dropped, the app is shutting down.
                 let Some(client) = weak.upgrade() else {
                     return Ok(());
                 };
                 match status {
                     Status::Connected { .. } => {
                         if let Some(user_id) = client.user_id() {
                             let fetch_user = if let Ok(fetch_user) =
                                 this.update(cx, |this, cx| this.get_user(user_id, cx).log_err())
                             {
                                 fetch_user
                             } else {
                                 break;
                             };
                             let fetch_private_user_info =
                                 client.request(proto::GetPrivateUserInfo {}).log_err();
                             // Fetch the user record and the private info concurrently.
                             let (user, info) =
                                 futures::join!(fetch_user, fetch_private_user_info);

                             cx.update(|cx| {
                                 if let Some(info) = info {
                                     let staff =
                                         info.staff && !*feature_flags::ZED_DISABLE_STAFF;
                                     cx.update_flags(staff, info.flags);
                                     client.telemetry.set_authenticated_user_info(
                                         Some(info.metrics_id.clone()),
                                         staff,
                                     );

                                     this.update(cx, |this, cx| {
                                         // Debug builds can set ZED_IGNORE_ACCEPTED_TOS to
                                         // behave as if the terms were never accepted.
                                         let accepted_tos_at = {
                                             #[cfg(debug_assertions)]
                                             if std::env::var("ZED_IGNORE_ACCEPTED_TOS").is_ok()
                                             {
                                                 None
                                             } else {
                                                 info.accepted_tos_at
                                             }

                                             #[cfg(not(debug_assertions))]
                                             info.accepted_tos_at
                                         };

                                         this.set_current_user_accepted_tos_at(accepted_tos_at);
                                         cx.emit(Event::PrivateUserInfoUpdated);
                                     })
                                 } else {
                                     anyhow::Ok(())
                                 }
                             })??;

                             // Publish the user (or `None` if the fetch failed and was logged).
                             current_user_tx.send(user).await.ok();

                             this.update(cx, |_, cx| cx.notify())?;
                         }
                     }
                     Status::SignedOut => {
                         current_user_tx.send(None).await.ok();
                         // Reset the accepted-terms state and wait for contacts to be cleared.
                         this.update(cx, |this, cx| {
                             this.accepted_tos_at = None;
                             cx.emit(Event::PrivateUserInfoUpdated);
                             cx.notify();
                             this.clear_contacts()
                         })?
                         .await;
                     }
                     Status::ConnectionLost => {
                         // Contacts are cleared, but the current user is left untouched.
                         this.update(cx, |this, cx| {
                             cx.notify();
                             this.clear_contacts()
                         })?
                         .await;
                     }
                     _ => {}
                 }
             }
             Ok(())
         }),
         pending_contact_requests: Default::default(),
         weak_self: cx.weak_entity(),
     }
 }
287
288 #[cfg(feature = "test-support")]
289 pub fn clear_cache(&mut self) {
290 self.users.clear();
291 self.by_github_login.clear();
292 }
293
294 async fn handle_update_invite_info(
295 this: Entity<Self>,
296 message: TypedEnvelope<proto::UpdateInviteInfo>,
297 mut cx: AsyncApp,
298 ) -> Result<()> {
299 this.update(&mut cx, |this, cx| {
300 this.invite_info = Some(InviteInfo {
301 url: Arc::from(message.payload.url),
302 count: message.payload.count,
303 });
304 cx.notify();
305 })?;
306 Ok(())
307 }
308
309 async fn handle_show_contacts(
310 this: Entity<Self>,
311 _: TypedEnvelope<proto::ShowContacts>,
312 mut cx: AsyncApp,
313 ) -> Result<()> {
314 this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts))?;
315 Ok(())
316 }
317
 /// Returns the invite info from the most recent `UpdateInviteInfo` message, if any.
 pub fn invite_info(&self) -> Option<&InviteInfo> {
     self.invite_info.as_ref()
 }
321
322 async fn handle_update_contacts(
323 this: Entity<Self>,
324 message: TypedEnvelope<proto::UpdateContacts>,
325 mut cx: AsyncApp,
326 ) -> Result<()> {
327 this.read_with(&mut cx, |this, _| {
328 this.update_contacts_tx
329 .unbounded_send(UpdateContacts::Update(message.payload))
330 .unwrap();
331 })?;
332 Ok(())
333 }
334
335 async fn handle_update_plan(
336 this: Entity<Self>,
337 message: TypedEnvelope<proto::UpdateUserPlan>,
338 mut cx: AsyncApp,
339 ) -> Result<()> {
340 this.update(&mut cx, |this, cx| {
341 this.current_plan = Some(message.payload.plan());
342 this.subscription_period = maybe!({
343 let period = message.payload.subscription_period?;
344 let started_at = DateTime::from_timestamp(period.started_at as i64, 0)?;
345 let ended_at = DateTime::from_timestamp(period.ended_at as i64, 0)?;
346
347 Some((started_at, ended_at))
348 });
349 this.trial_started_at = message
350 .payload
351 .trial_started_at
352 .and_then(|trial_started_at| DateTime::from_timestamp(trial_started_at as i64, 0));
353 this.is_usage_based_billing_enabled = message.payload.is_usage_based_billing_enabled;
354 this.account_too_young = message.payload.account_too_young;
355 this.has_overdue_invoices = message.payload.has_overdue_invoices;
356
357 if let Some(usage) = message.payload.usage {
358 this.model_request_usage_amount = Some(usage.model_requests_usage_amount);
359 this.model_request_usage_limit = usage.model_requests_usage_limit;
360 this.edit_predictions_usage_amount = Some(usage.edit_predictions_usage_amount);
361 this.edit_predictions_usage_limit = usage.edit_predictions_usage_limit;
362 }
363
364 cx.notify();
365 })?;
366 Ok(())
367 }
368
 /// Applies one queued `UpdateContacts` message.
 ///
 /// - `Wait` only drops its barrier sender.
 /// - `Clear` empties the three contact lists, then drops its barrier sender.
 /// - `Update` loads every referenced user, then removes, replaces, and inserts
 ///   entries so each list stays sorted by GitHub login.
 ///
 /// Returns a task that resolves once the message has been fully applied.
 fn update_contacts(&mut self, message: UpdateContacts, cx: &Context<Self>) -> Task<Result<()>> {
     match message {
         UpdateContacts::Wait(barrier) => {
             drop(barrier);
             Task::ready(Ok(()))
         }
         UpdateContacts::Clear(barrier) => {
             self.contacts.clear();
             self.incoming_contact_requests.clear();
             self.outgoing_contact_requests.clear();
             drop(barrier);
             Task::ready(Ok(()))
         }
         UpdateContacts::Update(message) => {
             // Collect every user id mentioned by the update so they can be
             // loaded with a single `get_users` call.
             let mut user_ids = HashSet::default();
             for contact in &message.contacts {
                 user_ids.insert(contact.user_id);
             }
             user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
             user_ids.extend(message.outgoing_requests.iter());

             let load_users = self.get_users(user_ids.into_iter().collect(), cx);
             cx.spawn(async move |this, cx| {
                 load_users.await?;

                 // Users are fetched in parallel above and cached in call to get_users
                 // No need to parallelize here
                 let mut updated_contacts = Vec::new();
                 let this = this.upgrade().context("can't upgrade user store handle")?;
                 for contact in message.contacts {
                     updated_contacts
                         .push(Arc::new(Contact::from_proto(contact, &this, cx).await?));
                 }

                 let mut incoming_requests = Vec::new();
                 for request in message.incoming_requests {
                     incoming_requests.push({
                         this.update(cx, |this, cx| this.get_user(request.requester_id, cx))?
                             .await?
                     });
                 }

                 let mut outgoing_requests = Vec::new();
                 for requested_user_id in message.outgoing_requests {
                     outgoing_requests.push(
                         this.update(cx, |this, cx| this.get_user(requested_user_id, cx))?
                             .await?,
                     );
                 }

                 let removed_contacts =
                     HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
                 let removed_incoming_requests =
                     HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
                 let removed_outgoing_requests =
                     HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());

                 // Apply all list changes in one update so observers never see a
                 // partially applied message.
                 this.update(cx, |this, cx| {
                     // Remove contacts
                     this.contacts
                         .retain(|contact| !removed_contacts.contains(&contact.user.id));
                     // Update existing contacts and insert new ones
                     for updated_contact in updated_contacts {
                         match this.contacts.binary_search_by_key(
                             &&updated_contact.user.github_login,
                             |contact| &contact.user.github_login,
                         ) {
                             Ok(ix) => this.contacts[ix] = updated_contact,
                             Err(ix) => this.contacts.insert(ix, updated_contact),
                         }
                     }

                     // Remove incoming contact requests, emitting a `Cancelled`
                     // event for each one that is removed
                     this.incoming_contact_requests.retain(|user| {
                         if removed_incoming_requests.contains(&user.id) {
                             cx.emit(Event::Contact {
                                 user: user.clone(),
                                 kind: ContactEventKind::Cancelled,
                             });
                             false
                         } else {
                             true
                         }
                     });
                     // Update existing incoming requests and insert new ones
                     for user in incoming_requests {
                         match this
                             .incoming_contact_requests
                             .binary_search_by_key(&&user.github_login, |contact| {
                                 &contact.github_login
                             }) {
                             Ok(ix) => this.incoming_contact_requests[ix] = user,
                             Err(ix) => this.incoming_contact_requests.insert(ix, user),
                         }
                     }

                     // Remove outgoing contact requests
                     this.outgoing_contact_requests
                         .retain(|user| !removed_outgoing_requests.contains(&user.id));
                     // Update existing outgoing requests and insert new ones
                     for request in outgoing_requests {
                         match this
                             .outgoing_contact_requests
                             .binary_search_by_key(&&request.github_login, |contact| {
                                 &contact.github_login
                             }) {
                             Ok(ix) => this.outgoing_contact_requests[ix] = request,
                             Err(ix) => this.outgoing_contact_requests.insert(ix, request),
                         }
                     }

                     cx.notify();
                 })?;

                 Ok(())
             })
         }
     }
 }
488
 /// Returns the current contacts, which the store keeps sorted by GitHub login.
 pub fn contacts(&self) -> &[Arc<Contact>] {
     &self.contacts
 }
492
493 pub fn has_contact(&self, user: &Arc<User>) -> bool {
494 self.contacts
495 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
496 .is_ok()
497 }
498
 /// Returns the users who have sent the current user a contact request.
 pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
     &self.incoming_contact_requests
 }
502
 /// Returns the users to whom the current user has sent a contact request.
 pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
     &self.outgoing_contact_requests
 }
506
 /// Returns whether at least one contact request involving `user` is still in flight.
 pub fn is_contact_request_pending(&self, user: &User) -> bool {
     self.pending_contact_requests.contains_key(&user.id)
 }
510
511 pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
512 if self
513 .contacts
514 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
515 .is_ok()
516 {
517 ContactRequestStatus::RequestAccepted
518 } else if self
519 .outgoing_contact_requests
520 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
521 .is_ok()
522 {
523 ContactRequestStatus::RequestSent
524 } else if self
525 .incoming_contact_requests
526 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
527 .is_ok()
528 {
529 ContactRequestStatus::RequestReceived
530 } else {
531 ContactRequestStatus::None
532 }
533 }
534
535 pub fn request_contact(
536 &mut self,
537 responder_id: u64,
538 cx: &mut Context<Self>,
539 ) -> Task<Result<()>> {
540 self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
541 }
542
543 pub fn remove_contact(&mut self, user_id: u64, cx: &mut Context<Self>) -> Task<Result<()>> {
544 self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
545 }
546
547 pub fn has_incoming_contact_request(&self, user_id: u64) -> bool {
548 self.incoming_contact_requests
549 .iter()
550 .any(|user| user.id == user_id)
551 }
552
553 pub fn respond_to_contact_request(
554 &mut self,
555 requester_id: u64,
556 accept: bool,
557 cx: &mut Context<Self>,
558 ) -> Task<Result<()>> {
559 self.perform_contact_request(
560 requester_id,
561 proto::RespondToContactRequest {
562 requester_id,
563 response: if accept {
564 proto::ContactRequestResponse::Accept
565 } else {
566 proto::ContactRequestResponse::Decline
567 } as i32,
568 },
569 cx,
570 )
571 }
572
573 pub fn dismiss_contact_request(
574 &self,
575 requester_id: u64,
576 cx: &Context<Self>,
577 ) -> Task<Result<()>> {
578 let client = self.client.upgrade();
579 cx.spawn(async move |_, _| {
580 client
581 .context("can't upgrade client reference")?
582 .request(proto::RespondToContactRequest {
583 requester_id,
584 response: proto::ContactRequestResponse::Dismiss as i32,
585 })
586 .await?;
587 Ok(())
588 })
589 }
590
591 fn perform_contact_request<T: RequestMessage>(
592 &mut self,
593 user_id: u64,
594 request: T,
595 cx: &mut Context<Self>,
596 ) -> Task<Result<()>> {
597 let client = self.client.upgrade();
598 *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
599 cx.notify();
600
601 cx.spawn(async move |this, cx| {
602 let response = client
603 .context("can't upgrade client reference")?
604 .request(request)
605 .await;
606 this.update(cx, |this, cx| {
607 if let Entry::Occupied(mut request_count) =
608 this.pending_contact_requests.entry(user_id)
609 {
610 *request_count.get_mut() -= 1;
611 if *request_count.get() == 0 {
612 request_count.remove();
613 }
614 }
615 cx.notify();
616 })?;
617 response?;
618 Ok(())
619 })
620 }
621
622 pub fn clear_contacts(&self) -> impl Future<Output = ()> + use<> {
623 let (tx, mut rx) = postage::barrier::channel();
624 self.update_contacts_tx
625 .unbounded_send(UpdateContacts::Clear(tx))
626 .unwrap();
627 async move {
628 rx.next().await;
629 }
630 }
631
632 pub fn contact_updates_done(&self) -> impl Future<Output = ()> {
633 let (tx, mut rx) = postage::barrier::channel();
634 self.update_contacts_tx
635 .unbounded_send(UpdateContacts::Wait(tx))
636 .unwrap();
637 async move {
638 rx.next().await;
639 }
640 }
641
642 pub fn get_users(
643 &self,
644 user_ids: Vec<u64>,
645 cx: &Context<Self>,
646 ) -> Task<Result<Vec<Arc<User>>>> {
647 let mut user_ids_to_fetch = user_ids.clone();
648 user_ids_to_fetch.retain(|id| !self.users.contains_key(id));
649
650 cx.spawn(async move |this, cx| {
651 if !user_ids_to_fetch.is_empty() {
652 this.update(cx, |this, cx| {
653 this.load_users(
654 proto::GetUsers {
655 user_ids: user_ids_to_fetch,
656 },
657 cx,
658 )
659 })?
660 .await?;
661 }
662
663 this.read_with(cx, |this, _| {
664 user_ids
665 .iter()
666 .map(|user_id| {
667 this.users
668 .get(user_id)
669 .cloned()
670 .with_context(|| format!("user {user_id} not found"))
671 })
672 .collect()
673 })?
674 })
675 }
676
677 pub fn fuzzy_search_users(
678 &self,
679 query: String,
680 cx: &Context<Self>,
681 ) -> Task<Result<Vec<Arc<User>>>> {
682 self.load_users(proto::FuzzySearchUsers { query }, cx)
683 }
684
 /// Returns the user with `user_id` if it is already cached; never fetches.
 pub fn get_cached_user(&self, user_id: u64) -> Option<Arc<User>> {
     self.users.get(&user_id).cloned()
 }
688
689 pub fn get_user_optimistic(&self, user_id: u64, cx: &Context<Self>) -> Option<Arc<User>> {
690 if let Some(user) = self.users.get(&user_id).cloned() {
691 return Some(user);
692 }
693
694 self.get_user(user_id, cx).detach_and_log_err(cx);
695 None
696 }
697
698 pub fn get_user(&self, user_id: u64, cx: &Context<Self>) -> Task<Result<Arc<User>>> {
699 if let Some(user) = self.users.get(&user_id).cloned() {
700 return Task::ready(Ok(user));
701 }
702
703 let load_users = self.get_users(vec![user_id], cx);
704 cx.spawn(async move |this, cx| {
705 load_users.await?;
706 this.read_with(cx, |this, _| {
707 this.users
708 .get(&user_id)
709 .cloned()
710 .context("server responded with no users")
711 })?
712 })
713 }
714
715 pub fn cached_user_by_github_login(&self, github_login: &str) -> Option<Arc<User>> {
716 self.by_github_login
717 .get(github_login)
718 .and_then(|id| self.users.get(id).cloned())
719 }
720
 /// Returns the latest value published on the current-user watch channel.
 pub fn current_user(&self) -> Option<Arc<User>> {
     self.current_user.borrow().clone()
 }
724
 /// Returns the plan from the most recent plan update, if one was received.
 pub fn current_plan(&self) -> Option<proto::Plan> {
     self.current_plan
 }
728
 /// Returns the `(started_at, ended_at)` subscription period from the most
 /// recent plan update, if it carried a convertible period.
 pub fn subscription_period(&self) -> Option<(DateTime<Utc>, DateTime<Utc>)> {
     self.subscription_period
 }
732
 /// Returns the trial start time from the most recent plan update, if any.
 pub fn trial_started_at(&self) -> Option<DateTime<Utc>> {
     self.trial_started_at
 }
736
 /// Returns the usage-based-billing flag from the most recent plan update, or
 /// `None` if it has not been reported.
 pub fn usage_based_billing_enabled(&self) -> Option<bool> {
     self.is_usage_based_billing_enabled
 }
740
 /// Returns the model request usage amount last reported by a plan update.
 pub fn model_request_usage_amount(&self) -> Option<u32> {
     self.model_request_usage_amount
 }
744
 /// Returns a clone of the model request usage limit last reported by a plan update.
 pub fn model_request_usage_limit(&self) -> Option<proto::UsageLimit> {
     self.model_request_usage_limit.clone()
 }
748
 /// Returns the edit prediction usage amount last reported by a plan update.
 pub fn edit_predictions_usage_amount(&self) -> Option<u32> {
     self.edit_predictions_usage_amount
 }
752
 /// Returns a clone of the edit prediction usage limit last reported by a plan update.
 pub fn edit_predictions_usage_limit(&self) -> Option<proto::UsageLimit> {
     self.edit_predictions_usage_limit.clone()
 }
756
 /// Returns a new receiver for the current-user watch channel.
 pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
     self.current_user.clone()
 }
760
761 /// Returns whether the user's account is too new to use the service.
762 pub fn account_too_young(&self) -> bool {
763 self.account_too_young.unwrap_or(false)
764 }
765
766 /// Returns whether the current user has overdue invoices and usage should be blocked.
767 pub fn has_overdue_invoices(&self) -> bool {
768 self.has_overdue_invoices.unwrap_or(false)
769 }
770
771 pub fn current_user_has_accepted_terms(&self) -> Option<bool> {
772 self.accepted_tos_at
773 .map(|accepted_tos_at| accepted_tos_at.is_some())
774 }
775
776 pub fn accept_terms_of_service(&self, cx: &Context<Self>) -> Task<Result<()>> {
777 if self.current_user().is_none() {
778 return Task::ready(Err(anyhow!("no current user")));
779 };
780
781 let client = self.client.clone();
782 cx.spawn(async move |this, cx| -> anyhow::Result<()> {
783 let client = client.upgrade().context("client not found")?;
784 let response = client
785 .request(proto::AcceptTermsOfService {})
786 .await
787 .context("error accepting tos")?;
788 this.update(cx, |this, cx| {
789 this.set_current_user_accepted_tos_at(Some(response.accepted_tos_at));
790 cx.emit(Event::PrivateUserInfoUpdated);
791 })?;
792 Ok(())
793 })
794 }
795
796 fn set_current_user_accepted_tos_at(&mut self, accepted_tos_at: Option<u64>) {
797 self.accepted_tos_at = Some(
798 accepted_tos_at.and_then(|timestamp| DateTime::from_timestamp(timestamp as i64, 0)),
799 );
800 }
801
802 fn load_users(
803 &self,
804 request: impl RequestMessage<Response = UsersResponse>,
805 cx: &Context<Self>,
806 ) -> Task<Result<Vec<Arc<User>>>> {
807 let client = self.client.clone();
808 cx.spawn(async move |this, cx| {
809 if let Some(rpc) = client.upgrade() {
810 let response = rpc.request(request).await.context("error loading users")?;
811 let users = response.users;
812
813 this.update(cx, |this, _| this.insert(users))
814 } else {
815 Ok(Vec::new())
816 }
817 })
818 }
819
820 pub fn insert(&mut self, users: Vec<proto::User>) -> Vec<Arc<User>> {
821 let mut ret = Vec::with_capacity(users.len());
822 for user in users {
823 let user = User::new(user);
824 if let Some(old) = self.users.insert(user.id, user.clone()) {
825 if old.github_login != user.github_login {
826 self.by_github_login.remove(&old.github_login);
827 }
828 }
829 self.by_github_login
830 .insert(user.github_login.clone(), user.id);
831 ret.push(user)
832 }
833 ret
834 }
835
836 pub fn set_participant_indices(
837 &mut self,
838 participant_indices: HashMap<u64, ParticipantIndex>,
839 cx: &mut Context<Self>,
840 ) {
841 if participant_indices != self.participant_indices {
842 self.participant_indices = participant_indices;
843 cx.emit(Event::ParticipantIndicesChanged);
844 }
845 }
846
 /// Returns the participant index recorded for each user id.
 pub fn participant_indices(&self) -> &HashMap<u64, ParticipantIndex> {
     &self.participant_indices
 }
850
851 pub fn participant_names(
852 &self,
853 user_ids: impl Iterator<Item = u64>,
854 cx: &App,
855 ) -> HashMap<u64, SharedString> {
856 let mut ret = HashMap::default();
857 let mut missing_user_ids = Vec::new();
858 for id in user_ids {
859 if let Some(github_login) = self.get_cached_user(id).map(|u| u.github_login.clone()) {
860 ret.insert(id, github_login.into());
861 } else {
862 missing_user_ids.push(id)
863 }
864 }
865 if !missing_user_ids.is_empty() {
866 let this = self.weak_self.clone();
867 cx.spawn(async move |cx| {
868 this.update(cx, |this, cx| this.get_users(missing_user_ids, cx))?
869 .await
870 })
871 .detach_and_log_err(cx);
872 }
873 ret
874 }
875}
876
877impl User {
878 fn new(message: proto::User) -> Arc<Self> {
879 Arc::new(User {
880 id: message.id,
881 github_login: message.github_login,
882 avatar_uri: message.avatar_url.into(),
883 name: message.name,
884 email: message.email,
885 })
886 }
887}
888
889impl Contact {
890 async fn from_proto(
891 contact: proto::Contact,
892 user_store: &Entity<UserStore>,
893 cx: &mut AsyncApp,
894 ) -> Result<Self> {
895 let user = user_store
896 .update(cx, |user_store, cx| {
897 user_store.get_user(contact.user_id, cx)
898 })?
899 .await?;
900 Ok(Self {
901 user,
902 online: contact.online,
903 busy: contact.busy,
904 })
905 }
906}
907
908impl Collaborator {
909 pub fn from_proto(message: proto::Collaborator) -> Result<Self> {
910 Ok(Self {
911 peer_id: message.peer_id.context("invalid peer id")?,
912 replica_id: message.replica_id as ReplicaId,
913 user_id: message.user_id as UserId,
914 is_host: message.is_host,
915 })
916 }
917}