1use super::{Client, Status, TypedEnvelope, proto};
2use anyhow::{Context as _, Result, anyhow};
3use chrono::{DateTime, Utc};
4use cloud_llm_client::{
5 EDIT_PREDICTIONS_USAGE_AMOUNT_HEADER_NAME, EDIT_PREDICTIONS_USAGE_LIMIT_HEADER_NAME,
6 MODEL_REQUESTS_USAGE_AMOUNT_HEADER_NAME, MODEL_REQUESTS_USAGE_LIMIT_HEADER_NAME, UsageLimit,
7};
8use collections::{HashMap, HashSet, hash_map::Entry};
9use derive_more::Deref;
10use feature_flags::FeatureFlagAppExt;
11use futures::{Future, StreamExt, channel::mpsc};
12use gpui::{
13 App, AsyncApp, Context, Entity, EventEmitter, SharedString, SharedUri, Task, WeakEntity,
14};
15use http_client::http::{HeaderMap, HeaderValue};
16use postage::{sink::Sink, watch};
17use rpc::proto::{RequestMessage, UsersResponse};
18use std::{
19 str::FromStr as _,
20 sync::{Arc, Weak},
21};
22use text::ReplicaId;
23use util::{TryFutureExt as _, maybe};
24
25pub type UserId = u64;
26
27#[derive(
28 Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, serde::Serialize, serde::Deserialize,
29)]
30pub struct ChannelId(pub u64);
31
32impl std::fmt::Display for ChannelId {
33 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
34 self.0.fmt(f)
35 }
36}
37
38#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)]
39pub struct ProjectId(pub u64);
40
41impl ProjectId {
42 pub fn to_proto(&self) -> u64 {
43 self.0
44 }
45}
46
47#[derive(
48 Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, serde::Serialize, serde::Deserialize,
49)]
50pub struct DevServerProjectId(pub u64);
51
52#[derive(Debug, Clone, Copy, PartialEq, Eq)]
53pub struct ParticipantIndex(pub u32);
54
55#[derive(Default, Debug)]
56pub struct User {
57 pub id: UserId,
58 pub github_login: SharedString,
59 pub avatar_uri: SharedUri,
60 pub name: Option<String>,
61}
62
63#[derive(Clone, Debug, PartialEq, Eq)]
64pub struct Collaborator {
65 pub peer_id: proto::PeerId,
66 pub replica_id: ReplicaId,
67 pub user_id: UserId,
68 pub is_host: bool,
69 pub committer_name: Option<String>,
70 pub committer_email: Option<String>,
71}
72
73impl PartialOrd for User {
74 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
75 Some(self.cmp(other))
76 }
77}
78
79impl Ord for User {
80 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
81 self.github_login.cmp(&other.github_login)
82 }
83}
84
85impl PartialEq for User {
86 fn eq(&self, other: &Self) -> bool {
87 self.id == other.id && self.github_login == other.github_login
88 }
89}
90
91impl Eq for User {}
92
93#[derive(Debug, PartialEq)]
94pub struct Contact {
95 pub user: Arc<User>,
96 pub online: bool,
97 pub busy: bool,
98}
99
100#[derive(Debug, Clone, Copy, PartialEq, Eq)]
101pub enum ContactRequestStatus {
102 None,
103 RequestSent,
104 RequestReceived,
105 RequestAccepted,
106}
107
108pub struct UserStore {
109 users: HashMap<u64, Arc<User>>,
110 by_github_login: HashMap<SharedString, u64>,
111 participant_indices: HashMap<u64, ParticipantIndex>,
112 update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
113 current_plan: Option<proto::Plan>,
114 subscription_period: Option<(DateTime<Utc>, DateTime<Utc>)>,
115 trial_started_at: Option<DateTime<Utc>>,
116 is_usage_based_billing_enabled: Option<bool>,
117 account_too_young: Option<bool>,
118 has_overdue_invoices: Option<bool>,
119 current_user: watch::Receiver<Option<Arc<User>>>,
120 accepted_tos_at: Option<Option<DateTime<Utc>>>,
121 contacts: Vec<Arc<Contact>>,
122 incoming_contact_requests: Vec<Arc<User>>,
123 outgoing_contact_requests: Vec<Arc<User>>,
124 pending_contact_requests: HashMap<u64, usize>,
125 invite_info: Option<InviteInfo>,
126 client: Weak<Client>,
127 _maintain_contacts: Task<()>,
128 _maintain_current_user: Task<Result<()>>,
129 weak_self: WeakEntity<Self>,
130}
131
132#[derive(Clone)]
133pub struct InviteInfo {
134 pub count: u32,
135 pub url: Arc<str>,
136}
137
138pub enum Event {
139 Contact {
140 user: Arc<User>,
141 kind: ContactEventKind,
142 },
143 ShowContacts,
144 ParticipantIndicesChanged,
145 PrivateUserInfoUpdated,
146 PlanUpdated,
147}
148
149#[derive(Clone, Copy)]
150pub enum ContactEventKind {
151 Requested,
152 Accepted,
153 Cancelled,
154}
155
156impl EventEmitter<Event> for UserStore {}
157
158enum UpdateContacts {
159 Update(proto::UpdateContacts),
160 Wait(postage::barrier::Sender),
161 Clear(postage::barrier::Sender),
162}
163
164#[derive(Debug, Clone, Copy, Deref)]
165pub struct ModelRequestUsage(pub RequestUsage);
166
167#[derive(Debug, Clone, Copy, Deref)]
168pub struct EditPredictionUsage(pub RequestUsage);
169
170#[derive(Debug, Clone, Copy)]
171pub struct RequestUsage {
172 pub limit: UsageLimit,
173 pub amount: i32,
174}
175
176impl UserStore {
177 pub fn new(client: Arc<Client>, cx: &Context<Self>) -> Self {
178 let (mut current_user_tx, current_user_rx) = watch::channel();
179 let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
180 let rpc_subscriptions = vec![
181 client.add_message_handler(cx.weak_entity(), Self::handle_update_plan),
182 client.add_message_handler(cx.weak_entity(), Self::handle_update_contacts),
183 client.add_message_handler(cx.weak_entity(), Self::handle_update_invite_info),
184 client.add_message_handler(cx.weak_entity(), Self::handle_show_contacts),
185 ];
186 Self {
187 users: Default::default(),
188 by_github_login: Default::default(),
189 current_user: current_user_rx,
190 current_plan: None,
191 subscription_period: None,
192 trial_started_at: None,
193 is_usage_based_billing_enabled: None,
194 account_too_young: None,
195 has_overdue_invoices: None,
196 accepted_tos_at: None,
197 contacts: Default::default(),
198 incoming_contact_requests: Default::default(),
199 participant_indices: Default::default(),
200 outgoing_contact_requests: Default::default(),
201 invite_info: None,
202 client: Arc::downgrade(&client),
203 update_contacts_tx,
204 _maintain_contacts: cx.spawn(async move |this, cx| {
205 let _subscriptions = rpc_subscriptions;
206 while let Some(message) = update_contacts_rx.next().await {
207 if let Ok(task) = this.update(cx, |this, cx| this.update_contacts(message, cx))
208 {
209 task.log_err().await;
210 } else {
211 break;
212 }
213 }
214 }),
215 _maintain_current_user: cx.spawn(async move |this, cx| {
216 let mut status = client.status();
217 let weak = Arc::downgrade(&client);
218 drop(client);
219 while let Some(status) = status.next().await {
220 // if the client is dropped, the app is shutting down.
221 let Some(client) = weak.upgrade() else {
222 return Ok(());
223 };
224 match status {
225 Status::Connected { .. } => {
226 if let Some(user_id) = client.user_id() {
227 let fetch_user = if let Ok(fetch_user) =
228 this.update(cx, |this, cx| this.get_user(user_id, cx).log_err())
229 {
230 fetch_user
231 } else {
232 break;
233 };
234 let fetch_private_user_info =
235 client.request(proto::GetPrivateUserInfo {}).log_err();
236 let (user, info) =
237 futures::join!(fetch_user, fetch_private_user_info);
238
239 cx.update(|cx| {
240 if let Some(info) = info {
241 let staff =
242 info.staff && !*feature_flags::ZED_DISABLE_STAFF;
243 cx.update_flags(staff, info.flags);
244 client.telemetry.set_authenticated_user_info(
245 Some(info.metrics_id.clone()),
246 staff,
247 );
248
249 this.update(cx, |this, cx| {
250 let accepted_tos_at = {
251 #[cfg(debug_assertions)]
252 if std::env::var("ZED_IGNORE_ACCEPTED_TOS").is_ok()
253 {
254 None
255 } else {
256 info.accepted_tos_at
257 }
258
259 #[cfg(not(debug_assertions))]
260 info.accepted_tos_at
261 };
262
263 this.set_current_user_accepted_tos_at(accepted_tos_at);
264 cx.emit(Event::PrivateUserInfoUpdated);
265 })
266 } else {
267 anyhow::Ok(())
268 }
269 })??;
270
271 current_user_tx.send(user).await.ok();
272
273 this.update(cx, |_, cx| cx.notify())?;
274 }
275 }
276 Status::SignedOut => {
277 current_user_tx.send(None).await.ok();
278 this.update(cx, |this, cx| {
279 this.accepted_tos_at = None;
280 cx.emit(Event::PrivateUserInfoUpdated);
281 cx.notify();
282 this.clear_contacts()
283 })?
284 .await;
285 }
286 Status::ConnectionLost => {
287 this.update(cx, |this, cx| {
288 cx.notify();
289 this.clear_contacts()
290 })?
291 .await;
292 }
293 _ => {}
294 }
295 }
296 Ok(())
297 }),
298 pending_contact_requests: Default::default(),
299 weak_self: cx.weak_entity(),
300 }
301 }
302
303 #[cfg(feature = "test-support")]
304 pub fn clear_cache(&mut self) {
305 self.users.clear();
306 self.by_github_login.clear();
307 }
308
309 async fn handle_update_invite_info(
310 this: Entity<Self>,
311 message: TypedEnvelope<proto::UpdateInviteInfo>,
312 mut cx: AsyncApp,
313 ) -> Result<()> {
314 this.update(&mut cx, |this, cx| {
315 this.invite_info = Some(InviteInfo {
316 url: Arc::from(message.payload.url),
317 count: message.payload.count,
318 });
319 cx.notify();
320 })?;
321 Ok(())
322 }
323
324 async fn handle_show_contacts(
325 this: Entity<Self>,
326 _: TypedEnvelope<proto::ShowContacts>,
327 mut cx: AsyncApp,
328 ) -> Result<()> {
329 this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts))?;
330 Ok(())
331 }
332
333 pub fn invite_info(&self) -> Option<&InviteInfo> {
334 self.invite_info.as_ref()
335 }
336
337 async fn handle_update_contacts(
338 this: Entity<Self>,
339 message: TypedEnvelope<proto::UpdateContacts>,
340 mut cx: AsyncApp,
341 ) -> Result<()> {
342 this.read_with(&mut cx, |this, _| {
343 this.update_contacts_tx
344 .unbounded_send(UpdateContacts::Update(message.payload))
345 .unwrap();
346 })?;
347 Ok(())
348 }
349
350 async fn handle_update_plan(
351 this: Entity<Self>,
352 message: TypedEnvelope<proto::UpdateUserPlan>,
353 mut cx: AsyncApp,
354 ) -> Result<()> {
355 this.update(&mut cx, |this, cx| {
356 this.current_plan = Some(message.payload.plan());
357 this.subscription_period = maybe!({
358 let period = message.payload.subscription_period?;
359 let started_at = DateTime::from_timestamp(period.started_at as i64, 0)?;
360 let ended_at = DateTime::from_timestamp(period.ended_at as i64, 0)?;
361
362 Some((started_at, ended_at))
363 });
364 this.trial_started_at = message
365 .payload
366 .trial_started_at
367 .and_then(|trial_started_at| DateTime::from_timestamp(trial_started_at as i64, 0));
368 this.is_usage_based_billing_enabled = message.payload.is_usage_based_billing_enabled;
369 this.account_too_young = message.payload.account_too_young;
370 this.has_overdue_invoices = message.payload.has_overdue_invoices;
371
372 cx.emit(Event::PlanUpdated);
373 cx.notify();
374 })?;
375 Ok(())
376 }
377
378 fn update_contacts(&mut self, message: UpdateContacts, cx: &Context<Self>) -> Task<Result<()>> {
379 match message {
380 UpdateContacts::Wait(barrier) => {
381 drop(barrier);
382 Task::ready(Ok(()))
383 }
384 UpdateContacts::Clear(barrier) => {
385 self.contacts.clear();
386 self.incoming_contact_requests.clear();
387 self.outgoing_contact_requests.clear();
388 drop(barrier);
389 Task::ready(Ok(()))
390 }
391 UpdateContacts::Update(message) => {
392 let mut user_ids = HashSet::default();
393 for contact in &message.contacts {
394 user_ids.insert(contact.user_id);
395 }
396 user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
397 user_ids.extend(message.outgoing_requests.iter());
398
399 let load_users = self.get_users(user_ids.into_iter().collect(), cx);
400 cx.spawn(async move |this, cx| {
401 load_users.await?;
402
403 // Users are fetched in parallel above and cached in call to get_users
404 // No need to parallelize here
405 let mut updated_contacts = Vec::new();
406 let this = this.upgrade().context("can't upgrade user store handle")?;
407 for contact in message.contacts {
408 updated_contacts
409 .push(Arc::new(Contact::from_proto(contact, &this, cx).await?));
410 }
411
412 let mut incoming_requests = Vec::new();
413 for request in message.incoming_requests {
414 incoming_requests.push({
415 this.update(cx, |this, cx| this.get_user(request.requester_id, cx))?
416 .await?
417 });
418 }
419
420 let mut outgoing_requests = Vec::new();
421 for requested_user_id in message.outgoing_requests {
422 outgoing_requests.push(
423 this.update(cx, |this, cx| this.get_user(requested_user_id, cx))?
424 .await?,
425 );
426 }
427
428 let removed_contacts =
429 HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
430 let removed_incoming_requests =
431 HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
432 let removed_outgoing_requests =
433 HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());
434
435 this.update(cx, |this, cx| {
436 // Remove contacts
437 this.contacts
438 .retain(|contact| !removed_contacts.contains(&contact.user.id));
439 // Update existing contacts and insert new ones
440 for updated_contact in updated_contacts {
441 match this.contacts.binary_search_by_key(
442 &&updated_contact.user.github_login,
443 |contact| &contact.user.github_login,
444 ) {
445 Ok(ix) => this.contacts[ix] = updated_contact,
446 Err(ix) => this.contacts.insert(ix, updated_contact),
447 }
448 }
449
450 // Remove incoming contact requests
451 this.incoming_contact_requests.retain(|user| {
452 if removed_incoming_requests.contains(&user.id) {
453 cx.emit(Event::Contact {
454 user: user.clone(),
455 kind: ContactEventKind::Cancelled,
456 });
457 false
458 } else {
459 true
460 }
461 });
462 // Update existing incoming requests and insert new ones
463 for user in incoming_requests {
464 match this
465 .incoming_contact_requests
466 .binary_search_by_key(&&user.github_login, |contact| {
467 &contact.github_login
468 }) {
469 Ok(ix) => this.incoming_contact_requests[ix] = user,
470 Err(ix) => this.incoming_contact_requests.insert(ix, user),
471 }
472 }
473
474 // Remove outgoing contact requests
475 this.outgoing_contact_requests
476 .retain(|user| !removed_outgoing_requests.contains(&user.id));
477 // Update existing incoming requests and insert new ones
478 for request in outgoing_requests {
479 match this
480 .outgoing_contact_requests
481 .binary_search_by_key(&&request.github_login, |contact| {
482 &contact.github_login
483 }) {
484 Ok(ix) => this.outgoing_contact_requests[ix] = request,
485 Err(ix) => this.outgoing_contact_requests.insert(ix, request),
486 }
487 }
488
489 cx.notify();
490 })?;
491
492 Ok(())
493 })
494 }
495 }
496 }
497
498 pub fn contacts(&self) -> &[Arc<Contact>] {
499 &self.contacts
500 }
501
502 pub fn has_contact(&self, user: &Arc<User>) -> bool {
503 self.contacts
504 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
505 .is_ok()
506 }
507
508 pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
509 &self.incoming_contact_requests
510 }
511
512 pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
513 &self.outgoing_contact_requests
514 }
515
516 pub fn is_contact_request_pending(&self, user: &User) -> bool {
517 self.pending_contact_requests.contains_key(&user.id)
518 }
519
520 pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
521 if self
522 .contacts
523 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
524 .is_ok()
525 {
526 ContactRequestStatus::RequestAccepted
527 } else if self
528 .outgoing_contact_requests
529 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
530 .is_ok()
531 {
532 ContactRequestStatus::RequestSent
533 } else if self
534 .incoming_contact_requests
535 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
536 .is_ok()
537 {
538 ContactRequestStatus::RequestReceived
539 } else {
540 ContactRequestStatus::None
541 }
542 }
543
544 pub fn request_contact(
545 &mut self,
546 responder_id: u64,
547 cx: &mut Context<Self>,
548 ) -> Task<Result<()>> {
549 self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
550 }
551
552 pub fn remove_contact(&mut self, user_id: u64, cx: &mut Context<Self>) -> Task<Result<()>> {
553 self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
554 }
555
556 pub fn has_incoming_contact_request(&self, user_id: u64) -> bool {
557 self.incoming_contact_requests
558 .iter()
559 .any(|user| user.id == user_id)
560 }
561
562 pub fn respond_to_contact_request(
563 &mut self,
564 requester_id: u64,
565 accept: bool,
566 cx: &mut Context<Self>,
567 ) -> Task<Result<()>> {
568 self.perform_contact_request(
569 requester_id,
570 proto::RespondToContactRequest {
571 requester_id,
572 response: if accept {
573 proto::ContactRequestResponse::Accept
574 } else {
575 proto::ContactRequestResponse::Decline
576 } as i32,
577 },
578 cx,
579 )
580 }
581
582 pub fn dismiss_contact_request(
583 &self,
584 requester_id: u64,
585 cx: &Context<Self>,
586 ) -> Task<Result<()>> {
587 let client = self.client.upgrade();
588 cx.spawn(async move |_, _| {
589 client
590 .context("can't upgrade client reference")?
591 .request(proto::RespondToContactRequest {
592 requester_id,
593 response: proto::ContactRequestResponse::Dismiss as i32,
594 })
595 .await?;
596 Ok(())
597 })
598 }
599
600 fn perform_contact_request<T: RequestMessage>(
601 &mut self,
602 user_id: u64,
603 request: T,
604 cx: &mut Context<Self>,
605 ) -> Task<Result<()>> {
606 let client = self.client.upgrade();
607 *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
608 cx.notify();
609
610 cx.spawn(async move |this, cx| {
611 let response = client
612 .context("can't upgrade client reference")?
613 .request(request)
614 .await;
615 this.update(cx, |this, cx| {
616 if let Entry::Occupied(mut request_count) =
617 this.pending_contact_requests.entry(user_id)
618 {
619 *request_count.get_mut() -= 1;
620 if *request_count.get() == 0 {
621 request_count.remove();
622 }
623 }
624 cx.notify();
625 })?;
626 response?;
627 Ok(())
628 })
629 }
630
631 pub fn clear_contacts(&self) -> impl Future<Output = ()> + use<> {
632 let (tx, mut rx) = postage::barrier::channel();
633 self.update_contacts_tx
634 .unbounded_send(UpdateContacts::Clear(tx))
635 .unwrap();
636 async move {
637 rx.next().await;
638 }
639 }
640
641 pub fn contact_updates_done(&self) -> impl Future<Output = ()> {
642 let (tx, mut rx) = postage::barrier::channel();
643 self.update_contacts_tx
644 .unbounded_send(UpdateContacts::Wait(tx))
645 .unwrap();
646 async move {
647 rx.next().await;
648 }
649 }
650
651 pub fn get_users(
652 &self,
653 user_ids: Vec<u64>,
654 cx: &Context<Self>,
655 ) -> Task<Result<Vec<Arc<User>>>> {
656 let mut user_ids_to_fetch = user_ids.clone();
657 user_ids_to_fetch.retain(|id| !self.users.contains_key(id));
658
659 cx.spawn(async move |this, cx| {
660 if !user_ids_to_fetch.is_empty() {
661 this.update(cx, |this, cx| {
662 this.load_users(
663 proto::GetUsers {
664 user_ids: user_ids_to_fetch,
665 },
666 cx,
667 )
668 })?
669 .await?;
670 }
671
672 this.read_with(cx, |this, _| {
673 user_ids
674 .iter()
675 .map(|user_id| {
676 this.users
677 .get(user_id)
678 .cloned()
679 .with_context(|| format!("user {user_id} not found"))
680 })
681 .collect()
682 })?
683 })
684 }
685
686 pub fn fuzzy_search_users(
687 &self,
688 query: String,
689 cx: &Context<Self>,
690 ) -> Task<Result<Vec<Arc<User>>>> {
691 self.load_users(proto::FuzzySearchUsers { query }, cx)
692 }
693
694 pub fn get_cached_user(&self, user_id: u64) -> Option<Arc<User>> {
695 self.users.get(&user_id).cloned()
696 }
697
698 pub fn get_user_optimistic(&self, user_id: u64, cx: &Context<Self>) -> Option<Arc<User>> {
699 if let Some(user) = self.users.get(&user_id).cloned() {
700 return Some(user);
701 }
702
703 self.get_user(user_id, cx).detach_and_log_err(cx);
704 None
705 }
706
707 pub fn get_user(&self, user_id: u64, cx: &Context<Self>) -> Task<Result<Arc<User>>> {
708 if let Some(user) = self.users.get(&user_id).cloned() {
709 return Task::ready(Ok(user));
710 }
711
712 let load_users = self.get_users(vec![user_id], cx);
713 cx.spawn(async move |this, cx| {
714 load_users.await?;
715 this.read_with(cx, |this, _| {
716 this.users
717 .get(&user_id)
718 .cloned()
719 .context("server responded with no users")
720 })?
721 })
722 }
723
724 pub fn cached_user_by_github_login(&self, github_login: &str) -> Option<Arc<User>> {
725 self.by_github_login
726 .get(github_login)
727 .and_then(|id| self.users.get(id).cloned())
728 }
729
730 pub fn current_user(&self) -> Option<Arc<User>> {
731 self.current_user.borrow().clone()
732 }
733
734 pub fn current_plan(&self) -> Option<proto::Plan> {
735 #[cfg(debug_assertions)]
736 if let Ok(plan) = std::env::var("ZED_SIMULATE_PLAN").as_ref() {
737 return match plan.as_str() {
738 "free" => Some(proto::Plan::Free),
739 "trial" => Some(proto::Plan::ZedProTrial),
740 "pro" => Some(proto::Plan::ZedPro),
741 _ => {
742 panic!("ZED_SIMULATE_PLAN must be one of 'free', 'trial', or 'pro'");
743 }
744 };
745 }
746
747 self.current_plan
748 }
749
750 pub fn subscription_period(&self) -> Option<(DateTime<Utc>, DateTime<Utc>)> {
751 self.subscription_period
752 }
753
754 pub fn trial_started_at(&self) -> Option<DateTime<Utc>> {
755 self.trial_started_at
756 }
757
758 pub fn usage_based_billing_enabled(&self) -> Option<bool> {
759 self.is_usage_based_billing_enabled
760 }
761
762 pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
763 self.current_user.clone()
764 }
765
766 /// Returns whether the user's account is too new to use the service.
767 pub fn account_too_young(&self) -> bool {
768 self.account_too_young.unwrap_or(false)
769 }
770
771 /// Returns whether the current user has overdue invoices and usage should be blocked.
772 pub fn has_overdue_invoices(&self) -> bool {
773 self.has_overdue_invoices.unwrap_or(false)
774 }
775
776 pub fn current_user_has_accepted_terms(&self) -> Option<bool> {
777 self.accepted_tos_at
778 .map(|accepted_tos_at| accepted_tos_at.is_some())
779 }
780
781 pub fn accept_terms_of_service(&self, cx: &Context<Self>) -> Task<Result<()>> {
782 if self.current_user().is_none() {
783 return Task::ready(Err(anyhow!("no current user")));
784 };
785
786 let client = self.client.clone();
787 cx.spawn(async move |this, cx| -> anyhow::Result<()> {
788 let client = client.upgrade().context("client not found")?;
789 let response = client
790 .request(proto::AcceptTermsOfService {})
791 .await
792 .context("error accepting tos")?;
793 this.update(cx, |this, cx| {
794 this.set_current_user_accepted_tos_at(Some(response.accepted_tos_at));
795 cx.emit(Event::PrivateUserInfoUpdated);
796 })?;
797 Ok(())
798 })
799 }
800
801 fn set_current_user_accepted_tos_at(&mut self, accepted_tos_at: Option<u64>) {
802 self.accepted_tos_at = Some(
803 accepted_tos_at.and_then(|timestamp| DateTime::from_timestamp(timestamp as i64, 0)),
804 );
805 }
806
807 fn load_users(
808 &self,
809 request: impl RequestMessage<Response = UsersResponse>,
810 cx: &Context<Self>,
811 ) -> Task<Result<Vec<Arc<User>>>> {
812 let client = self.client.clone();
813 cx.spawn(async move |this, cx| {
814 if let Some(rpc) = client.upgrade() {
815 let response = rpc.request(request).await.context("error loading users")?;
816 let users = response.users;
817
818 this.update(cx, |this, _| this.insert(users))
819 } else {
820 Ok(Vec::new())
821 }
822 })
823 }
824
825 pub fn insert(&mut self, users: Vec<proto::User>) -> Vec<Arc<User>> {
826 let mut ret = Vec::with_capacity(users.len());
827 for user in users {
828 let user = User::new(user);
829 if let Some(old) = self.users.insert(user.id, user.clone()) {
830 if old.github_login != user.github_login {
831 self.by_github_login.remove(&old.github_login);
832 }
833 }
834 self.by_github_login
835 .insert(user.github_login.clone(), user.id);
836 ret.push(user)
837 }
838 ret
839 }
840
841 pub fn set_participant_indices(
842 &mut self,
843 participant_indices: HashMap<u64, ParticipantIndex>,
844 cx: &mut Context<Self>,
845 ) {
846 if participant_indices != self.participant_indices {
847 self.participant_indices = participant_indices;
848 cx.emit(Event::ParticipantIndicesChanged);
849 }
850 }
851
852 pub fn participant_indices(&self) -> &HashMap<u64, ParticipantIndex> {
853 &self.participant_indices
854 }
855
856 pub fn participant_names(
857 &self,
858 user_ids: impl Iterator<Item = u64>,
859 cx: &App,
860 ) -> HashMap<u64, SharedString> {
861 let mut ret = HashMap::default();
862 let mut missing_user_ids = Vec::new();
863 for id in user_ids {
864 if let Some(github_login) = self.get_cached_user(id).map(|u| u.github_login.clone()) {
865 ret.insert(id, github_login);
866 } else {
867 missing_user_ids.push(id)
868 }
869 }
870 if !missing_user_ids.is_empty() {
871 let this = self.weak_self.clone();
872 cx.spawn(async move |cx| {
873 this.update(cx, |this, cx| this.get_users(missing_user_ids, cx))?
874 .await
875 })
876 .detach_and_log_err(cx);
877 }
878 ret
879 }
880}
881
882impl User {
883 fn new(message: proto::User) -> Arc<Self> {
884 Arc::new(User {
885 id: message.id,
886 github_login: message.github_login.into(),
887 avatar_uri: message.avatar_url.into(),
888 name: message.name,
889 })
890 }
891}
892
893impl Contact {
894 async fn from_proto(
895 contact: proto::Contact,
896 user_store: &Entity<UserStore>,
897 cx: &mut AsyncApp,
898 ) -> Result<Self> {
899 let user = user_store
900 .update(cx, |user_store, cx| {
901 user_store.get_user(contact.user_id, cx)
902 })?
903 .await?;
904 Ok(Self {
905 user,
906 online: contact.online,
907 busy: contact.busy,
908 })
909 }
910}
911
912impl Collaborator {
913 pub fn from_proto(message: proto::Collaborator) -> Result<Self> {
914 Ok(Self {
915 peer_id: message.peer_id.context("invalid peer id")?,
916 replica_id: message.replica_id as ReplicaId,
917 user_id: message.user_id as UserId,
918 is_host: message.is_host,
919 committer_name: message.committer_name,
920 committer_email: message.committer_email,
921 })
922 }
923}
924
925impl RequestUsage {
926 pub fn over_limit(&self) -> bool {
927 match self.limit {
928 UsageLimit::Limited(limit) => self.amount >= limit,
929 UsageLimit::Unlimited => false,
930 }
931 }
932
933 pub fn from_proto(amount: u32, limit: proto::UsageLimit) -> Option<Self> {
934 let limit = match limit.variant? {
935 proto::usage_limit::Variant::Limited(limited) => {
936 UsageLimit::Limited(limited.limit as i32)
937 }
938 proto::usage_limit::Variant::Unlimited(_) => UsageLimit::Unlimited,
939 };
940 Some(RequestUsage {
941 limit,
942 amount: amount as i32,
943 })
944 }
945
946 fn from_headers(
947 limit_name: &str,
948 amount_name: &str,
949 headers: &HeaderMap<HeaderValue>,
950 ) -> Result<Self> {
951 let limit = headers
952 .get(limit_name)
953 .with_context(|| format!("missing {limit_name:?} header"))?;
954 let limit = UsageLimit::from_str(limit.to_str()?)?;
955
956 let amount = headers
957 .get(amount_name)
958 .with_context(|| format!("missing {amount_name:?} header"))?;
959 let amount = amount.to_str()?.parse::<i32>()?;
960
961 Ok(Self { limit, amount })
962 }
963}
964
965impl ModelRequestUsage {
966 pub fn from_headers(headers: &HeaderMap<HeaderValue>) -> Result<Self> {
967 Ok(Self(RequestUsage::from_headers(
968 MODEL_REQUESTS_USAGE_LIMIT_HEADER_NAME,
969 MODEL_REQUESTS_USAGE_AMOUNT_HEADER_NAME,
970 headers,
971 )?))
972 }
973}
974
975impl EditPredictionUsage {
976 pub fn from_headers(headers: &HeaderMap<HeaderValue>) -> Result<Self> {
977 Ok(Self(RequestUsage::from_headers(
978 EDIT_PREDICTIONS_USAGE_LIMIT_HEADER_NAME,
979 EDIT_PREDICTIONS_USAGE_AMOUNT_HEADER_NAME,
980 headers,
981 )?))
982 }
983}