1use super::{Client, Status, TypedEnvelope, proto};
2use anyhow::{Context as _, Result, anyhow};
3use chrono::{DateTime, Utc};
4use collections::{HashMap, HashSet, hash_map::Entry};
5use derive_more::Deref;
6use feature_flags::FeatureFlagAppExt;
7use futures::{Future, StreamExt, channel::mpsc};
8use gpui::{
9 App, AsyncApp, Context, Entity, EventEmitter, SharedString, SharedUri, Task, WeakEntity,
10};
11use http_client::http::{HeaderMap, HeaderValue};
12use postage::{sink::Sink, watch};
13use rpc::proto::{RequestMessage, UsersResponse};
14use std::{
15 str::FromStr as _,
16 sync::{Arc, Weak},
17};
18use text::ReplicaId;
19use util::{TryFutureExt as _, maybe};
20use zed_llm_client::{
21 EDIT_PREDICTIONS_USAGE_AMOUNT_HEADER_NAME, EDIT_PREDICTIONS_USAGE_LIMIT_HEADER_NAME,
22 MODEL_REQUESTS_USAGE_AMOUNT_HEADER_NAME, MODEL_REQUESTS_USAGE_LIMIT_HEADER_NAME, UsageLimit,
23};
24
/// Numeric identifier of a user; a plain alias for `u64`.
pub type UserId = u64;
26
/// Newtype wrapping the `u64` identifier of a channel.
#[derive(
    Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, serde::Serialize, serde::Deserialize,
)]
pub struct ChannelId(pub u64);
31
impl std::fmt::Display for ChannelId {
    /// Formats the channel id by delegating to the wrapped `u64`'s `fmt`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        self.0.fmt(f)
    }
}
37
/// Newtype wrapping the `u64` identifier of a project.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)]
pub struct ProjectId(pub u64);

impl ProjectId {
    /// Returns the wrapped `u64`, unchanged, for use in protocol messages.
    pub fn to_proto(&self) -> u64 {
        let Self(raw) = *self;
        raw
    }
}
46
/// Newtype wrapping the `u64` identifier of a dev-server project.
#[derive(
    Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, serde::Serialize, serde::Deserialize,
)]
pub struct DevServerProjectId(pub u64);
51
/// Newtype wrapping a `u32` participant index; `UserStore` keeps one per user id.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ParticipantIndex(pub u32);
54
/// A user record, built from a `proto::User` message.
#[derive(Default, Debug)]
pub struct User {
    /// Numeric user id.
    pub id: UserId,
    /// GitHub login; used as the sort key for contact lists and for `Ord`.
    pub github_login: String,
    /// Avatar location, converted from the message's `avatar_url`.
    pub avatar_uri: SharedUri,
    /// Optional name, copied from the message.
    pub name: Option<String>,
}
62
/// A collaborator, built from a `proto::Collaborator` message.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Collaborator {
    /// Peer id of the collaborator; required when converting from the proto message.
    pub peer_id: proto::PeerId,
    /// Replica id, cast from the message's `replica_id`.
    pub replica_id: ReplicaId,
    /// Id of the user behind this collaborator.
    pub user_id: UserId,
    /// Copied from the message's `is_host` flag.
    pub is_host: bool,
    /// Optional committer name, copied from the message.
    pub committer_name: Option<String>,
    /// Optional committer email, copied from the message.
    pub committer_email: Option<String>,
}
72
73impl PartialOrd for User {
74 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
75 Some(self.cmp(other))
76 }
77}
78
79impl Ord for User {
80 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
81 self.github_login.cmp(&other.github_login)
82 }
83}
84
85impl PartialEq for User {
86 fn eq(&self, other: &Self) -> bool {
87 self.id == other.id && self.github_login == other.github_login
88 }
89}
90
91impl Eq for User {}
92
/// A contact of the current user, built from a `proto::Contact` message.
#[derive(Debug, PartialEq)]
pub struct Contact {
    /// The user this contact refers to.
    pub user: Arc<User>,
    /// Copied from the message's `online` flag.
    pub online: bool,
    /// Copied from the message's `busy` flag.
    pub busy: bool,
}
99
/// Relationship between the current user and another user, as reported by
/// `UserStore::contact_request_status`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ContactRequestStatus {
    /// The user is in none of the contact or request lists.
    None,
    /// The user is in the outgoing contact request list.
    RequestSent,
    /// The user is in the incoming contact request list.
    RequestReceived,
    /// The user is already in the contact list.
    RequestAccepted,
}
107
/// Caches users, contacts and plan/billing state for the signed-in user, and keeps them in
/// sync with messages received through the client.
pub struct UserStore {
    /// User cache keyed by user id.
    users: HashMap<u64, Arc<User>>,
    /// Index from GitHub login to user id for entries in `users`.
    by_github_login: HashMap<String, u64>,
    /// Participant index per user id; replaced wholesale by `set_participant_indices`.
    participant_indices: HashMap<u64, ParticipantIndex>,
    /// Sending side of the queue consumed by the `_maintain_contacts` task.
    update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
    /// Plan from the most recent `proto::UpdateUserPlan` message.
    current_plan: Option<proto::Plan>,
    /// `(started_at, ended_at)` of the subscription period from the last plan update.
    subscription_period: Option<(DateTime<Utc>, DateTime<Utc>)>,
    /// Trial start time from the last plan update.
    trial_started_at: Option<DateTime<Utc>>,
    /// Model request usage, set by plan updates or `update_model_request_usage`.
    model_request_usage: Option<ModelRequestUsage>,
    /// Edit prediction usage, set by plan updates or `update_edit_prediction_usage`.
    edit_prediction_usage: Option<EditPredictionUsage>,
    /// Copied from the last plan update.
    is_usage_based_billing_enabled: Option<bool>,
    /// Copied from the last plan update; `None` is treated as `false` by the accessor.
    account_too_young: Option<bool>,
    /// Copied from the last plan update; `None` is treated as `false` by the accessor.
    has_overdue_invoices: Option<bool>,
    /// Receiving side of the current-user watch channel; the sender lives in the
    /// `_maintain_current_user` task.
    current_user: watch::Receiver<Option<Arc<User>>>,
    /// Outer `None`: acceptance state not known (initial state, and after sign-out).
    /// Inner `None`: state known, no acceptance timestamp.
    accepted_tos_at: Option<Option<DateTime<Utc>>>,
    /// Contacts, kept sorted by `github_login` (lookups use binary search).
    contacts: Vec<Arc<Contact>>,
    /// Incoming contact requests, kept sorted by `github_login`.
    incoming_contact_requests: Vec<Arc<User>>,
    /// Outgoing contact requests, kept sorted by `github_login`.
    outgoing_contact_requests: Vec<Arc<User>>,
    /// Number of in-flight contact requests per user id.
    pending_contact_requests: HashMap<u64, usize>,
    /// Invite info from the last `proto::UpdateInviteInfo` message.
    invite_info: Option<InviteInfo>,
    /// Weak handle to the client, so the store does not keep it alive.
    client: Weak<Client>,
    /// Task that applies queued contact updates sequentially.
    _maintain_contacts: Task<()>,
    /// Task that follows the client's connection status.
    _maintain_current_user: Task<Result<()>>,
    /// Weak handle to this store's own entity.
    weak_self: WeakEntity<Self>,
}
133
/// Invite information copied from a `proto::UpdateInviteInfo` message.
#[derive(Clone)]
pub struct InviteInfo {
    /// The message's `count` field.
    pub count: u32,
    /// The message's `url` field.
    pub url: Arc<str>,
}
139
/// Events emitted by `UserStore`.
pub enum Event {
    /// A contact-related change concerning `user`; `kind` says what happened.
    Contact {
        user: Arc<User>,
        kind: ContactEventKind,
    },
    /// Emitted when a `proto::ShowContacts` message is received.
    ShowContacts,
    /// Emitted by `set_participant_indices` when the stored indices actually change.
    ParticipantIndicesChanged,
    /// Emitted when the terms-of-service acceptance state is refreshed or reset.
    PrivateUserInfoUpdated,
}
149
/// Kind of contact change carried by `Event::Contact`.
#[derive(Clone, Copy)]
pub enum ContactEventKind {
    Requested,
    Accepted,
    /// Emitted by `UserStore` when an incoming contact request is removed.
    Cancelled,
}
156
// Declares `Event` as the event type emitted by `UserStore`.
impl EventEmitter<Event> for UserStore {}
158
/// Messages processed, in order, by the store's contact-maintenance task.
enum UpdateContacts {
    /// Apply a contacts update received from the server.
    Update(proto::UpdateContacts),
    /// Do nothing except drop the barrier sender once this message is reached.
    Wait(postage::barrier::Sender),
    /// Clear contacts and both request lists, then drop the barrier sender.
    Clear(postage::barrier::Sender),
}
164
/// `RequestUsage` for model requests; derefs to the wrapped `RequestUsage`.
#[derive(Debug, Clone, Copy, Deref)]
pub struct ModelRequestUsage(pub RequestUsage);
167
/// `RequestUsage` for edit predictions; derefs to the wrapped `RequestUsage`.
#[derive(Debug, Clone, Copy, Deref)]
pub struct EditPredictionUsage(pub RequestUsage);
170
/// A usage counter paired with the limit it is measured against.
#[derive(Debug, Clone, Copy)]
pub struct RequestUsage {
    /// The limit, either a concrete number or unlimited.
    pub limit: UsageLimit,
    /// The amount used; compared against `limit` by `over_limit`.
    pub amount: i32,
}
176
177impl UserStore {
    /// Creates a `UserStore` bound to `client`.
    ///
    /// Registers message handlers for plan, contacts, invite-info and show-contacts
    /// messages, and spawns two background tasks owned by the store:
    /// - `_maintain_contacts` applies queued `UpdateContacts` messages one at a time.
    /// - `_maintain_current_user` follows the client's connection status and keeps the
    ///   current user, terms-of-service state and contact lists in step with it.
    ///
    /// The store itself only keeps a `Weak<Client>`.
    pub fn new(client: Arc<Client>, cx: &Context<Self>) -> Self {
        let (mut current_user_tx, current_user_rx) = watch::channel();
        let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
        // Handler registrations; moved into the contacts task below so they live as long as it.
        let rpc_subscriptions = vec![
            client.add_message_handler(cx.weak_entity(), Self::handle_update_plan),
            client.add_message_handler(cx.weak_entity(), Self::handle_update_contacts),
            client.add_message_handler(cx.weak_entity(), Self::handle_update_invite_info),
            client.add_message_handler(cx.weak_entity(), Self::handle_show_contacts),
        ];
        Self {
            users: Default::default(),
            by_github_login: Default::default(),
            current_user: current_user_rx,
            current_plan: None,
            subscription_period: None,
            trial_started_at: None,
            model_request_usage: None,
            edit_prediction_usage: None,
            is_usage_based_billing_enabled: None,
            account_too_young: None,
            has_overdue_invoices: None,
            accepted_tos_at: None,
            contacts: Default::default(),
            incoming_contact_requests: Default::default(),
            participant_indices: Default::default(),
            outgoing_contact_requests: Default::default(),
            invite_info: None,
            client: Arc::downgrade(&client),
            update_contacts_tx,
            _maintain_contacts: cx.spawn(async move |this, cx| {
                let _subscriptions = rpc_subscriptions;
                // Each update's task is awaited before the next message is read, so
                // contact updates are applied strictly in the order they were queued.
                while let Some(message) = update_contacts_rx.next().await {
                    if let Ok(task) = this.update(cx, |this, cx| this.update_contacts(message, cx))
                    {
                        task.log_err().await;
                    } else {
                        // The store could not be updated (presumably it was dropped); stop.
                        break;
                    }
                }
            }),
            _maintain_current_user: cx.spawn(async move |this, cx| {
                let mut status = client.status();
                // Hold only a weak handle from here on so this task does not keep the
                // client alive.
                let weak = Arc::downgrade(&client);
                drop(client);
                while let Some(status) = status.next().await {
                    // if the client is dropped, the app is shutting down.
                    let Some(client) = weak.upgrade() else {
                        return Ok(());
                    };
                    match status {
                        Status::Connected { .. } => {
                            if let Some(user_id) = client.user_id() {
                                let fetch_user = if let Ok(fetch_user) =
                                    this.update(cx, |this, cx| this.get_user(user_id, cx).log_err())
                                {
                                    fetch_user
                                } else {
                                    break;
                                };
                                let fetch_private_user_info =
                                    client.request(proto::GetPrivateUserInfo {}).log_err();
                                // Fetch the user record and the private user info concurrently.
                                let (user, info) =
                                    futures::join!(fetch_user, fetch_private_user_info);

                                cx.update(|cx| {
                                    if let Some(info) = info {
                                        // Staff status is ignored when
                                        // `feature_flags::ZED_DISABLE_STAFF` is set.
                                        let staff =
                                            info.staff && !*feature_flags::ZED_DISABLE_STAFF;
                                        cx.update_flags(staff, info.flags);
                                        client.telemetry.set_authenticated_user_info(
                                            Some(info.metrics_id.clone()),
                                            staff,
                                        );

                                        this.update(cx, |this, cx| {
                                            // In debug builds, setting ZED_IGNORE_ACCEPTED_TOS
                                            // makes the store act as if the terms were never
                                            // accepted.
                                            let accepted_tos_at = {
                                                #[cfg(debug_assertions)]
                                                if std::env::var("ZED_IGNORE_ACCEPTED_TOS").is_ok()
                                                {
                                                    None
                                                } else {
                                                    info.accepted_tos_at
                                                }

                                                #[cfg(not(debug_assertions))]
                                                info.accepted_tos_at
                                            };

                                            this.set_current_user_accepted_tos_at(accepted_tos_at);
                                            cx.emit(Event::PrivateUserInfoUpdated);
                                        })
                                    } else {
                                        anyhow::Ok(())
                                    }
                                })??;

                                // Publish the fetched user; this is `None` when the fetch
                                // failed (the error was logged by `log_err`).
                                current_user_tx.send(user).await.ok();

                                this.update(cx, |_, cx| cx.notify())?;
                            }
                        }
                        Status::SignedOut => {
                            current_user_tx.send(None).await.ok();
                            this.update(cx, |this, cx| {
                                // Back to "acceptance state unknown".
                                this.accepted_tos_at = None;
                                cx.emit(Event::PrivateUserInfoUpdated);
                                cx.notify();
                                this.clear_contacts()
                            })?
                            .await;
                        }
                        Status::ConnectionLost => {
                            // Contacts are cleared, but the current user is left in place.
                            this.update(cx, |this, cx| {
                                cx.notify();
                                this.clear_contacts()
                            })?
                            .await;
                        }
                        _ => {}
                    }
                }
                Ok(())
            }),
            pending_contact_requests: Default::default(),
            weak_self: cx.weak_entity(),
        }
    }
305
306 #[cfg(feature = "test-support")]
307 pub fn clear_cache(&mut self) {
308 self.users.clear();
309 self.by_github_login.clear();
310 }
311
312 async fn handle_update_invite_info(
313 this: Entity<Self>,
314 message: TypedEnvelope<proto::UpdateInviteInfo>,
315 mut cx: AsyncApp,
316 ) -> Result<()> {
317 this.update(&mut cx, |this, cx| {
318 this.invite_info = Some(InviteInfo {
319 url: Arc::from(message.payload.url),
320 count: message.payload.count,
321 });
322 cx.notify();
323 })?;
324 Ok(())
325 }
326
327 async fn handle_show_contacts(
328 this: Entity<Self>,
329 _: TypedEnvelope<proto::ShowContacts>,
330 mut cx: AsyncApp,
331 ) -> Result<()> {
332 this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts))?;
333 Ok(())
334 }
335
336 pub fn invite_info(&self) -> Option<&InviteInfo> {
337 self.invite_info.as_ref()
338 }
339
340 async fn handle_update_contacts(
341 this: Entity<Self>,
342 message: TypedEnvelope<proto::UpdateContacts>,
343 mut cx: AsyncApp,
344 ) -> Result<()> {
345 this.read_with(&mut cx, |this, _| {
346 this.update_contacts_tx
347 .unbounded_send(UpdateContacts::Update(message.payload))
348 .unwrap();
349 })?;
350 Ok(())
351 }
352
353 async fn handle_update_plan(
354 this: Entity<Self>,
355 message: TypedEnvelope<proto::UpdateUserPlan>,
356 mut cx: AsyncApp,
357 ) -> Result<()> {
358 this.update(&mut cx, |this, cx| {
359 this.current_plan = Some(message.payload.plan());
360 this.subscription_period = maybe!({
361 let period = message.payload.subscription_period?;
362 let started_at = DateTime::from_timestamp(period.started_at as i64, 0)?;
363 let ended_at = DateTime::from_timestamp(period.ended_at as i64, 0)?;
364
365 Some((started_at, ended_at))
366 });
367 this.trial_started_at = message
368 .payload
369 .trial_started_at
370 .and_then(|trial_started_at| DateTime::from_timestamp(trial_started_at as i64, 0));
371 this.is_usage_based_billing_enabled = message.payload.is_usage_based_billing_enabled;
372 this.account_too_young = message.payload.account_too_young;
373 this.has_overdue_invoices = message.payload.has_overdue_invoices;
374
375 if let Some(usage) = message.payload.usage {
376 // limits are always present even though they are wrapped in Option
377 this.model_request_usage = usage
378 .model_requests_usage_limit
379 .and_then(|limit| {
380 RequestUsage::from_proto(usage.model_requests_usage_amount, limit)
381 })
382 .map(ModelRequestUsage);
383 this.edit_prediction_usage = usage
384 .edit_predictions_usage_limit
385 .and_then(|limit| {
386 RequestUsage::from_proto(usage.model_requests_usage_amount, limit)
387 })
388 .map(EditPredictionUsage);
389 }
390
391 cx.notify();
392 })?;
393 Ok(())
394 }
395
396 pub fn update_model_request_usage(&mut self, usage: ModelRequestUsage, cx: &mut Context<Self>) {
397 self.model_request_usage = Some(usage);
398 cx.notify();
399 }
400
401 pub fn update_edit_prediction_usage(
402 &mut self,
403 usage: EditPredictionUsage,
404 cx: &mut Context<Self>,
405 ) {
406 self.edit_prediction_usage = Some(usage);
407 cx.notify();
408 }
409
    /// Applies one queued `UpdateContacts` message and returns a task that completes when
    /// the message has been fully applied.
    ///
    /// - `Wait`: drops the barrier sender and does nothing else.
    /// - `Clear`: empties contacts and both request lists, then drops the barrier sender.
    /// - `Update`: loads every referenced user, then removes, replaces and inserts entries in
    ///   the three lists, keeping each sorted by `github_login`.
    fn update_contacts(&mut self, message: UpdateContacts, cx: &Context<Self>) -> Task<Result<()>> {
        match message {
            UpdateContacts::Wait(barrier) => {
                // Dropping the sender is what lets the waiting receiver proceed.
                drop(barrier);
                Task::ready(Ok(()))
            }
            UpdateContacts::Clear(barrier) => {
                self.contacts.clear();
                self.incoming_contact_requests.clear();
                self.outgoing_contact_requests.clear();
                drop(barrier);
                Task::ready(Ok(()))
            }
            UpdateContacts::Update(message) => {
                // Collect every user id mentioned by the update so they can be loaded
                // in a single `get_users` call.
                let mut user_ids = HashSet::default();
                for contact in &message.contacts {
                    user_ids.insert(contact.user_id);
                }
                user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
                user_ids.extend(message.outgoing_requests.iter());

                let load_users = self.get_users(user_ids.into_iter().collect(), cx);
                cx.spawn(async move |this, cx| {
                    load_users.await?;

                    // Users are fetched in parallel above and cached in call to get_users
                    // No need to parallelize here
                    let mut updated_contacts = Vec::new();
                    let this = this.upgrade().context("can't upgrade user store handle")?;
                    for contact in message.contacts {
                        updated_contacts
                            .push(Arc::new(Contact::from_proto(contact, &this, cx).await?));
                    }

                    let mut incoming_requests = Vec::new();
                    for request in message.incoming_requests {
                        incoming_requests.push({
                            this.update(cx, |this, cx| this.get_user(request.requester_id, cx))?
                                .await?
                        });
                    }

                    let mut outgoing_requests = Vec::new();
                    for requested_user_id in message.outgoing_requests {
                        outgoing_requests.push(
                            this.update(cx, |this, cx| this.get_user(requested_user_id, cx))?
                                .await?,
                        );
                    }

                    let removed_contacts =
                        HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
                    let removed_incoming_requests =
                        HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
                    let removed_outgoing_requests =
                        HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());

                    this.update(cx, |this, cx| {
                        // Remove contacts
                        this.contacts
                            .retain(|contact| !removed_contacts.contains(&contact.user.id));
                        // Update existing contacts and insert new ones
                        for updated_contact in updated_contacts {
                            // Binary search by login: replace on a hit, otherwise insert at
                            // the position that keeps the list sorted.
                            match this.contacts.binary_search_by_key(
                                &&updated_contact.user.github_login,
                                |contact| &contact.user.github_login,
                            ) {
                                Ok(ix) => this.contacts[ix] = updated_contact,
                                Err(ix) => this.contacts.insert(ix, updated_contact),
                            }
                        }

                        // Remove incoming contact requests
                        this.incoming_contact_requests.retain(|user| {
                            if removed_incoming_requests.contains(&user.id) {
                                // Each removed incoming request is reported as cancelled.
                                cx.emit(Event::Contact {
                                    user: user.clone(),
                                    kind: ContactEventKind::Cancelled,
                                });
                                false
                            } else {
                                true
                            }
                        });
                        // Update existing incoming requests and insert new ones
                        for user in incoming_requests {
                            match this
                                .incoming_contact_requests
                                .binary_search_by_key(&&user.github_login, |contact| {
                                    &contact.github_login
                                }) {
                                Ok(ix) => this.incoming_contact_requests[ix] = user,
                                Err(ix) => this.incoming_contact_requests.insert(ix, user),
                            }
                        }

                        // Remove outgoing contact requests
                        this.outgoing_contact_requests
                            .retain(|user| !removed_outgoing_requests.contains(&user.id));
                        // Update existing outgoing requests and insert new ones
                        for request in outgoing_requests {
                            match this
                                .outgoing_contact_requests
                                .binary_search_by_key(&&request.github_login, |contact| {
                                    &contact.github_login
                                }) {
                                Ok(ix) => this.outgoing_contact_requests[ix] = request,
                                Err(ix) => this.outgoing_contact_requests.insert(ix, request),
                            }
                        }

                        cx.notify();
                    })?;

                    Ok(())
                })
            }
        }
    }
529
530 pub fn contacts(&self) -> &[Arc<Contact>] {
531 &self.contacts
532 }
533
534 pub fn has_contact(&self, user: &Arc<User>) -> bool {
535 self.contacts
536 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
537 .is_ok()
538 }
539
540 pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
541 &self.incoming_contact_requests
542 }
543
544 pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
545 &self.outgoing_contact_requests
546 }
547
548 pub fn is_contact_request_pending(&self, user: &User) -> bool {
549 self.pending_contact_requests.contains_key(&user.id)
550 }
551
552 pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
553 if self
554 .contacts
555 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
556 .is_ok()
557 {
558 ContactRequestStatus::RequestAccepted
559 } else if self
560 .outgoing_contact_requests
561 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
562 .is_ok()
563 {
564 ContactRequestStatus::RequestSent
565 } else if self
566 .incoming_contact_requests
567 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
568 .is_ok()
569 {
570 ContactRequestStatus::RequestReceived
571 } else {
572 ContactRequestStatus::None
573 }
574 }
575
576 pub fn request_contact(
577 &mut self,
578 responder_id: u64,
579 cx: &mut Context<Self>,
580 ) -> Task<Result<()>> {
581 self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
582 }
583
584 pub fn remove_contact(&mut self, user_id: u64, cx: &mut Context<Self>) -> Task<Result<()>> {
585 self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
586 }
587
588 pub fn has_incoming_contact_request(&self, user_id: u64) -> bool {
589 self.incoming_contact_requests
590 .iter()
591 .any(|user| user.id == user_id)
592 }
593
594 pub fn respond_to_contact_request(
595 &mut self,
596 requester_id: u64,
597 accept: bool,
598 cx: &mut Context<Self>,
599 ) -> Task<Result<()>> {
600 self.perform_contact_request(
601 requester_id,
602 proto::RespondToContactRequest {
603 requester_id,
604 response: if accept {
605 proto::ContactRequestResponse::Accept
606 } else {
607 proto::ContactRequestResponse::Decline
608 } as i32,
609 },
610 cx,
611 )
612 }
613
614 pub fn dismiss_contact_request(
615 &self,
616 requester_id: u64,
617 cx: &Context<Self>,
618 ) -> Task<Result<()>> {
619 let client = self.client.upgrade();
620 cx.spawn(async move |_, _| {
621 client
622 .context("can't upgrade client reference")?
623 .request(proto::RespondToContactRequest {
624 requester_id,
625 response: proto::ContactRequestResponse::Dismiss as i32,
626 })
627 .await?;
628 Ok(())
629 })
630 }
631
632 fn perform_contact_request<T: RequestMessage>(
633 &mut self,
634 user_id: u64,
635 request: T,
636 cx: &mut Context<Self>,
637 ) -> Task<Result<()>> {
638 let client = self.client.upgrade();
639 *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
640 cx.notify();
641
642 cx.spawn(async move |this, cx| {
643 let response = client
644 .context("can't upgrade client reference")?
645 .request(request)
646 .await;
647 this.update(cx, |this, cx| {
648 if let Entry::Occupied(mut request_count) =
649 this.pending_contact_requests.entry(user_id)
650 {
651 *request_count.get_mut() -= 1;
652 if *request_count.get() == 0 {
653 request_count.remove();
654 }
655 }
656 cx.notify();
657 })?;
658 response?;
659 Ok(())
660 })
661 }
662
663 pub fn clear_contacts(&self) -> impl Future<Output = ()> + use<> {
664 let (tx, mut rx) = postage::barrier::channel();
665 self.update_contacts_tx
666 .unbounded_send(UpdateContacts::Clear(tx))
667 .unwrap();
668 async move {
669 rx.next().await;
670 }
671 }
672
673 pub fn contact_updates_done(&self) -> impl Future<Output = ()> {
674 let (tx, mut rx) = postage::barrier::channel();
675 self.update_contacts_tx
676 .unbounded_send(UpdateContacts::Wait(tx))
677 .unwrap();
678 async move {
679 rx.next().await;
680 }
681 }
682
683 pub fn get_users(
684 &self,
685 user_ids: Vec<u64>,
686 cx: &Context<Self>,
687 ) -> Task<Result<Vec<Arc<User>>>> {
688 let mut user_ids_to_fetch = user_ids.clone();
689 user_ids_to_fetch.retain(|id| !self.users.contains_key(id));
690
691 cx.spawn(async move |this, cx| {
692 if !user_ids_to_fetch.is_empty() {
693 this.update(cx, |this, cx| {
694 this.load_users(
695 proto::GetUsers {
696 user_ids: user_ids_to_fetch,
697 },
698 cx,
699 )
700 })?
701 .await?;
702 }
703
704 this.read_with(cx, |this, _| {
705 user_ids
706 .iter()
707 .map(|user_id| {
708 this.users
709 .get(user_id)
710 .cloned()
711 .with_context(|| format!("user {user_id} not found"))
712 })
713 .collect()
714 })?
715 })
716 }
717
718 pub fn fuzzy_search_users(
719 &self,
720 query: String,
721 cx: &Context<Self>,
722 ) -> Task<Result<Vec<Arc<User>>>> {
723 self.load_users(proto::FuzzySearchUsers { query }, cx)
724 }
725
726 pub fn get_cached_user(&self, user_id: u64) -> Option<Arc<User>> {
727 self.users.get(&user_id).cloned()
728 }
729
730 pub fn get_user_optimistic(&self, user_id: u64, cx: &Context<Self>) -> Option<Arc<User>> {
731 if let Some(user) = self.users.get(&user_id).cloned() {
732 return Some(user);
733 }
734
735 self.get_user(user_id, cx).detach_and_log_err(cx);
736 None
737 }
738
739 pub fn get_user(&self, user_id: u64, cx: &Context<Self>) -> Task<Result<Arc<User>>> {
740 if let Some(user) = self.users.get(&user_id).cloned() {
741 return Task::ready(Ok(user));
742 }
743
744 let load_users = self.get_users(vec![user_id], cx);
745 cx.spawn(async move |this, cx| {
746 load_users.await?;
747 this.read_with(cx, |this, _| {
748 this.users
749 .get(&user_id)
750 .cloned()
751 .context("server responded with no users")
752 })?
753 })
754 }
755
756 pub fn cached_user_by_github_login(&self, github_login: &str) -> Option<Arc<User>> {
757 self.by_github_login
758 .get(github_login)
759 .and_then(|id| self.users.get(id).cloned())
760 }
761
762 pub fn current_user(&self) -> Option<Arc<User>> {
763 self.current_user.borrow().clone()
764 }
765
766 pub fn current_plan(&self) -> Option<proto::Plan> {
767 #[cfg(debug_assertions)]
768 if let Ok(plan) = std::env::var("ZED_SIMULATE_PLAN").as_ref() {
769 return match plan.as_str() {
770 "free" => Some(proto::Plan::Free),
771 "trial" => Some(proto::Plan::ZedProTrial),
772 "pro" => Some(proto::Plan::ZedPro),
773 _ => {
774 panic!("ZED_SIMULATE_PLAN must be one of 'free', 'trial', or 'pro'");
775 }
776 };
777 }
778
779 self.current_plan
780 }
781
782 pub fn subscription_period(&self) -> Option<(DateTime<Utc>, DateTime<Utc>)> {
783 self.subscription_period
784 }
785
786 pub fn trial_started_at(&self) -> Option<DateTime<Utc>> {
787 self.trial_started_at
788 }
789
790 pub fn usage_based_billing_enabled(&self) -> Option<bool> {
791 self.is_usage_based_billing_enabled
792 }
793
794 pub fn model_request_usage(&self) -> Option<ModelRequestUsage> {
795 self.model_request_usage
796 }
797
798 pub fn edit_prediction_usage(&self) -> Option<EditPredictionUsage> {
799 self.edit_prediction_usage
800 }
801
802 pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
803 self.current_user.clone()
804 }
805
806 /// Returns whether the user's account is too new to use the service.
807 pub fn account_too_young(&self) -> bool {
808 self.account_too_young.unwrap_or(false)
809 }
810
811 /// Returns whether the current user has overdue invoices and usage should be blocked.
812 pub fn has_overdue_invoices(&self) -> bool {
813 self.has_overdue_invoices.unwrap_or(false)
814 }
815
816 pub fn current_user_has_accepted_terms(&self) -> Option<bool> {
817 self.accepted_tos_at
818 .map(|accepted_tos_at| accepted_tos_at.is_some())
819 }
820
821 pub fn accept_terms_of_service(&self, cx: &Context<Self>) -> Task<Result<()>> {
822 if self.current_user().is_none() {
823 return Task::ready(Err(anyhow!("no current user")));
824 };
825
826 let client = self.client.clone();
827 cx.spawn(async move |this, cx| -> anyhow::Result<()> {
828 let client = client.upgrade().context("client not found")?;
829 let response = client
830 .request(proto::AcceptTermsOfService {})
831 .await
832 .context("error accepting tos")?;
833 this.update(cx, |this, cx| {
834 this.set_current_user_accepted_tos_at(Some(response.accepted_tos_at));
835 cx.emit(Event::PrivateUserInfoUpdated);
836 })?;
837 Ok(())
838 })
839 }
840
841 fn set_current_user_accepted_tos_at(&mut self, accepted_tos_at: Option<u64>) {
842 self.accepted_tos_at = Some(
843 accepted_tos_at.and_then(|timestamp| DateTime::from_timestamp(timestamp as i64, 0)),
844 );
845 }
846
847 fn load_users(
848 &self,
849 request: impl RequestMessage<Response = UsersResponse>,
850 cx: &Context<Self>,
851 ) -> Task<Result<Vec<Arc<User>>>> {
852 let client = self.client.clone();
853 cx.spawn(async move |this, cx| {
854 if let Some(rpc) = client.upgrade() {
855 let response = rpc.request(request).await.context("error loading users")?;
856 let users = response.users;
857
858 this.update(cx, |this, _| this.insert(users))
859 } else {
860 Ok(Vec::new())
861 }
862 })
863 }
864
865 pub fn insert(&mut self, users: Vec<proto::User>) -> Vec<Arc<User>> {
866 let mut ret = Vec::with_capacity(users.len());
867 for user in users {
868 let user = User::new(user);
869 if let Some(old) = self.users.insert(user.id, user.clone()) {
870 if old.github_login != user.github_login {
871 self.by_github_login.remove(&old.github_login);
872 }
873 }
874 self.by_github_login
875 .insert(user.github_login.clone(), user.id);
876 ret.push(user)
877 }
878 ret
879 }
880
881 pub fn set_participant_indices(
882 &mut self,
883 participant_indices: HashMap<u64, ParticipantIndex>,
884 cx: &mut Context<Self>,
885 ) {
886 if participant_indices != self.participant_indices {
887 self.participant_indices = participant_indices;
888 cx.emit(Event::ParticipantIndicesChanged);
889 }
890 }
891
892 pub fn participant_indices(&self) -> &HashMap<u64, ParticipantIndex> {
893 &self.participant_indices
894 }
895
896 pub fn participant_names(
897 &self,
898 user_ids: impl Iterator<Item = u64>,
899 cx: &App,
900 ) -> HashMap<u64, SharedString> {
901 let mut ret = HashMap::default();
902 let mut missing_user_ids = Vec::new();
903 for id in user_ids {
904 if let Some(github_login) = self.get_cached_user(id).map(|u| u.github_login.clone()) {
905 ret.insert(id, github_login.into());
906 } else {
907 missing_user_ids.push(id)
908 }
909 }
910 if !missing_user_ids.is_empty() {
911 let this = self.weak_self.clone();
912 cx.spawn(async move |cx| {
913 this.update(cx, |this, cx| this.get_users(missing_user_ids, cx))?
914 .await
915 })
916 .detach_and_log_err(cx);
917 }
918 ret
919 }
920}
921
922impl User {
923 fn new(message: proto::User) -> Arc<Self> {
924 Arc::new(User {
925 id: message.id,
926 github_login: message.github_login,
927 avatar_uri: message.avatar_url.into(),
928 name: message.name,
929 })
930 }
931}
932
933impl Contact {
934 async fn from_proto(
935 contact: proto::Contact,
936 user_store: &Entity<UserStore>,
937 cx: &mut AsyncApp,
938 ) -> Result<Self> {
939 let user = user_store
940 .update(cx, |user_store, cx| {
941 user_store.get_user(contact.user_id, cx)
942 })?
943 .await?;
944 Ok(Self {
945 user,
946 online: contact.online,
947 busy: contact.busy,
948 })
949 }
950}
951
952impl Collaborator {
953 pub fn from_proto(message: proto::Collaborator) -> Result<Self> {
954 Ok(Self {
955 peer_id: message.peer_id.context("invalid peer id")?,
956 replica_id: message.replica_id as ReplicaId,
957 user_id: message.user_id as UserId,
958 is_host: message.is_host,
959 committer_name: message.committer_name,
960 committer_email: message.committer_email,
961 })
962 }
963}
964
965impl RequestUsage {
966 pub fn over_limit(&self) -> bool {
967 match self.limit {
968 UsageLimit::Limited(limit) => self.amount >= limit,
969 UsageLimit::Unlimited => false,
970 }
971 }
972
973 pub fn from_proto(amount: u32, limit: proto::UsageLimit) -> Option<Self> {
974 let limit = match limit.variant? {
975 proto::usage_limit::Variant::Limited(limited) => {
976 UsageLimit::Limited(limited.limit as i32)
977 }
978 proto::usage_limit::Variant::Unlimited(_) => UsageLimit::Unlimited,
979 };
980 Some(RequestUsage {
981 limit,
982 amount: amount as i32,
983 })
984 }
985
986 fn from_headers(
987 limit_name: &str,
988 amount_name: &str,
989 headers: &HeaderMap<HeaderValue>,
990 ) -> Result<Self> {
991 let limit = headers
992 .get(limit_name)
993 .with_context(|| format!("missing {limit_name:?} header"))?;
994 let limit = UsageLimit::from_str(limit.to_str()?)?;
995
996 let amount = headers
997 .get(amount_name)
998 .with_context(|| format!("missing {amount_name:?} header"))?;
999 let amount = amount.to_str()?.parse::<i32>()?;
1000
1001 Ok(Self { limit, amount })
1002 }
1003}
1004
1005impl ModelRequestUsage {
1006 pub fn from_headers(headers: &HeaderMap<HeaderValue>) -> Result<Self> {
1007 Ok(Self(RequestUsage::from_headers(
1008 MODEL_REQUESTS_USAGE_LIMIT_HEADER_NAME,
1009 MODEL_REQUESTS_USAGE_AMOUNT_HEADER_NAME,
1010 headers,
1011 )?))
1012 }
1013}
1014
1015impl EditPredictionUsage {
1016 pub fn from_headers(headers: &HeaderMap<HeaderValue>) -> Result<Self> {
1017 Ok(Self(RequestUsage::from_headers(
1018 EDIT_PREDICTIONS_USAGE_LIMIT_HEADER_NAME,
1019 EDIT_PREDICTIONS_USAGE_AMOUNT_HEADER_NAME,
1020 headers,
1021 )?))
1022 }
1023}