1use super::{Client, Status, TypedEnvelope, proto};
2use anyhow::{Context as _, Result, anyhow};
3use chrono::{DateTime, Utc};
4use cloud_llm_client::{
5 EDIT_PREDICTIONS_USAGE_AMOUNT_HEADER_NAME, EDIT_PREDICTIONS_USAGE_LIMIT_HEADER_NAME,
6 MODEL_REQUESTS_USAGE_AMOUNT_HEADER_NAME, MODEL_REQUESTS_USAGE_LIMIT_HEADER_NAME, UsageLimit,
7};
8use collections::{HashMap, HashSet, hash_map::Entry};
9use derive_more::Deref;
10use feature_flags::FeatureFlagAppExt;
11use futures::{Future, StreamExt, channel::mpsc};
12use gpui::{
13 App, AsyncApp, Context, Entity, EventEmitter, SharedString, SharedUri, Task, WeakEntity,
14};
15use http_client::http::{HeaderMap, HeaderValue};
16use postage::{sink::Sink, watch};
17use rpc::proto::{RequestMessage, UsersResponse};
18use std::{
19 str::FromStr as _,
20 sync::{Arc, Weak},
21};
22use text::ReplicaId;
23use util::{TryFutureExt as _, maybe};
24
/// Numeric identifier of a user; a plain alias for `u64`.
pub type UserId = u64;
26
27#[derive(
28 Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, serde::Serialize, serde::Deserialize,
29)]
30pub struct ChannelId(pub u64);
31
32impl std::fmt::Display for ChannelId {
33 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
34 self.0.fmt(f)
35 }
36}
37
/// Newtype wrapper around the `u64` identifier of a project.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)]
pub struct ProjectId(pub u64);

impl ProjectId {
    /// Returns the raw `u64` carried by this id.
    pub fn to_proto(&self) -> u64 {
        let Self(raw) = *self;
        raw
    }
}
46
/// Newtype wrapper around the `u64` identifier of a dev-server project.
#[derive(
    Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, serde::Serialize, serde::Deserialize,
)]
pub struct DevServerProjectId(pub u64);
51
/// Newtype wrapper around a `u32` index assigned to a participant.
/// `UserStore` keeps a map from user id to this index.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ParticipantIndex(pub u32);
54
/// A user as cached by `UserStore`, built from a `proto::User` message.
#[derive(Default, Debug)]
pub struct User {
    /// Numeric id of the user.
    pub id: UserId,
    /// GitHub login of the user; also the key used to keep contact lists sorted.
    pub github_login: SharedString,
    /// URI of the user's avatar image.
    pub avatar_uri: SharedUri,
    /// Optional name, copied from the proto message's `name` field.
    pub name: Option<String>,
}
62
/// A collaborator, decoded from a `proto::Collaborator` message.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Collaborator {
    /// Peer id of the collaborator's connection.
    pub peer_id: proto::PeerId,
    /// Replica id assigned to the collaborator.
    pub replica_id: ReplicaId,
    /// Id of the user behind this collaborator.
    pub user_id: UserId,
    /// Whether this collaborator is the host.
    pub is_host: bool,
    /// Committer name, if the message carried one.
    pub committer_name: Option<String>,
    /// Committer email, if the message carried one.
    pub committer_email: Option<String>,
}
72
73impl PartialOrd for User {
74 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
75 Some(self.cmp(other))
76 }
77}
78
79impl Ord for User {
80 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
81 self.github_login.cmp(&other.github_login)
82 }
83}
84
85impl PartialEq for User {
86 fn eq(&self, other: &Self) -> bool {
87 self.id == other.id && self.github_login == other.github_login
88 }
89}
90
91impl Eq for User {}
92
/// A contact of the current user, built from a `proto::Contact` message.
#[derive(Debug, PartialEq)]
pub struct Contact {
    /// The user this contact refers to.
    pub user: Arc<User>,
    /// Copied from the proto message's `online` flag.
    pub online: bool,
    /// Copied from the proto message's `busy` flag.
    pub busy: bool,
}
99
/// Relationship between the current user and another user, as reported by
/// `UserStore::contact_request_status`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ContactRequestStatus {
    /// The user is in none of the store's contact or request lists.
    None,
    /// The user is in the outgoing contact requests list.
    RequestSent,
    /// The user is in the incoming contact requests list.
    RequestReceived,
    /// The user is in the contacts list.
    RequestAccepted,
}
107
/// Caches users, contacts, and plan/billing information for the current
/// user, and keeps them up to date from messages received via the client.
pub struct UserStore {
    /// Cache of known users, keyed by user id.
    users: HashMap<u64, Arc<User>>,
    /// Index from GitHub login to user id for the cached users.
    by_github_login: HashMap<SharedString, u64>,
    /// Participant index per user id, replaced wholesale by `set_participant_indices`.
    participant_indices: HashMap<u64, ParticipantIndex>,
    /// Queue feeding the `_maintain_contacts` task.
    update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
    /// Plan from the most recent `UpdateUserPlan` message, if any.
    current_plan: Option<proto::Plan>,
    /// `(started_at, ended_at)` of the subscription period, if reported.
    subscription_period: Option<(DateTime<Utc>, DateTime<Utc>)>,
    /// When the trial started, if reported.
    trial_started_at: Option<DateTime<Utc>>,
    /// Most recently known model request usage.
    model_request_usage: Option<ModelRequestUsage>,
    /// Copied verbatim from the plan update message.
    is_usage_based_billing_enabled: Option<bool>,
    /// Copied verbatim from the plan update message.
    account_too_young: Option<bool>,
    /// Copied verbatim from the plan update message.
    has_overdue_invoices: Option<bool>,
    /// Receiver side of the watch channel holding the signed-in user.
    current_user: watch::Receiver<Option<Arc<User>>>,
    /// Outer `None`: not known (initial state, and reset on sign-out).
    /// Inner value: the acceptance timestamp, or `None` if not accepted.
    accepted_tos_at: Option<Option<DateTime<Utc>>>,
    /// Contacts, kept sorted by GitHub login (lookups use binary search).
    contacts: Vec<Arc<Contact>>,
    /// Users who sent us a contact request, sorted by GitHub login.
    incoming_contact_requests: Vec<Arc<User>>,
    /// Users we sent a contact request to, sorted by GitHub login.
    outgoing_contact_requests: Vec<Arc<User>>,
    /// Number of in-flight contact requests per user id.
    pending_contact_requests: HashMap<u64, usize>,
    /// Latest invite info received from the server, if any.
    invite_info: Option<InviteInfo>,
    /// Weak handle to the client; upgraded whenever a request is made.
    client: Weak<Client>,
    /// Task that applies queued `UpdateContacts` messages one at a time.
    _maintain_contacts: Task<()>,
    /// Task that reacts to client status changes.
    _maintain_current_user: Task<Result<()>>,
    /// Weak handle to this entity, used where only an `&App` is available.
    weak_self: WeakEntity<Self>,
}
132
/// Invite information taken from a `proto::UpdateInviteInfo` message.
#[derive(Clone)]
pub struct InviteInfo {
    /// The message's `count` field.
    pub count: u32,
    /// The message's `url` field.
    pub url: Arc<str>,
}
138
/// Events emitted by `UserStore`.
pub enum Event {
    /// Something happened to a contact request involving `user`.
    Contact {
        user: Arc<User>,
        kind: ContactEventKind,
    },
    /// Emitted when a `ShowContacts` message is received.
    ShowContacts,
    /// Emitted when `set_participant_indices` changes the stored map.
    ParticipantIndicesChanged,
    /// Emitted when the terms-of-service acceptance state is updated or reset.
    PrivateUserInfoUpdated,
    /// Emitted after an `UpdateUserPlan` message has been applied.
    PlanUpdated,
}
149
/// What happened in an `Event::Contact`.
#[derive(Clone, Copy)]
pub enum ContactEventKind {
    /// NOTE(review): not emitted anywhere in this file; presumably a contact
    /// request was made — confirm against consumers.
    Requested,
    /// NOTE(review): not emitted anywhere in this file; presumably a contact
    /// request was accepted — confirm against consumers.
    Accepted,
    /// Emitted when an incoming contact request is removed by a contacts update.
    Cancelled,
}
156
// Lets `UserStore` emit `Event` values to its subscribers.
impl EventEmitter<Event> for UserStore {}
158
/// Messages processed sequentially by the store's contact-maintenance task.
enum UpdateContacts {
    /// Apply a contacts update received from the server.
    Update(proto::UpdateContacts),
    /// Do nothing except drop the barrier sender, signalling that every
    /// message queued earlier has been processed.
    Wait(postage::barrier::Sender),
    /// Clear contacts and both request lists, then drop the barrier sender.
    Clear(postage::barrier::Sender),
}
164
/// Usage of model requests; dereferences to the wrapped `RequestUsage`.
#[derive(Debug, Clone, Copy, Deref)]
pub struct ModelRequestUsage(pub RequestUsage);
167
/// Usage of edit predictions; dereferences to the wrapped `RequestUsage`.
#[derive(Debug, Clone, Copy, Deref)]
pub struct EditPredictionUsage(pub RequestUsage);
170
/// An amount of usage together with the limit it is measured against.
#[derive(Debug, Clone, Copy)]
pub struct RequestUsage {
    /// The limit, either a concrete number or unlimited.
    pub limit: UsageLimit,
    /// The amount used so far.
    pub amount: i32,
}
176
177impl UserStore {
    /// Creates the store and starts its two background tasks.
    ///
    /// Registers message handlers for plan, contacts, invite-info, and
    /// show-contacts messages on `client`, then spawns:
    /// - `_maintain_contacts`, which applies queued `UpdateContacts`
    ///   messages one at a time, and
    /// - `_maintain_current_user`, which follows `client.status()` and
    ///   refreshes or clears user state as the connection status changes.
    ///
    /// Only a weak reference to `client` is retained.
    pub fn new(client: Arc<Client>, cx: &Context<Self>) -> Self {
        let (mut current_user_tx, current_user_rx) = watch::channel();
        let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
        let rpc_subscriptions = vec![
            client.add_message_handler(cx.weak_entity(), Self::handle_update_plan),
            client.add_message_handler(cx.weak_entity(), Self::handle_update_contacts),
            client.add_message_handler(cx.weak_entity(), Self::handle_update_invite_info),
            client.add_message_handler(cx.weak_entity(), Self::handle_show_contacts),
        ];
        Self {
            users: Default::default(),
            by_github_login: Default::default(),
            current_user: current_user_rx,
            current_plan: None,
            subscription_period: None,
            trial_started_at: None,
            model_request_usage: None,
            is_usage_based_billing_enabled: None,
            account_too_young: None,
            has_overdue_invoices: None,
            accepted_tos_at: None,
            contacts: Default::default(),
            incoming_contact_requests: Default::default(),
            participant_indices: Default::default(),
            outgoing_contact_requests: Default::default(),
            invite_info: None,
            client: Arc::downgrade(&client),
            update_contacts_tx,
            _maintain_contacts: cx.spawn(async move |this, cx| {
                // The subscriptions are moved into this task so they live as
                // long as it does.
                let _subscriptions = rpc_subscriptions;
                while let Some(message) = update_contacts_rx.next().await {
                    // Each update is awaited before the next message is
                    // taken, so updates are applied in order.
                    if let Ok(task) = this.update(cx, |this, cx| this.update_contacts(message, cx))
                    {
                        task.log_err().await;
                    } else {
                        // The store could not be updated; stop processing.
                        break;
                    }
                }
            }),
            _maintain_current_user: cx.spawn(async move |this, cx| {
                let mut status = client.status();
                // Hold the client only weakly while waiting for status
                // changes, so this task does not keep it alive.
                let weak = Arc::downgrade(&client);
                drop(client);
                while let Some(status) = status.next().await {
                    // if the client is dropped, the app is shutting down.
                    let Some(client) = weak.upgrade() else {
                        return Ok(());
                    };
                    match status {
                        Status::Connected { .. } => {
                            if let Some(user_id) = client.user_id() {
                                let fetch_user = if let Ok(fetch_user) =
                                    this.update(cx, |this, cx| this.get_user(user_id, cx).log_err())
                                {
                                    fetch_user
                                } else {
                                    break;
                                };
                                let fetch_private_user_info =
                                    client.request(proto::GetPrivateUserInfo {}).log_err();
                                // Fetch the user and the private info concurrently.
                                let (user, info) =
                                    futures::join!(fetch_user, fetch_private_user_info);

                                cx.update(|cx| {
                                    if let Some(info) = info {
                                        let staff =
                                            info.staff && !*feature_flags::ZED_DISABLE_STAFF;
                                        cx.update_flags(staff, info.flags);
                                        client.telemetry.set_authenticated_user_info(
                                            Some(info.metrics_id.clone()),
                                            staff,
                                        );

                                        this.update(cx, |this, cx| {
                                            let accepted_tos_at = {
                                                // Debug builds can pretend the terms were
                                                // never accepted via an environment variable.
                                                #[cfg(debug_assertions)]
                                                if std::env::var("ZED_IGNORE_ACCEPTED_TOS").is_ok()
                                                {
                                                    None
                                                } else {
                                                    info.accepted_tos_at
                                                }

                                                #[cfg(not(debug_assertions))]
                                                info.accepted_tos_at
                                            };

                                            this.set_current_user_accepted_tos_at(accepted_tos_at);
                                            cx.emit(Event::PrivateUserInfoUpdated);
                                        })
                                    } else {
                                        anyhow::Ok(())
                                    }
                                })??;

                                // `user` is `None` if fetching it failed (the error was logged).
                                current_user_tx.send(user).await.ok();

                                this.update(cx, |_, cx| cx.notify())?;
                            }
                        }
                        Status::SignedOut => {
                            current_user_tx.send(None).await.ok();
                            this.update(cx, |this, cx| {
                                this.accepted_tos_at = None;
                                cx.emit(Event::PrivateUserInfoUpdated);
                                cx.notify();
                                this.clear_contacts()
                            })?
                            .await;
                        }
                        Status::ConnectionLost => {
                            this.update(cx, |this, cx| {
                                cx.notify();
                                this.clear_contacts()
                            })?
                            .await;
                        }
                        _ => {}
                    }
                }
                Ok(())
            }),
            pending_contact_requests: Default::default(),
            weak_self: cx.weak_entity(),
        }
    }
304
305 #[cfg(feature = "test-support")]
306 pub fn clear_cache(&mut self) {
307 self.users.clear();
308 self.by_github_login.clear();
309 }
310
311 async fn handle_update_invite_info(
312 this: Entity<Self>,
313 message: TypedEnvelope<proto::UpdateInviteInfo>,
314 mut cx: AsyncApp,
315 ) -> Result<()> {
316 this.update(&mut cx, |this, cx| {
317 this.invite_info = Some(InviteInfo {
318 url: Arc::from(message.payload.url),
319 count: message.payload.count,
320 });
321 cx.notify();
322 })?;
323 Ok(())
324 }
325
326 async fn handle_show_contacts(
327 this: Entity<Self>,
328 _: TypedEnvelope<proto::ShowContacts>,
329 mut cx: AsyncApp,
330 ) -> Result<()> {
331 this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts))?;
332 Ok(())
333 }
334
335 pub fn invite_info(&self) -> Option<&InviteInfo> {
336 self.invite_info.as_ref()
337 }
338
339 async fn handle_update_contacts(
340 this: Entity<Self>,
341 message: TypedEnvelope<proto::UpdateContacts>,
342 mut cx: AsyncApp,
343 ) -> Result<()> {
344 this.read_with(&mut cx, |this, _| {
345 this.update_contacts_tx
346 .unbounded_send(UpdateContacts::Update(message.payload))
347 .unwrap();
348 })?;
349 Ok(())
350 }
351
352 async fn handle_update_plan(
353 this: Entity<Self>,
354 message: TypedEnvelope<proto::UpdateUserPlan>,
355 mut cx: AsyncApp,
356 ) -> Result<()> {
357 this.update(&mut cx, |this, cx| {
358 this.current_plan = Some(message.payload.plan());
359 this.subscription_period = maybe!({
360 let period = message.payload.subscription_period?;
361 let started_at = DateTime::from_timestamp(period.started_at as i64, 0)?;
362 let ended_at = DateTime::from_timestamp(period.ended_at as i64, 0)?;
363
364 Some((started_at, ended_at))
365 });
366 this.trial_started_at = message
367 .payload
368 .trial_started_at
369 .and_then(|trial_started_at| DateTime::from_timestamp(trial_started_at as i64, 0));
370 this.is_usage_based_billing_enabled = message.payload.is_usage_based_billing_enabled;
371 this.account_too_young = message.payload.account_too_young;
372 this.has_overdue_invoices = message.payload.has_overdue_invoices;
373
374 if let Some(usage) = message.payload.usage {
375 // limits are always present even though they are wrapped in Option
376 this.model_request_usage = usage
377 .model_requests_usage_limit
378 .and_then(|limit| {
379 RequestUsage::from_proto(usage.model_requests_usage_amount, limit)
380 })
381 .map(ModelRequestUsage);
382 }
383
384 cx.emit(Event::PlanUpdated);
385 cx.notify();
386 })?;
387 Ok(())
388 }
389
390 pub fn update_model_request_usage(&mut self, usage: ModelRequestUsage, cx: &mut Context<Self>) {
391 self.model_request_usage = Some(usage);
392 cx.notify();
393 }
394
    /// Processes one queued `UpdateContacts` message.
    ///
    /// - `Wait`: drops the barrier sender and does nothing else.
    /// - `Clear`: empties contacts and both request lists, then drops the
    ///   barrier sender.
    /// - `Update`: loads every referenced user, then applies removals and
    ///   insertions to the contacts and request lists, keeping each list
    ///   sorted by GitHub login, and notifies observers.
    ///
    /// The returned task resolves once the message has been fully applied.
    fn update_contacts(&mut self, message: UpdateContacts, cx: &Context<Self>) -> Task<Result<()>> {
        match message {
            UpdateContacts::Wait(barrier) => {
                drop(barrier);
                Task::ready(Ok(()))
            }
            UpdateContacts::Clear(barrier) => {
                self.contacts.clear();
                self.incoming_contact_requests.clear();
                self.outgoing_contact_requests.clear();
                drop(barrier);
                Task::ready(Ok(()))
            }
            UpdateContacts::Update(message) => {
                // Collect every user id mentioned by the update so they can
                // be loaded with a single `get_users` call.
                let mut user_ids = HashSet::default();
                for contact in &message.contacts {
                    user_ids.insert(contact.user_id);
                }
                user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
                user_ids.extend(message.outgoing_requests.iter());

                let load_users = self.get_users(user_ids.into_iter().collect(), cx);
                cx.spawn(async move |this, cx| {
                    load_users.await?;

                    // Users are fetched in parallel above and cached in call to get_users
                    // No need to parallelize here
                    let mut updated_contacts = Vec::new();
                    let this = this.upgrade().context("can't upgrade user store handle")?;
                    for contact in message.contacts {
                        updated_contacts
                            .push(Arc::new(Contact::from_proto(contact, &this, cx).await?));
                    }

                    let mut incoming_requests = Vec::new();
                    for request in message.incoming_requests {
                        incoming_requests.push({
                            this.update(cx, |this, cx| this.get_user(request.requester_id, cx))?
                                .await?
                        });
                    }

                    let mut outgoing_requests = Vec::new();
                    for requested_user_id in message.outgoing_requests {
                        outgoing_requests.push(
                            this.update(cx, |this, cx| this.get_user(requested_user_id, cx))?
                                .await?,
                        );
                    }

                    let removed_contacts =
                        HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
                    let removed_incoming_requests =
                        HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
                    let removed_outgoing_requests =
                        HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());

                    this.update(cx, |this, cx| {
                        // Remove contacts
                        this.contacts
                            .retain(|contact| !removed_contacts.contains(&contact.user.id));
                        // Update existing contacts and insert new ones
                        for updated_contact in updated_contacts {
                            match this.contacts.binary_search_by_key(
                                &&updated_contact.user.github_login,
                                |contact| &contact.user.github_login,
                            ) {
                                Ok(ix) => this.contacts[ix] = updated_contact,
                                Err(ix) => this.contacts.insert(ix, updated_contact),
                            }
                        }

                        // Remove incoming contact requests, emitting a
                        // `Cancelled` event for each one that is removed
                        this.incoming_contact_requests.retain(|user| {
                            if removed_incoming_requests.contains(&user.id) {
                                cx.emit(Event::Contact {
                                    user: user.clone(),
                                    kind: ContactEventKind::Cancelled,
                                });
                                false
                            } else {
                                true
                            }
                        });
                        // Update existing incoming requests and insert new ones
                        for user in incoming_requests {
                            match this
                                .incoming_contact_requests
                                .binary_search_by_key(&&user.github_login, |contact| {
                                    &contact.github_login
                                }) {
                                Ok(ix) => this.incoming_contact_requests[ix] = user,
                                Err(ix) => this.incoming_contact_requests.insert(ix, user),
                            }
                        }

                        // Remove outgoing contact requests
                        this.outgoing_contact_requests
                            .retain(|user| !removed_outgoing_requests.contains(&user.id));
                        // Update existing outgoing requests and insert new ones
                        for request in outgoing_requests {
                            match this
                                .outgoing_contact_requests
                                .binary_search_by_key(&&request.github_login, |contact| {
                                    &contact.github_login
                                }) {
                                Ok(ix) => this.outgoing_contact_requests[ix] = request,
                                Err(ix) => this.outgoing_contact_requests.insert(ix, request),
                            }
                        }

                        cx.notify();
                    })?;

                    Ok(())
                })
            }
        }
    }
514
515 pub fn contacts(&self) -> &[Arc<Contact>] {
516 &self.contacts
517 }
518
519 pub fn has_contact(&self, user: &Arc<User>) -> bool {
520 self.contacts
521 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
522 .is_ok()
523 }
524
525 pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
526 &self.incoming_contact_requests
527 }
528
529 pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
530 &self.outgoing_contact_requests
531 }
532
533 pub fn is_contact_request_pending(&self, user: &User) -> bool {
534 self.pending_contact_requests.contains_key(&user.id)
535 }
536
537 pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
538 if self
539 .contacts
540 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
541 .is_ok()
542 {
543 ContactRequestStatus::RequestAccepted
544 } else if self
545 .outgoing_contact_requests
546 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
547 .is_ok()
548 {
549 ContactRequestStatus::RequestSent
550 } else if self
551 .incoming_contact_requests
552 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
553 .is_ok()
554 {
555 ContactRequestStatus::RequestReceived
556 } else {
557 ContactRequestStatus::None
558 }
559 }
560
561 pub fn request_contact(
562 &mut self,
563 responder_id: u64,
564 cx: &mut Context<Self>,
565 ) -> Task<Result<()>> {
566 self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
567 }
568
569 pub fn remove_contact(&mut self, user_id: u64, cx: &mut Context<Self>) -> Task<Result<()>> {
570 self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
571 }
572
573 pub fn has_incoming_contact_request(&self, user_id: u64) -> bool {
574 self.incoming_contact_requests
575 .iter()
576 .any(|user| user.id == user_id)
577 }
578
579 pub fn respond_to_contact_request(
580 &mut self,
581 requester_id: u64,
582 accept: bool,
583 cx: &mut Context<Self>,
584 ) -> Task<Result<()>> {
585 self.perform_contact_request(
586 requester_id,
587 proto::RespondToContactRequest {
588 requester_id,
589 response: if accept {
590 proto::ContactRequestResponse::Accept
591 } else {
592 proto::ContactRequestResponse::Decline
593 } as i32,
594 },
595 cx,
596 )
597 }
598
599 pub fn dismiss_contact_request(
600 &self,
601 requester_id: u64,
602 cx: &Context<Self>,
603 ) -> Task<Result<()>> {
604 let client = self.client.upgrade();
605 cx.spawn(async move |_, _| {
606 client
607 .context("can't upgrade client reference")?
608 .request(proto::RespondToContactRequest {
609 requester_id,
610 response: proto::ContactRequestResponse::Dismiss as i32,
611 })
612 .await?;
613 Ok(())
614 })
615 }
616
617 fn perform_contact_request<T: RequestMessage>(
618 &mut self,
619 user_id: u64,
620 request: T,
621 cx: &mut Context<Self>,
622 ) -> Task<Result<()>> {
623 let client = self.client.upgrade();
624 *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
625 cx.notify();
626
627 cx.spawn(async move |this, cx| {
628 let response = client
629 .context("can't upgrade client reference")?
630 .request(request)
631 .await;
632 this.update(cx, |this, cx| {
633 if let Entry::Occupied(mut request_count) =
634 this.pending_contact_requests.entry(user_id)
635 {
636 *request_count.get_mut() -= 1;
637 if *request_count.get() == 0 {
638 request_count.remove();
639 }
640 }
641 cx.notify();
642 })?;
643 response?;
644 Ok(())
645 })
646 }
647
648 pub fn clear_contacts(&self) -> impl Future<Output = ()> + use<> {
649 let (tx, mut rx) = postage::barrier::channel();
650 self.update_contacts_tx
651 .unbounded_send(UpdateContacts::Clear(tx))
652 .unwrap();
653 async move {
654 rx.next().await;
655 }
656 }
657
658 pub fn contact_updates_done(&self) -> impl Future<Output = ()> {
659 let (tx, mut rx) = postage::barrier::channel();
660 self.update_contacts_tx
661 .unbounded_send(UpdateContacts::Wait(tx))
662 .unwrap();
663 async move {
664 rx.next().await;
665 }
666 }
667
668 pub fn get_users(
669 &self,
670 user_ids: Vec<u64>,
671 cx: &Context<Self>,
672 ) -> Task<Result<Vec<Arc<User>>>> {
673 let mut user_ids_to_fetch = user_ids.clone();
674 user_ids_to_fetch.retain(|id| !self.users.contains_key(id));
675
676 cx.spawn(async move |this, cx| {
677 if !user_ids_to_fetch.is_empty() {
678 this.update(cx, |this, cx| {
679 this.load_users(
680 proto::GetUsers {
681 user_ids: user_ids_to_fetch,
682 },
683 cx,
684 )
685 })?
686 .await?;
687 }
688
689 this.read_with(cx, |this, _| {
690 user_ids
691 .iter()
692 .map(|user_id| {
693 this.users
694 .get(user_id)
695 .cloned()
696 .with_context(|| format!("user {user_id} not found"))
697 })
698 .collect()
699 })?
700 })
701 }
702
703 pub fn fuzzy_search_users(
704 &self,
705 query: String,
706 cx: &Context<Self>,
707 ) -> Task<Result<Vec<Arc<User>>>> {
708 self.load_users(proto::FuzzySearchUsers { query }, cx)
709 }
710
711 pub fn get_cached_user(&self, user_id: u64) -> Option<Arc<User>> {
712 self.users.get(&user_id).cloned()
713 }
714
715 pub fn get_user_optimistic(&self, user_id: u64, cx: &Context<Self>) -> Option<Arc<User>> {
716 if let Some(user) = self.users.get(&user_id).cloned() {
717 return Some(user);
718 }
719
720 self.get_user(user_id, cx).detach_and_log_err(cx);
721 None
722 }
723
724 pub fn get_user(&self, user_id: u64, cx: &Context<Self>) -> Task<Result<Arc<User>>> {
725 if let Some(user) = self.users.get(&user_id).cloned() {
726 return Task::ready(Ok(user));
727 }
728
729 let load_users = self.get_users(vec![user_id], cx);
730 cx.spawn(async move |this, cx| {
731 load_users.await?;
732 this.read_with(cx, |this, _| {
733 this.users
734 .get(&user_id)
735 .cloned()
736 .context("server responded with no users")
737 })?
738 })
739 }
740
741 pub fn cached_user_by_github_login(&self, github_login: &str) -> Option<Arc<User>> {
742 self.by_github_login
743 .get(github_login)
744 .and_then(|id| self.users.get(id).cloned())
745 }
746
747 pub fn current_user(&self) -> Option<Arc<User>> {
748 self.current_user.borrow().clone()
749 }
750
751 pub fn current_plan(&self) -> Option<proto::Plan> {
752 #[cfg(debug_assertions)]
753 if let Ok(plan) = std::env::var("ZED_SIMULATE_PLAN").as_ref() {
754 return match plan.as_str() {
755 "free" => Some(proto::Plan::Free),
756 "trial" => Some(proto::Plan::ZedProTrial),
757 "pro" => Some(proto::Plan::ZedPro),
758 _ => {
759 panic!("ZED_SIMULATE_PLAN must be one of 'free', 'trial', or 'pro'");
760 }
761 };
762 }
763
764 self.current_plan
765 }
766
767 pub fn subscription_period(&self) -> Option<(DateTime<Utc>, DateTime<Utc>)> {
768 self.subscription_period
769 }
770
771 pub fn trial_started_at(&self) -> Option<DateTime<Utc>> {
772 self.trial_started_at
773 }
774
775 pub fn usage_based_billing_enabled(&self) -> Option<bool> {
776 self.is_usage_based_billing_enabled
777 }
778
779 pub fn model_request_usage(&self) -> Option<ModelRequestUsage> {
780 self.model_request_usage
781 }
782
783 pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
784 self.current_user.clone()
785 }
786
787 /// Returns whether the user's account is too new to use the service.
788 pub fn account_too_young(&self) -> bool {
789 self.account_too_young.unwrap_or(false)
790 }
791
792 /// Returns whether the current user has overdue invoices and usage should be blocked.
793 pub fn has_overdue_invoices(&self) -> bool {
794 self.has_overdue_invoices.unwrap_or(false)
795 }
796
797 pub fn current_user_has_accepted_terms(&self) -> Option<bool> {
798 self.accepted_tos_at
799 .map(|accepted_tos_at| accepted_tos_at.is_some())
800 }
801
802 pub fn accept_terms_of_service(&self, cx: &Context<Self>) -> Task<Result<()>> {
803 if self.current_user().is_none() {
804 return Task::ready(Err(anyhow!("no current user")));
805 };
806
807 let client = self.client.clone();
808 cx.spawn(async move |this, cx| -> anyhow::Result<()> {
809 let client = client.upgrade().context("client not found")?;
810 let response = client
811 .request(proto::AcceptTermsOfService {})
812 .await
813 .context("error accepting tos")?;
814 this.update(cx, |this, cx| {
815 this.set_current_user_accepted_tos_at(Some(response.accepted_tos_at));
816 cx.emit(Event::PrivateUserInfoUpdated);
817 })?;
818 Ok(())
819 })
820 }
821
822 fn set_current_user_accepted_tos_at(&mut self, accepted_tos_at: Option<u64>) {
823 self.accepted_tos_at = Some(
824 accepted_tos_at.and_then(|timestamp| DateTime::from_timestamp(timestamp as i64, 0)),
825 );
826 }
827
828 fn load_users(
829 &self,
830 request: impl RequestMessage<Response = UsersResponse>,
831 cx: &Context<Self>,
832 ) -> Task<Result<Vec<Arc<User>>>> {
833 let client = self.client.clone();
834 cx.spawn(async move |this, cx| {
835 if let Some(rpc) = client.upgrade() {
836 let response = rpc.request(request).await.context("error loading users")?;
837 let users = response.users;
838
839 this.update(cx, |this, _| this.insert(users))
840 } else {
841 Ok(Vec::new())
842 }
843 })
844 }
845
846 pub fn insert(&mut self, users: Vec<proto::User>) -> Vec<Arc<User>> {
847 let mut ret = Vec::with_capacity(users.len());
848 for user in users {
849 let user = User::new(user);
850 if let Some(old) = self.users.insert(user.id, user.clone()) {
851 if old.github_login != user.github_login {
852 self.by_github_login.remove(&old.github_login);
853 }
854 }
855 self.by_github_login
856 .insert(user.github_login.clone(), user.id);
857 ret.push(user)
858 }
859 ret
860 }
861
862 pub fn set_participant_indices(
863 &mut self,
864 participant_indices: HashMap<u64, ParticipantIndex>,
865 cx: &mut Context<Self>,
866 ) {
867 if participant_indices != self.participant_indices {
868 self.participant_indices = participant_indices;
869 cx.emit(Event::ParticipantIndicesChanged);
870 }
871 }
872
873 pub fn participant_indices(&self) -> &HashMap<u64, ParticipantIndex> {
874 &self.participant_indices
875 }
876
877 pub fn participant_names(
878 &self,
879 user_ids: impl Iterator<Item = u64>,
880 cx: &App,
881 ) -> HashMap<u64, SharedString> {
882 let mut ret = HashMap::default();
883 let mut missing_user_ids = Vec::new();
884 for id in user_ids {
885 if let Some(github_login) = self.get_cached_user(id).map(|u| u.github_login.clone()) {
886 ret.insert(id, github_login);
887 } else {
888 missing_user_ids.push(id)
889 }
890 }
891 if !missing_user_ids.is_empty() {
892 let this = self.weak_self.clone();
893 cx.spawn(async move |cx| {
894 this.update(cx, |this, cx| this.get_users(missing_user_ids, cx))?
895 .await
896 })
897 .detach_and_log_err(cx);
898 }
899 ret
900 }
901}
902
903impl User {
904 fn new(message: proto::User) -> Arc<Self> {
905 Arc::new(User {
906 id: message.id,
907 github_login: message.github_login.into(),
908 avatar_uri: message.avatar_url.into(),
909 name: message.name,
910 })
911 }
912}
913
914impl Contact {
915 async fn from_proto(
916 contact: proto::Contact,
917 user_store: &Entity<UserStore>,
918 cx: &mut AsyncApp,
919 ) -> Result<Self> {
920 let user = user_store
921 .update(cx, |user_store, cx| {
922 user_store.get_user(contact.user_id, cx)
923 })?
924 .await?;
925 Ok(Self {
926 user,
927 online: contact.online,
928 busy: contact.busy,
929 })
930 }
931}
932
933impl Collaborator {
934 pub fn from_proto(message: proto::Collaborator) -> Result<Self> {
935 Ok(Self {
936 peer_id: message.peer_id.context("invalid peer id")?,
937 replica_id: message.replica_id as ReplicaId,
938 user_id: message.user_id as UserId,
939 is_host: message.is_host,
940 committer_name: message.committer_name,
941 committer_email: message.committer_email,
942 })
943 }
944}
945
946impl RequestUsage {
947 pub fn over_limit(&self) -> bool {
948 match self.limit {
949 UsageLimit::Limited(limit) => self.amount >= limit,
950 UsageLimit::Unlimited => false,
951 }
952 }
953
954 pub fn from_proto(amount: u32, limit: proto::UsageLimit) -> Option<Self> {
955 let limit = match limit.variant? {
956 proto::usage_limit::Variant::Limited(limited) => {
957 UsageLimit::Limited(limited.limit as i32)
958 }
959 proto::usage_limit::Variant::Unlimited(_) => UsageLimit::Unlimited,
960 };
961 Some(RequestUsage {
962 limit,
963 amount: amount as i32,
964 })
965 }
966
967 fn from_headers(
968 limit_name: &str,
969 amount_name: &str,
970 headers: &HeaderMap<HeaderValue>,
971 ) -> Result<Self> {
972 let limit = headers
973 .get(limit_name)
974 .with_context(|| format!("missing {limit_name:?} header"))?;
975 let limit = UsageLimit::from_str(limit.to_str()?)?;
976
977 let amount = headers
978 .get(amount_name)
979 .with_context(|| format!("missing {amount_name:?} header"))?;
980 let amount = amount.to_str()?.parse::<i32>()?;
981
982 Ok(Self { limit, amount })
983 }
984}
985
986impl ModelRequestUsage {
987 pub fn from_headers(headers: &HeaderMap<HeaderValue>) -> Result<Self> {
988 Ok(Self(RequestUsage::from_headers(
989 MODEL_REQUESTS_USAGE_LIMIT_HEADER_NAME,
990 MODEL_REQUESTS_USAGE_AMOUNT_HEADER_NAME,
991 headers,
992 )?))
993 }
994}
995
996impl EditPredictionUsage {
997 pub fn from_headers(headers: &HeaderMap<HeaderValue>) -> Result<Self> {
998 Ok(Self(RequestUsage::from_headers(
999 EDIT_PREDICTIONS_USAGE_LIMIT_HEADER_NAME,
1000 EDIT_PREDICTIONS_USAGE_AMOUNT_HEADER_NAME,
1001 headers,
1002 )?))
1003 }
1004}