1use super::{Client, Status, TypedEnvelope, proto};
2use anyhow::{Context as _, Result, anyhow};
3use chrono::{DateTime, Utc};
4use cloud_llm_client::{
5 EDIT_PREDICTIONS_USAGE_AMOUNT_HEADER_NAME, EDIT_PREDICTIONS_USAGE_LIMIT_HEADER_NAME,
6 MODEL_REQUESTS_USAGE_AMOUNT_HEADER_NAME, MODEL_REQUESTS_USAGE_LIMIT_HEADER_NAME, UsageLimit,
7};
8use collections::{HashMap, HashSet, hash_map::Entry};
9use derive_more::Deref;
10use feature_flags::FeatureFlagAppExt;
11use futures::{Future, StreamExt, channel::mpsc};
12use gpui::{
13 App, AsyncApp, Context, Entity, EventEmitter, SharedString, SharedUri, Task, WeakEntity,
14};
15use http_client::http::{HeaderMap, HeaderValue};
16use postage::{sink::Sink, watch};
17use rpc::proto::{RequestMessage, UsersResponse};
18use std::{
19 str::FromStr as _,
20 sync::{Arc, Weak},
21};
22use text::ReplicaId;
23use util::{TryFutureExt as _, maybe};
24
/// Identifier of a user; a plain alias for `u64`.
pub type UserId = u64;
26
/// Newtype wrapper around the `u64` that identifies a channel.
#[derive(
    Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, serde::Serialize, serde::Deserialize,
)]
pub struct ChannelId(pub u64);
31
32impl std::fmt::Display for ChannelId {
33 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
34 self.0.fmt(f)
35 }
36}
37
/// Newtype wrapper around the `u64` that identifies a project.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)]
pub struct ProjectId(pub u64);
40
41impl ProjectId {
42 pub fn to_proto(&self) -> u64 {
43 self.0
44 }
45}
46
/// Newtype wrapper around the `u64` that identifies a dev-server project.
#[derive(
    Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, serde::Serialize, serde::Deserialize,
)]
pub struct DevServerProjectId(pub u64);
51
/// Newtype wrapper around a participant's `u32` index.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ParticipantIndex(pub u32);
54
/// A user record as cached by `UserStore`.
#[derive(Default, Debug)]
pub struct User {
    /// Numeric id of the user.
    pub id: UserId,
    /// The user's GitHub login.
    pub github_login: SharedString,
    /// URI of the user's avatar.
    pub avatar_uri: SharedUri,
    /// Optional name of the user.
    pub name: Option<String>,
}
62
/// A collaborator as decoded from a `proto::Collaborator` message.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Collaborator {
    /// Peer id of the collaborator's connection.
    pub peer_id: proto::PeerId,
    /// Replica id assigned to the collaborator.
    pub replica_id: ReplicaId,
    /// Id of the collaborating user.
    pub user_id: UserId,
    /// Whether this collaborator is the host.
    pub is_host: bool,
    /// Optional committer name supplied with the message.
    pub committer_name: Option<String>,
    /// Optional committer email supplied with the message.
    pub committer_email: Option<String>,
}
72
73impl PartialOrd for User {
74 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
75 Some(self.cmp(other))
76 }
77}
78
79impl Ord for User {
80 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
81 self.github_login.cmp(&other.github_login)
82 }
83}
84
85impl PartialEq for User {
86 fn eq(&self, other: &Self) -> bool {
87 self.id == other.id && self.github_login == other.github_login
88 }
89}
90
91impl Eq for User {}
92
/// A contact of the current user together with the presence flags
/// carried by the contact message.
#[derive(Debug, PartialEq)]
pub struct Contact {
    /// The contact's user record.
    pub user: Arc<User>,
    /// `online` flag copied from the contact message.
    pub online: bool,
    /// `busy` flag copied from the contact message.
    pub busy: bool,
}
99
/// Relationship between the current user and another user, as reported by
/// `UserStore::contact_request_status`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ContactRequestStatus {
    /// The user appears in none of the contact lists.
    None,
    /// The user is in the outgoing contact requests.
    RequestSent,
    /// The user is in the incoming contact requests.
    RequestReceived,
    /// The user is already a contact.
    RequestAccepted,
}
107
/// Caches users and tracks the signed-in user's contacts, plan and usage information.
pub struct UserStore {
    /// Cached users keyed by user id.
    users: HashMap<u64, Arc<User>>,
    /// Index from GitHub login to user id for the cached users.
    by_github_login: HashMap<SharedString, u64>,
    /// Participant index per user id; replaced wholesale by `set_participant_indices`.
    participant_indices: HashMap<u64, ParticipantIndex>,
    /// Sending half of the queue consumed by the `_maintain_contacts` task.
    update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
    /// Plan from the most recent `UpdateUserPlan` message, if any.
    current_plan: Option<proto::Plan>,
    /// Start and end of the subscription period from the most recent plan update.
    subscription_period: Option<(DateTime<Utc>, DateTime<Utc>)>,
    /// Trial start time from the most recent plan update.
    trial_started_at: Option<DateTime<Utc>>,
    /// Latest known model-request usage.
    model_request_usage: Option<ModelRequestUsage>,
    /// Latest known edit-prediction usage.
    edit_prediction_usage: Option<EditPredictionUsage>,
    /// Usage-based-billing flag from the most recent plan update.
    is_usage_based_billing_enabled: Option<bool>,
    /// "Account too young" flag from the most recent plan update.
    account_too_young: Option<bool>,
    /// "Has overdue invoices" flag from the most recent plan update.
    has_overdue_invoices: Option<bool>,
    /// Receiving half of the watch channel that carries the current user.
    current_user: watch::Receiver<Option<Arc<User>>>,
    /// Outer `None`: no terms-of-service information recorded (reset on sign-out).
    /// `Some(None)`: information recorded, no acceptance timestamp.
    /// `Some(Some(t))`: terms accepted at `t`.
    accepted_tos_at: Option<Option<DateTime<Utc>>>,
    /// Contacts, kept sorted by GitHub login (looked up with binary search).
    contacts: Vec<Arc<Contact>>,
    /// Users who sent a contact request, kept sorted by GitHub login.
    incoming_contact_requests: Vec<Arc<User>>,
    /// Users a contact request was sent to, kept sorted by GitHub login.
    outgoing_contact_requests: Vec<Arc<User>>,
    /// Number of in-flight contact requests per user id.
    pending_contact_requests: HashMap<u64, usize>,
    /// Invite URL and count from the most recent `UpdateInviteInfo` message.
    invite_info: Option<InviteInfo>,
    /// Weak handle to the RPC client.
    client: Weak<Client>,
    /// Task that drains the contact-update queue.
    _maintain_contacts: Task<()>,
    /// Task that reacts to client status changes.
    _maintain_current_user: Task<Result<()>>,
    /// Weak handle to this entity, used to spawn work from `&self` methods.
    weak_self: WeakEntity<Self>,
}
133
/// Invite information taken from an `UpdateInviteInfo` message.
#[derive(Clone)]
pub struct InviteInfo {
    /// `count` field of the message.
    pub count: u32,
    /// `url` field of the message.
    pub url: Arc<str>,
}
139
/// Events emitted by `UserStore`.
pub enum Event {
    /// A contact-related change concerning `user`.
    Contact {
        user: Arc<User>,
        kind: ContactEventKind,
    },
    /// Emitted when a `ShowContacts` message is received.
    ShowContacts,
    /// Emitted when `set_participant_indices` installs a different map.
    ParticipantIndicesChanged,
    /// Emitted when the terms-of-service state of the current user is updated or reset.
    PrivateUserInfoUpdated,
    /// Emitted after an `UpdateUserPlan` message has been applied.
    PlanUpdated,
}
150
/// Kind of change reported by `Event::Contact`.
#[derive(Clone, Copy)]
pub enum ContactEventKind {
    Requested,
    Accepted,
    /// Emitted when an incoming contact request is removed.
    Cancelled,
}
157
// Allows `UserStore` to emit `Event` values through its context.
impl EventEmitter<Event> for UserStore {}
159
/// Messages processed sequentially by the contact-maintenance task.
enum UpdateContacts {
    /// Apply a contacts update received from the server.
    Update(proto::UpdateContacts),
    /// Do nothing except drop the barrier sender once this message is reached.
    Wait(postage::barrier::Sender),
    /// Clear all contacts and requests, then drop the barrier sender.
    Clear(postage::barrier::Sender),
}
165
/// `RequestUsage` for model requests; derefs to the wrapped value.
#[derive(Debug, Clone, Copy, Deref)]
pub struct ModelRequestUsage(pub RequestUsage);
168
/// `RequestUsage` for edit predictions; derefs to the wrapped value.
#[derive(Debug, Clone, Copy, Deref)]
pub struct EditPredictionUsage(pub RequestUsage);
171
/// A usage amount paired with the limit it is measured against.
#[derive(Debug, Clone, Copy)]
pub struct RequestUsage {
    /// The applicable limit (limited to a number, or unlimited).
    pub limit: UsageLimit,
    /// The amount used so far.
    pub amount: i32,
}
177
178impl UserStore {
/// Builds the store, registers this entity's RPC message handlers on `client`, and
/// spawns the two background tasks stored in `_maintain_contacts` and
/// `_maintain_current_user`.
///
/// The store itself keeps only a `Weak` reference to `client`.
pub fn new(client: Arc<Client>, cx: &Context<Self>) -> Self {
    let (mut current_user_tx, current_user_rx) = watch::channel();
    let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
    let rpc_subscriptions = vec![
        client.add_message_handler(cx.weak_entity(), Self::handle_update_plan),
        client.add_message_handler(cx.weak_entity(), Self::handle_update_contacts),
        client.add_message_handler(cx.weak_entity(), Self::handle_update_invite_info),
        client.add_message_handler(cx.weak_entity(), Self::handle_show_contacts),
    ];
    Self {
        users: Default::default(),
        by_github_login: Default::default(),
        current_user: current_user_rx,
        current_plan: None,
        subscription_period: None,
        trial_started_at: None,
        model_request_usage: None,
        edit_prediction_usage: None,
        is_usage_based_billing_enabled: None,
        account_too_young: None,
        has_overdue_invoices: None,
        accepted_tos_at: None,
        contacts: Default::default(),
        incoming_contact_requests: Default::default(),
        participant_indices: Default::default(),
        outgoing_contact_requests: Default::default(),
        invite_info: None,
        client: Arc::downgrade(&client),
        update_contacts_tx,
        _maintain_contacts: cx.spawn(async move |this, cx| {
            // Moved into the task so the handler subscriptions live as long as it does.
            let _subscriptions = rpc_subscriptions;
            // Process queued contact messages one at a time, in arrival order.
            while let Some(message) = update_contacts_rx.next().await {
                if let Ok(task) = this.update(cx, |this, cx| this.update_contacts(message, cx))
                {
                    task.log_err().await;
                } else {
                    // The weak handle could not be updated; stop processing.
                    break;
                }
            }
        }),
        _maintain_current_user: cx.spawn(async move |this, cx| {
            let mut status = client.status();
            // Hold only a weak reference while waiting so this task does not keep
            // the client alive.
            let weak = Arc::downgrade(&client);
            drop(client);
            while let Some(status) = status.next().await {
                // if the client is dropped, the app is shutting down.
                let Some(client) = weak.upgrade() else {
                    return Ok(());
                };
                match status {
                    Status::Connected { .. } => {
                        if let Some(user_id) = client.user_id() {
                            let fetch_user = if let Ok(fetch_user) =
                                this.update(cx, |this, cx| this.get_user(user_id, cx).log_err())
                            {
                                fetch_user
                            } else {
                                break;
                            };
                            let fetch_private_user_info =
                                client.request(proto::GetPrivateUserInfo {}).log_err();
                            // Fetch the user record and the private info concurrently.
                            let (user, info) =
                                futures::join!(fetch_user, fetch_private_user_info);

                            cx.update(|cx| {
                                if let Some(info) = info {
                                    let staff =
                                        info.staff && !*feature_flags::ZED_DISABLE_STAFF;
                                    cx.update_flags(staff, info.flags);
                                    client.telemetry.set_authenticated_user_info(
                                        Some(info.metrics_id.clone()),
                                        staff,
                                    );

                                    this.update(cx, |this, cx| {
                                        // In debug builds, setting ZED_IGNORE_ACCEPTED_TOS makes
                                        // the store behave as if the terms were never accepted.
                                        let accepted_tos_at = {
                                            #[cfg(debug_assertions)]
                                            if std::env::var("ZED_IGNORE_ACCEPTED_TOS").is_ok()
                                            {
                                                None
                                            } else {
                                                info.accepted_tos_at
                                            }

                                            #[cfg(not(debug_assertions))]
                                            info.accepted_tos_at
                                        };

                                        this.set_current_user_accepted_tos_at(accepted_tos_at);
                                        cx.emit(Event::PrivateUserInfoUpdated);
                                    })
                                } else {
                                    anyhow::Ok(())
                                }
                            })??;

                            // Publish the fetched user (`None` if the fetch failed and was logged).
                            current_user_tx.send(user).await.ok();

                            this.update(cx, |_, cx| cx.notify())?;
                        }
                    }
                    Status::SignedOut => {
                        current_user_tx.send(None).await.ok();
                        // Reset the terms-of-service state and wait for the contact
                        // lists to be cleared.
                        this.update(cx, |this, cx| {
                            this.accepted_tos_at = None;
                            cx.emit(Event::PrivateUserInfoUpdated);
                            cx.notify();
                            this.clear_contacts()
                        })?
                        .await;
                    }
                    Status::ConnectionLost => {
                        // The current user is kept; only the contact lists are cleared.
                        this.update(cx, |this, cx| {
                            cx.notify();
                            this.clear_contacts()
                        })?
                        .await;
                    }
                    _ => {}
                }
            }
            Ok(())
        }),
        pending_contact_requests: Default::default(),
        weak_self: cx.weak_entity(),
    }
}
306
/// Empties the user cache together with its GitHub-login index.
#[cfg(feature = "test-support")]
pub fn clear_cache(&mut self) {
    self.by_github_login.clear();
    self.users.clear();
}
312
313 async fn handle_update_invite_info(
314 this: Entity<Self>,
315 message: TypedEnvelope<proto::UpdateInviteInfo>,
316 mut cx: AsyncApp,
317 ) -> Result<()> {
318 this.update(&mut cx, |this, cx| {
319 this.invite_info = Some(InviteInfo {
320 url: Arc::from(message.payload.url),
321 count: message.payload.count,
322 });
323 cx.notify();
324 })?;
325 Ok(())
326 }
327
328 async fn handle_show_contacts(
329 this: Entity<Self>,
330 _: TypedEnvelope<proto::ShowContacts>,
331 mut cx: AsyncApp,
332 ) -> Result<()> {
333 this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts))?;
334 Ok(())
335 }
336
337 pub fn invite_info(&self) -> Option<&InviteInfo> {
338 self.invite_info.as_ref()
339 }
340
341 async fn handle_update_contacts(
342 this: Entity<Self>,
343 message: TypedEnvelope<proto::UpdateContacts>,
344 mut cx: AsyncApp,
345 ) -> Result<()> {
346 this.read_with(&mut cx, |this, _| {
347 this.update_contacts_tx
348 .unbounded_send(UpdateContacts::Update(message.payload))
349 .unwrap();
350 })?;
351 Ok(())
352 }
353
354 async fn handle_update_plan(
355 this: Entity<Self>,
356 message: TypedEnvelope<proto::UpdateUserPlan>,
357 mut cx: AsyncApp,
358 ) -> Result<()> {
359 this.update(&mut cx, |this, cx| {
360 this.current_plan = Some(message.payload.plan());
361 this.subscription_period = maybe!({
362 let period = message.payload.subscription_period?;
363 let started_at = DateTime::from_timestamp(period.started_at as i64, 0)?;
364 let ended_at = DateTime::from_timestamp(period.ended_at as i64, 0)?;
365
366 Some((started_at, ended_at))
367 });
368 this.trial_started_at = message
369 .payload
370 .trial_started_at
371 .and_then(|trial_started_at| DateTime::from_timestamp(trial_started_at as i64, 0));
372 this.is_usage_based_billing_enabled = message.payload.is_usage_based_billing_enabled;
373 this.account_too_young = message.payload.account_too_young;
374 this.has_overdue_invoices = message.payload.has_overdue_invoices;
375
376 if let Some(usage) = message.payload.usage {
377 // limits are always present even though they are wrapped in Option
378 this.model_request_usage = usage
379 .model_requests_usage_limit
380 .and_then(|limit| {
381 RequestUsage::from_proto(usage.model_requests_usage_amount, limit)
382 })
383 .map(ModelRequestUsage);
384 this.edit_prediction_usage = usage
385 .edit_predictions_usage_limit
386 .and_then(|limit| {
387 RequestUsage::from_proto(usage.model_requests_usage_amount, limit)
388 })
389 .map(EditPredictionUsage);
390 }
391
392 cx.emit(Event::PlanUpdated);
393 cx.notify();
394 })?;
395 Ok(())
396 }
397
398 pub fn update_model_request_usage(&mut self, usage: ModelRequestUsage, cx: &mut Context<Self>) {
399 self.model_request_usage = Some(usage);
400 cx.notify();
401 }
402
403 pub fn update_edit_prediction_usage(
404 &mut self,
405 usage: EditPredictionUsage,
406 cx: &mut Context<Self>,
407 ) {
408 self.edit_prediction_usage = Some(usage);
409 cx.notify();
410 }
411
/// Processes one message from the contact-update queue.
///
/// * `Wait` only drops the barrier sender.
/// * `Clear` empties the contact and request lists, then drops the barrier sender.
/// * `Update` loads every referenced user, then applies removals and upserts to
///   the three lists, keeping each sorted by GitHub login.
///
/// Returns a task that resolves once the message has been fully applied.
fn update_contacts(&mut self, message: UpdateContacts, cx: &Context<Self>) -> Task<Result<()>> {
    match message {
        UpdateContacts::Wait(barrier) => {
            drop(barrier);
            Task::ready(Ok(()))
        }
        UpdateContacts::Clear(barrier) => {
            self.contacts.clear();
            self.incoming_contact_requests.clear();
            self.outgoing_contact_requests.clear();
            drop(barrier);
            Task::ready(Ok(()))
        }
        UpdateContacts::Update(message) => {
            // Collect every user id mentioned by the update so they can be loaded at once.
            let mut user_ids = HashSet::default();
            for contact in &message.contacts {
                user_ids.insert(contact.user_id);
            }
            user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
            user_ids.extend(message.outgoing_requests.iter());

            let load_users = self.get_users(user_ids.into_iter().collect(), cx);
            cx.spawn(async move |this, cx| {
                load_users.await?;

                // Users are fetched in parallel above and cached in call to get_users
                // No need to parallelize here
                let mut updated_contacts = Vec::new();
                let this = this.upgrade().context("can't upgrade user store handle")?;
                for contact in message.contacts {
                    updated_contacts
                        .push(Arc::new(Contact::from_proto(contact, &this, cx).await?));
                }

                let mut incoming_requests = Vec::new();
                for request in message.incoming_requests {
                    incoming_requests.push({
                        this.update(cx, |this, cx| this.get_user(request.requester_id, cx))?
                            .await?
                    });
                }

                let mut outgoing_requests = Vec::new();
                for requested_user_id in message.outgoing_requests {
                    outgoing_requests.push(
                        this.update(cx, |this, cx| this.get_user(requested_user_id, cx))?
                            .await?,
                    );
                }

                let removed_contacts =
                    HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
                let removed_incoming_requests =
                    HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
                let removed_outgoing_requests =
                    HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());

                this.update(cx, |this, cx| {
                    // Remove contacts
                    this.contacts
                        .retain(|contact| !removed_contacts.contains(&contact.user.id));
                    // Update existing contacts and insert new ones
                    for updated_contact in updated_contacts {
                        match this.contacts.binary_search_by_key(
                            &&updated_contact.user.github_login,
                            |contact| &contact.user.github_login,
                        ) {
                            Ok(ix) => this.contacts[ix] = updated_contact,
                            Err(ix) => this.contacts.insert(ix, updated_contact),
                        }
                    }

                    // Remove incoming contact requests
                    this.incoming_contact_requests.retain(|user| {
                        if removed_incoming_requests.contains(&user.id) {
                            cx.emit(Event::Contact {
                                user: user.clone(),
                                kind: ContactEventKind::Cancelled,
                            });
                            false
                        } else {
                            true
                        }
                    });
                    // Update existing incoming requests and insert new ones
                    for user in incoming_requests {
                        match this
                            .incoming_contact_requests
                            .binary_search_by_key(&&user.github_login, |contact| {
                                &contact.github_login
                            }) {
                            Ok(ix) => this.incoming_contact_requests[ix] = user,
                            Err(ix) => this.incoming_contact_requests.insert(ix, user),
                        }
                    }

                    // Remove outgoing contact requests
                    this.outgoing_contact_requests
                        .retain(|user| !removed_outgoing_requests.contains(&user.id));
                    // Update existing outgoing requests and insert new ones
                    for request in outgoing_requests {
                        match this
                            .outgoing_contact_requests
                            .binary_search_by_key(&&request.github_login, |contact| {
                                &contact.github_login
                            }) {
                            Ok(ix) => this.outgoing_contact_requests[ix] = request,
                            Err(ix) => this.outgoing_contact_requests.insert(ix, request),
                        }
                    }

                    cx.notify();
                })?;

                Ok(())
            })
        }
    }
}
531
532 pub fn contacts(&self) -> &[Arc<Contact>] {
533 &self.contacts
534 }
535
536 pub fn has_contact(&self, user: &Arc<User>) -> bool {
537 self.contacts
538 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
539 .is_ok()
540 }
541
542 pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
543 &self.incoming_contact_requests
544 }
545
546 pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
547 &self.outgoing_contact_requests
548 }
549
550 pub fn is_contact_request_pending(&self, user: &User) -> bool {
551 self.pending_contact_requests.contains_key(&user.id)
552 }
553
554 pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
555 if self
556 .contacts
557 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
558 .is_ok()
559 {
560 ContactRequestStatus::RequestAccepted
561 } else if self
562 .outgoing_contact_requests
563 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
564 .is_ok()
565 {
566 ContactRequestStatus::RequestSent
567 } else if self
568 .incoming_contact_requests
569 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
570 .is_ok()
571 {
572 ContactRequestStatus::RequestReceived
573 } else {
574 ContactRequestStatus::None
575 }
576 }
577
578 pub fn request_contact(
579 &mut self,
580 responder_id: u64,
581 cx: &mut Context<Self>,
582 ) -> Task<Result<()>> {
583 self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
584 }
585
586 pub fn remove_contact(&mut self, user_id: u64, cx: &mut Context<Self>) -> Task<Result<()>> {
587 self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
588 }
589
590 pub fn has_incoming_contact_request(&self, user_id: u64) -> bool {
591 self.incoming_contact_requests
592 .iter()
593 .any(|user| user.id == user_id)
594 }
595
596 pub fn respond_to_contact_request(
597 &mut self,
598 requester_id: u64,
599 accept: bool,
600 cx: &mut Context<Self>,
601 ) -> Task<Result<()>> {
602 self.perform_contact_request(
603 requester_id,
604 proto::RespondToContactRequest {
605 requester_id,
606 response: if accept {
607 proto::ContactRequestResponse::Accept
608 } else {
609 proto::ContactRequestResponse::Decline
610 } as i32,
611 },
612 cx,
613 )
614 }
615
616 pub fn dismiss_contact_request(
617 &self,
618 requester_id: u64,
619 cx: &Context<Self>,
620 ) -> Task<Result<()>> {
621 let client = self.client.upgrade();
622 cx.spawn(async move |_, _| {
623 client
624 .context("can't upgrade client reference")?
625 .request(proto::RespondToContactRequest {
626 requester_id,
627 response: proto::ContactRequestResponse::Dismiss as i32,
628 })
629 .await?;
630 Ok(())
631 })
632 }
633
634 fn perform_contact_request<T: RequestMessage>(
635 &mut self,
636 user_id: u64,
637 request: T,
638 cx: &mut Context<Self>,
639 ) -> Task<Result<()>> {
640 let client = self.client.upgrade();
641 *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
642 cx.notify();
643
644 cx.spawn(async move |this, cx| {
645 let response = client
646 .context("can't upgrade client reference")?
647 .request(request)
648 .await;
649 this.update(cx, |this, cx| {
650 if let Entry::Occupied(mut request_count) =
651 this.pending_contact_requests.entry(user_id)
652 {
653 *request_count.get_mut() -= 1;
654 if *request_count.get() == 0 {
655 request_count.remove();
656 }
657 }
658 cx.notify();
659 })?;
660 response?;
661 Ok(())
662 })
663 }
664
665 pub fn clear_contacts(&self) -> impl Future<Output = ()> + use<> {
666 let (tx, mut rx) = postage::barrier::channel();
667 self.update_contacts_tx
668 .unbounded_send(UpdateContacts::Clear(tx))
669 .unwrap();
670 async move {
671 rx.next().await;
672 }
673 }
674
675 pub fn contact_updates_done(&self) -> impl Future<Output = ()> {
676 let (tx, mut rx) = postage::barrier::channel();
677 self.update_contacts_tx
678 .unbounded_send(UpdateContacts::Wait(tx))
679 .unwrap();
680 async move {
681 rx.next().await;
682 }
683 }
684
685 pub fn get_users(
686 &self,
687 user_ids: Vec<u64>,
688 cx: &Context<Self>,
689 ) -> Task<Result<Vec<Arc<User>>>> {
690 let mut user_ids_to_fetch = user_ids.clone();
691 user_ids_to_fetch.retain(|id| !self.users.contains_key(id));
692
693 cx.spawn(async move |this, cx| {
694 if !user_ids_to_fetch.is_empty() {
695 this.update(cx, |this, cx| {
696 this.load_users(
697 proto::GetUsers {
698 user_ids: user_ids_to_fetch,
699 },
700 cx,
701 )
702 })?
703 .await?;
704 }
705
706 this.read_with(cx, |this, _| {
707 user_ids
708 .iter()
709 .map(|user_id| {
710 this.users
711 .get(user_id)
712 .cloned()
713 .with_context(|| format!("user {user_id} not found"))
714 })
715 .collect()
716 })?
717 })
718 }
719
720 pub fn fuzzy_search_users(
721 &self,
722 query: String,
723 cx: &Context<Self>,
724 ) -> Task<Result<Vec<Arc<User>>>> {
725 self.load_users(proto::FuzzySearchUsers { query }, cx)
726 }
727
728 pub fn get_cached_user(&self, user_id: u64) -> Option<Arc<User>> {
729 self.users.get(&user_id).cloned()
730 }
731
732 pub fn get_user_optimistic(&self, user_id: u64, cx: &Context<Self>) -> Option<Arc<User>> {
733 if let Some(user) = self.users.get(&user_id).cloned() {
734 return Some(user);
735 }
736
737 self.get_user(user_id, cx).detach_and_log_err(cx);
738 None
739 }
740
741 pub fn get_user(&self, user_id: u64, cx: &Context<Self>) -> Task<Result<Arc<User>>> {
742 if let Some(user) = self.users.get(&user_id).cloned() {
743 return Task::ready(Ok(user));
744 }
745
746 let load_users = self.get_users(vec![user_id], cx);
747 cx.spawn(async move |this, cx| {
748 load_users.await?;
749 this.read_with(cx, |this, _| {
750 this.users
751 .get(&user_id)
752 .cloned()
753 .context("server responded with no users")
754 })?
755 })
756 }
757
758 pub fn cached_user_by_github_login(&self, github_login: &str) -> Option<Arc<User>> {
759 self.by_github_login
760 .get(github_login)
761 .and_then(|id| self.users.get(id).cloned())
762 }
763
764 pub fn current_user(&self) -> Option<Arc<User>> {
765 self.current_user.borrow().clone()
766 }
767
768 pub fn current_plan(&self) -> Option<proto::Plan> {
769 #[cfg(debug_assertions)]
770 if let Ok(plan) = std::env::var("ZED_SIMULATE_PLAN").as_ref() {
771 return match plan.as_str() {
772 "free" => Some(proto::Plan::Free),
773 "trial" => Some(proto::Plan::ZedProTrial),
774 "pro" => Some(proto::Plan::ZedPro),
775 _ => {
776 panic!("ZED_SIMULATE_PLAN must be one of 'free', 'trial', or 'pro'");
777 }
778 };
779 }
780
781 self.current_plan
782 }
783
784 pub fn subscription_period(&self) -> Option<(DateTime<Utc>, DateTime<Utc>)> {
785 self.subscription_period
786 }
787
788 pub fn trial_started_at(&self) -> Option<DateTime<Utc>> {
789 self.trial_started_at
790 }
791
792 pub fn usage_based_billing_enabled(&self) -> Option<bool> {
793 self.is_usage_based_billing_enabled
794 }
795
796 pub fn model_request_usage(&self) -> Option<ModelRequestUsage> {
797 self.model_request_usage
798 }
799
800 pub fn edit_prediction_usage(&self) -> Option<EditPredictionUsage> {
801 self.edit_prediction_usage
802 }
803
804 pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
805 self.current_user.clone()
806 }
807
808 /// Returns whether the user's account is too new to use the service.
809 pub fn account_too_young(&self) -> bool {
810 self.account_too_young.unwrap_or(false)
811 }
812
813 /// Returns whether the current user has overdue invoices and usage should be blocked.
814 pub fn has_overdue_invoices(&self) -> bool {
815 self.has_overdue_invoices.unwrap_or(false)
816 }
817
818 pub fn current_user_has_accepted_terms(&self) -> Option<bool> {
819 self.accepted_tos_at
820 .map(|accepted_tos_at| accepted_tos_at.is_some())
821 }
822
823 pub fn accept_terms_of_service(&self, cx: &Context<Self>) -> Task<Result<()>> {
824 if self.current_user().is_none() {
825 return Task::ready(Err(anyhow!("no current user")));
826 };
827
828 let client = self.client.clone();
829 cx.spawn(async move |this, cx| -> anyhow::Result<()> {
830 let client = client.upgrade().context("client not found")?;
831 let response = client
832 .request(proto::AcceptTermsOfService {})
833 .await
834 .context("error accepting tos")?;
835 this.update(cx, |this, cx| {
836 this.set_current_user_accepted_tos_at(Some(response.accepted_tos_at));
837 cx.emit(Event::PrivateUserInfoUpdated);
838 })?;
839 Ok(())
840 })
841 }
842
843 fn set_current_user_accepted_tos_at(&mut self, accepted_tos_at: Option<u64>) {
844 self.accepted_tos_at = Some(
845 accepted_tos_at.and_then(|timestamp| DateTime::from_timestamp(timestamp as i64, 0)),
846 );
847 }
848
849 fn load_users(
850 &self,
851 request: impl RequestMessage<Response = UsersResponse>,
852 cx: &Context<Self>,
853 ) -> Task<Result<Vec<Arc<User>>>> {
854 let client = self.client.clone();
855 cx.spawn(async move |this, cx| {
856 if let Some(rpc) = client.upgrade() {
857 let response = rpc.request(request).await.context("error loading users")?;
858 let users = response.users;
859
860 this.update(cx, |this, _| this.insert(users))
861 } else {
862 Ok(Vec::new())
863 }
864 })
865 }
866
867 pub fn insert(&mut self, users: Vec<proto::User>) -> Vec<Arc<User>> {
868 let mut ret = Vec::with_capacity(users.len());
869 for user in users {
870 let user = User::new(user);
871 if let Some(old) = self.users.insert(user.id, user.clone()) {
872 if old.github_login != user.github_login {
873 self.by_github_login.remove(&old.github_login);
874 }
875 }
876 self.by_github_login
877 .insert(user.github_login.clone(), user.id);
878 ret.push(user)
879 }
880 ret
881 }
882
883 pub fn set_participant_indices(
884 &mut self,
885 participant_indices: HashMap<u64, ParticipantIndex>,
886 cx: &mut Context<Self>,
887 ) {
888 if participant_indices != self.participant_indices {
889 self.participant_indices = participant_indices;
890 cx.emit(Event::ParticipantIndicesChanged);
891 }
892 }
893
894 pub fn participant_indices(&self) -> &HashMap<u64, ParticipantIndex> {
895 &self.participant_indices
896 }
897
898 pub fn participant_names(
899 &self,
900 user_ids: impl Iterator<Item = u64>,
901 cx: &App,
902 ) -> HashMap<u64, SharedString> {
903 let mut ret = HashMap::default();
904 let mut missing_user_ids = Vec::new();
905 for id in user_ids {
906 if let Some(github_login) = self.get_cached_user(id).map(|u| u.github_login.clone()) {
907 ret.insert(id, github_login);
908 } else {
909 missing_user_ids.push(id)
910 }
911 }
912 if !missing_user_ids.is_empty() {
913 let this = self.weak_self.clone();
914 cx.spawn(async move |cx| {
915 this.update(cx, |this, cx| this.get_users(missing_user_ids, cx))?
916 .await
917 })
918 .detach_and_log_err(cx);
919 }
920 ret
921 }
922}
923
924impl User {
925 fn new(message: proto::User) -> Arc<Self> {
926 Arc::new(User {
927 id: message.id,
928 github_login: message.github_login.into(),
929 avatar_uri: message.avatar_url.into(),
930 name: message.name,
931 })
932 }
933}
934
935impl Contact {
936 async fn from_proto(
937 contact: proto::Contact,
938 user_store: &Entity<UserStore>,
939 cx: &mut AsyncApp,
940 ) -> Result<Self> {
941 let user = user_store
942 .update(cx, |user_store, cx| {
943 user_store.get_user(contact.user_id, cx)
944 })?
945 .await?;
946 Ok(Self {
947 user,
948 online: contact.online,
949 busy: contact.busy,
950 })
951 }
952}
953
954impl Collaborator {
955 pub fn from_proto(message: proto::Collaborator) -> Result<Self> {
956 Ok(Self {
957 peer_id: message.peer_id.context("invalid peer id")?,
958 replica_id: message.replica_id as ReplicaId,
959 user_id: message.user_id as UserId,
960 is_host: message.is_host,
961 committer_name: message.committer_name,
962 committer_email: message.committer_email,
963 })
964 }
965}
966
967impl RequestUsage {
968 pub fn over_limit(&self) -> bool {
969 match self.limit {
970 UsageLimit::Limited(limit) => self.amount >= limit,
971 UsageLimit::Unlimited => false,
972 }
973 }
974
975 pub fn from_proto(amount: u32, limit: proto::UsageLimit) -> Option<Self> {
976 let limit = match limit.variant? {
977 proto::usage_limit::Variant::Limited(limited) => {
978 UsageLimit::Limited(limited.limit as i32)
979 }
980 proto::usage_limit::Variant::Unlimited(_) => UsageLimit::Unlimited,
981 };
982 Some(RequestUsage {
983 limit,
984 amount: amount as i32,
985 })
986 }
987
988 fn from_headers(
989 limit_name: &str,
990 amount_name: &str,
991 headers: &HeaderMap<HeaderValue>,
992 ) -> Result<Self> {
993 let limit = headers
994 .get(limit_name)
995 .with_context(|| format!("missing {limit_name:?} header"))?;
996 let limit = UsageLimit::from_str(limit.to_str()?)?;
997
998 let amount = headers
999 .get(amount_name)
1000 .with_context(|| format!("missing {amount_name:?} header"))?;
1001 let amount = amount.to_str()?.parse::<i32>()?;
1002
1003 Ok(Self { limit, amount })
1004 }
1005}
1006
1007impl ModelRequestUsage {
1008 pub fn from_headers(headers: &HeaderMap<HeaderValue>) -> Result<Self> {
1009 Ok(Self(RequestUsage::from_headers(
1010 MODEL_REQUESTS_USAGE_LIMIT_HEADER_NAME,
1011 MODEL_REQUESTS_USAGE_AMOUNT_HEADER_NAME,
1012 headers,
1013 )?))
1014 }
1015}
1016
1017impl EditPredictionUsage {
1018 pub fn from_headers(headers: &HeaderMap<HeaderValue>) -> Result<Self> {
1019 Ok(Self(RequestUsage::from_headers(
1020 EDIT_PREDICTIONS_USAGE_LIMIT_HEADER_NAME,
1021 EDIT_PREDICTIONS_USAGE_AMOUNT_HEADER_NAME,
1022 headers,
1023 )?))
1024 }
1025}