1use super::{Client, Status, TypedEnvelope, proto};
2use anyhow::{Context as _, Result, anyhow};
3use chrono::{DateTime, Utc};
4use cloud_llm_client::{
5 EDIT_PREDICTIONS_USAGE_AMOUNT_HEADER_NAME, EDIT_PREDICTIONS_USAGE_LIMIT_HEADER_NAME,
6 MODEL_REQUESTS_USAGE_AMOUNT_HEADER_NAME, MODEL_REQUESTS_USAGE_LIMIT_HEADER_NAME, UsageLimit,
7};
8use collections::{HashMap, HashSet, hash_map::Entry};
9use derive_more::Deref;
10use feature_flags::FeatureFlagAppExt;
11use futures::{Future, StreamExt, channel::mpsc};
12use gpui::{
13 App, AsyncApp, Context, Entity, EventEmitter, SharedString, SharedUri, Task, WeakEntity,
14};
15use http_client::http::{HeaderMap, HeaderValue};
16use postage::{sink::Sink, watch};
17use rpc::proto::{RequestMessage, UsersResponse};
18use std::{
19 str::FromStr as _,
20 sync::{Arc, Weak},
21};
22use text::ReplicaId;
23use util::TryFutureExt as _;
24
/// Numeric identifier of a user; a plain alias for `u64`.
pub type UserId = u64;
26
/// Newtype wrapper around the `u64` id of a channel.
///
/// Being a distinct type keeps channel ids from being mixed up with other
/// `u64` ids at compile time. Serializable via `serde`.
#[derive(
    Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, serde::Serialize, serde::Deserialize,
)]
pub struct ChannelId(pub u64);
31
32impl std::fmt::Display for ChannelId {
33 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
34 self.0.fmt(f)
35 }
36}
37
/// Newtype wrapper around the `u64` id of a project.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)]
pub struct ProjectId(pub u64);

impl ProjectId {
    /// Returns the raw `u64` carried by this id, as used in protocol messages.
    pub fn to_proto(&self) -> u64 {
        let Self(raw) = *self;
        raw
    }
}
46
/// Newtype wrapper around the `u64` id of a dev-server project.
/// Serializable via `serde`.
#[derive(
    Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, serde::Serialize, serde::Deserialize,
)]
pub struct DevServerProjectId(pub u64);
51
/// Newtype wrapper around a `u32` index assigned to a participant.
///
/// `UserStore` keeps one of these per user id (see `participant_indices`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ParticipantIndex(pub u32);
54
/// Public profile of a user, built from a `proto::User` message.
#[derive(Default, Debug)]
pub struct User {
    /// Numeric id of the user.
    pub id: UserId,
    /// GitHub login; the key by which `User`s are ordered and by which
    /// contact lists are kept sorted.
    pub github_login: SharedString,
    /// URI of the user's avatar image.
    pub avatar_uri: SharedUri,
    /// Optional name, as delivered in the `proto::User` message.
    pub name: Option<String>,
}
62
/// A collaborator, decoded from a `proto::Collaborator` message
/// (see `Collaborator::from_proto`).
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Collaborator {
    /// Peer id of the collaborator's connection.
    pub peer_id: proto::PeerId,
    /// Replica id assigned to this collaborator.
    pub replica_id: ReplicaId,
    /// Id of the user behind this collaborator.
    pub user_id: UserId,
    /// Host flag copied from the protocol message.
    pub is_host: bool,
    /// Committer name, if the message carried one.
    pub committer_name: Option<String>,
    /// Committer email, if the message carried one.
    pub committer_email: Option<String>,
}
72
73impl PartialOrd for User {
74 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
75 Some(self.cmp(other))
76 }
77}
78
79impl Ord for User {
80 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
81 self.github_login.cmp(&other.github_login)
82 }
83}
84
85impl PartialEq for User {
86 fn eq(&self, other: &Self) -> bool {
87 self.id == other.id && self.github_login == other.github_login
88 }
89}
90
// Marker impl: the `PartialEq` comparison above (id + login) is a full equivalence relation.
impl Eq for User {}
92
/// A contact of the current user, built from a `proto::Contact` message.
#[derive(Debug, PartialEq)]
pub struct Contact {
    /// The user this contact refers to.
    pub user: Arc<User>,
    /// `online` flag copied from the protocol message.
    pub online: bool,
    /// `busy` flag copied from the protocol message.
    pub busy: bool,
}
99
/// Relationship between the current user and another user, as computed by
/// `UserStore::contact_request_status`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ContactRequestStatus {
    /// The user is neither a contact nor part of any pending request.
    None,
    /// The user is in the outgoing contact-request list.
    RequestSent,
    /// The user is in the incoming contact-request list.
    RequestReceived,
    /// The user is already in the contact list.
    RequestAccepted,
}
107
/// Store of users, contacts and account state for the signed-in user.
///
/// Holds a cache of loaded users, the contact lists, plan / terms-of-service
/// state, and the background tasks that keep all of it up to date.
pub struct UserStore {
    /// Cache of every user loaded so far, keyed by user id.
    users: HashMap<u64, Arc<User>>,
    /// Index from GitHub login to user id for entries in `users`.
    by_github_login: HashMap<SharedString, u64>,
    /// Participant index per user id; replaced wholesale by `set_participant_indices`.
    participant_indices: HashMap<u64, ParticipantIndex>,
    /// Sending half of the queue drained by the `_maintain_contacts` task.
    update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
    /// Plan from the most recent `UpdateUserPlan` message; `None` until one arrives.
    current_plan: Option<proto::Plan>,
    /// Trial start time from the most recent `UpdateUserPlan` message, if any.
    trial_started_at: Option<DateTime<Utc>>,
    /// Usage-based-billing flag from the most recent `UpdateUserPlan` message.
    is_usage_based_billing_enabled: Option<bool>,
    /// "Account too young" flag from the most recent `UpdateUserPlan` message.
    account_too_young: Option<bool>,
    /// Watch channel holding the signed-in user (`None` after sign-out).
    current_user: watch::Receiver<Option<Arc<User>>>,
    /// Terms-of-service acceptance time. The outer `None` means "not known yet"
    /// (also the state after sign-out); `Some(None)` means "known, not accepted".
    accepted_tos_at: Option<Option<DateTime<Utc>>>,
    /// Contacts, kept sorted by GitHub login (they are binary-searched by it).
    contacts: Vec<Arc<Contact>>,
    /// Users who sent us a contact request, sorted by GitHub login.
    incoming_contact_requests: Vec<Arc<User>>,
    /// Users we sent a contact request to, sorted by GitHub login.
    outgoing_contact_requests: Vec<Arc<User>>,
    /// Number of in-flight contact requests per user id
    /// (maintained by `perform_contact_request`).
    pending_contact_requests: HashMap<u64, usize>,
    /// Invite info from the most recent `UpdateInviteInfo` message, if any.
    invite_info: Option<InviteInfo>,
    /// Weak handle to the RPC client, so the store does not keep it alive.
    client: Weak<Client>,
    /// Task applying queued `UpdateContacts` messages one at a time.
    _maintain_contacts: Task<()>,
    /// Task following the client's connection status.
    _maintain_current_user: Task<Result<()>>,
    /// Weak handle to this entity, for spawning work from `&self` methods
    /// that only have an `&App`.
    weak_self: WeakEntity<Self>,
}
129
/// Invite information taken from a `proto::UpdateInviteInfo` message.
#[derive(Clone)]
pub struct InviteInfo {
    /// The `count` field of the message.
    pub count: u32,
    /// The `url` field of the message.
    pub url: Arc<str>,
}
135
/// Events emitted by `UserStore`.
pub enum Event {
    /// Something happened to a contact request involving `user`.
    Contact {
        user: Arc<User>,
        kind: ContactEventKind,
    },
    /// A `proto::ShowContacts` message was received.
    ShowContacts,
    /// `set_participant_indices` stored a map that differs from the previous one.
    ParticipantIndicesChanged,
    /// The terms-of-service state was refreshed, accepted, or reset on sign-out.
    PrivateUserInfoUpdated,
    /// A `proto::UpdateUserPlan` message was applied.
    PlanUpdated,
}
146
/// Kind of change reported by `Event::Contact`.
///
/// Within this file only `Cancelled` is emitted (when an incoming request is
/// removed); the other variants are presumably produced elsewhere — TODO confirm.
#[derive(Clone, Copy)]
pub enum ContactEventKind {
    Requested,
    Accepted,
    Cancelled,
}
153
// Allows `UserStore` to emit `Event`s through `cx.emit`.
impl EventEmitter<Event> for UserStore {}
155
/// Messages processed sequentially by the `_maintain_contacts` task.
enum UpdateContacts {
    /// Apply a contact-list delta received from the server.
    Update(proto::UpdateContacts),
    /// No-op marker: the barrier is dropped once every earlier message has
    /// been processed, which wakes whoever waits on the receiving half.
    Wait(postage::barrier::Sender),
    /// Empty the contact list and both request lists, then drop the barrier.
    Clear(postage::barrier::Sender),
}
161
/// `RequestUsage` read from the model-requests usage headers.
/// Derefs to the inner `RequestUsage`.
#[derive(Debug, Clone, Copy, Deref)]
pub struct ModelRequestUsage(pub RequestUsage);
164
/// `RequestUsage` read from the edit-predictions usage headers.
/// Derefs to the inner `RequestUsage`.
#[derive(Debug, Clone, Copy, Deref)]
pub struct EditPredictionUsage(pub RequestUsage);
167
/// A usage counter paired with its limit.
#[derive(Debug, Clone, Copy)]
pub struct RequestUsage {
    /// The limit that applies; either a concrete number or unlimited.
    pub limit: UsageLimit,
    /// The amount used so far.
    pub amount: i32,
}
173
174impl UserStore {
    /// Creates a `UserStore` bound to `client`.
    ///
    /// Registers RPC message handlers for plan, contact, invite-info and
    /// show-contacts messages, and spawns two background tasks owned by the store:
    ///
    /// * `_maintain_contacts` drains the contact-update queue, applying one
    ///   message at a time through `update_contacts`.
    /// * `_maintain_current_user` follows the client's connection status and
    ///   refreshes or clears the current user, the terms-of-service state and
    ///   the contact lists accordingly.
    pub fn new(client: Arc<Client>, cx: &Context<Self>) -> Self {
        let (mut current_user_tx, current_user_rx) = watch::channel();
        let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
        let rpc_subscriptions = vec![
            client.add_message_handler(cx.weak_entity(), Self::handle_update_plan),
            client.add_message_handler(cx.weak_entity(), Self::handle_update_contacts),
            client.add_message_handler(cx.weak_entity(), Self::handle_update_invite_info),
            client.add_message_handler(cx.weak_entity(), Self::handle_show_contacts),
        ];
        Self {
            users: Default::default(),
            by_github_login: Default::default(),
            current_user: current_user_rx,
            current_plan: None,
            trial_started_at: None,
            is_usage_based_billing_enabled: None,
            account_too_young: None,
            accepted_tos_at: None,
            contacts: Default::default(),
            incoming_contact_requests: Default::default(),
            participant_indices: Default::default(),
            outgoing_contact_requests: Default::default(),
            invite_info: None,
            client: Arc::downgrade(&client),
            update_contacts_tx,
            _maintain_contacts: cx.spawn(async move |this, cx| {
                // Move the subscriptions into the task so they live as long as it does
                // (presumably dropping them unregisters the handlers — TODO confirm).
                let _subscriptions = rpc_subscriptions;
                // Process queued contact messages strictly in order; each update is
                // awaited before the next message is taken.
                while let Some(message) = update_contacts_rx.next().await {
                    if let Ok(task) = this.update(cx, |this, cx| this.update_contacts(message, cx))
                    {
                        task.log_err().await;
                    } else {
                        // The store entity is gone; nothing left to update.
                        break;
                    }
                }
            }),
            _maintain_current_user: cx.spawn(async move |this, cx| {
                let mut status = client.status();
                // Keep only a weak handle so this task does not keep the client alive.
                let weak = Arc::downgrade(&client);
                drop(client);
                while let Some(status) = status.next().await {
                    // if the client is dropped, the app is shutting down.
                    let Some(client) = weak.upgrade() else {
                        return Ok(());
                    };
                    match status {
                        Status::Connected { .. } => {
                            if let Some(user_id) = client.user_id() {
                                let fetch_user = if let Ok(fetch_user) =
                                    this.update(cx, |this, cx| this.get_user(user_id, cx).log_err())
                                {
                                    fetch_user
                                } else {
                                    break;
                                };
                                let fetch_private_user_info =
                                    client.request(proto::GetPrivateUserInfo {}).log_err();
                                // Fetch the public profile and the private info concurrently.
                                let (user, info) =
                                    futures::join!(fetch_user, fetch_private_user_info);

                                cx.update(|cx| {
                                    if let Some(info) = info {
                                        let staff =
                                            info.staff && !*feature_flags::ZED_DISABLE_STAFF;
                                        cx.update_flags(staff, info.flags);
                                        client.telemetry.set_authenticated_user_info(
                                            Some(info.metrics_id.clone()),
                                            staff,
                                        );

                                        this.update(cx, |this, cx| {
                                            // In debug builds, setting ZED_IGNORE_ACCEPTED_TOS
                                            // makes the store behave as if the terms were
                                            // never accepted.
                                            let accepted_tos_at = {
                                                #[cfg(debug_assertions)]
                                                if std::env::var("ZED_IGNORE_ACCEPTED_TOS").is_ok()
                                                {
                                                    None
                                                } else {
                                                    info.accepted_tos_at
                                                }

                                                #[cfg(not(debug_assertions))]
                                                info.accepted_tos_at
                                            };

                                            this.set_current_user_accepted_tos_at(accepted_tos_at);
                                            cx.emit(Event::PrivateUserInfoUpdated);
                                        })
                                    } else {
                                        anyhow::Ok(())
                                    }
                                })??;

                                // Publish the user only after the private info was applied.
                                current_user_tx.send(user).await.ok();

                                this.update(cx, |_, cx| cx.notify())?;
                            }
                        }
                        Status::SignedOut => {
                            // Forget the user and the terms-of-service state, then wait
                            // until the contact lists have actually been cleared.
                            current_user_tx.send(None).await.ok();
                            this.update(cx, |this, cx| {
                                this.accepted_tos_at = None;
                                cx.emit(Event::PrivateUserInfoUpdated);
                                cx.notify();
                                this.clear_contacts()
                            })?
                            .await;
                        }
                        Status::ConnectionLost => {
                            // Keep the user, but drop the contact lists.
                            this.update(cx, |this, cx| {
                                cx.notify();
                                this.clear_contacts()
                            })?
                            .await;
                        }
                        _ => {}
                    }
                }
                Ok(())
            }),
            pending_contact_requests: Default::default(),
            weak_self: cx.weak_entity(),
        }
    }
298
299 #[cfg(feature = "test-support")]
300 pub fn clear_cache(&mut self) {
301 self.users.clear();
302 self.by_github_login.clear();
303 }
304
305 async fn handle_update_invite_info(
306 this: Entity<Self>,
307 message: TypedEnvelope<proto::UpdateInviteInfo>,
308 mut cx: AsyncApp,
309 ) -> Result<()> {
310 this.update(&mut cx, |this, cx| {
311 this.invite_info = Some(InviteInfo {
312 url: Arc::from(message.payload.url),
313 count: message.payload.count,
314 });
315 cx.notify();
316 })?;
317 Ok(())
318 }
319
320 async fn handle_show_contacts(
321 this: Entity<Self>,
322 _: TypedEnvelope<proto::ShowContacts>,
323 mut cx: AsyncApp,
324 ) -> Result<()> {
325 this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts))?;
326 Ok(())
327 }
328
329 pub fn invite_info(&self) -> Option<&InviteInfo> {
330 self.invite_info.as_ref()
331 }
332
333 async fn handle_update_contacts(
334 this: Entity<Self>,
335 message: TypedEnvelope<proto::UpdateContacts>,
336 mut cx: AsyncApp,
337 ) -> Result<()> {
338 this.read_with(&mut cx, |this, _| {
339 this.update_contacts_tx
340 .unbounded_send(UpdateContacts::Update(message.payload))
341 .unwrap();
342 })?;
343 Ok(())
344 }
345
346 async fn handle_update_plan(
347 this: Entity<Self>,
348 message: TypedEnvelope<proto::UpdateUserPlan>,
349 mut cx: AsyncApp,
350 ) -> Result<()> {
351 this.update(&mut cx, |this, cx| {
352 this.current_plan = Some(message.payload.plan());
353 this.trial_started_at = message
354 .payload
355 .trial_started_at
356 .and_then(|trial_started_at| DateTime::from_timestamp(trial_started_at as i64, 0));
357 this.is_usage_based_billing_enabled = message.payload.is_usage_based_billing_enabled;
358 this.account_too_young = message.payload.account_too_young;
359
360 cx.emit(Event::PlanUpdated);
361 cx.notify();
362 })?;
363 Ok(())
364 }
365
    /// Applies one queued `UpdateContacts` message.
    ///
    /// * `Wait` only drops its barrier, signalling that every earlier message
    ///   has been handled.
    /// * `Clear` empties the contact list and both request lists, then drops
    ///   its barrier.
    /// * `Update` loads every user mentioned in the delta, then applies
    ///   removals and upserts to the three lists, keeping each sorted by
    ///   GitHub login.
    ///
    /// The returned task fails if a referenced user cannot be loaded or if the
    /// store is dropped while the update is in flight.
    fn update_contacts(&mut self, message: UpdateContacts, cx: &Context<Self>) -> Task<Result<()>> {
        match message {
            UpdateContacts::Wait(barrier) => {
                drop(barrier);
                Task::ready(Ok(()))
            }
            UpdateContacts::Clear(barrier) => {
                self.contacts.clear();
                self.incoming_contact_requests.clear();
                self.outgoing_contact_requests.clear();
                drop(barrier);
                Task::ready(Ok(()))
            }
            UpdateContacts::Update(message) => {
                // Collect every user id the delta mentions so they can be fetched
                // with a single `get_users` call.
                let mut user_ids = HashSet::default();
                for contact in &message.contacts {
                    user_ids.insert(contact.user_id);
                }
                user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
                user_ids.extend(message.outgoing_requests.iter());

                let load_users = self.get_users(user_ids.into_iter().collect(), cx);
                cx.spawn(async move |this, cx| {
                    load_users.await?;

                    // Users are fetched in parallel above and cached in call to get_users
                    // No need to parallelize here
                    let mut updated_contacts = Vec::new();
                    let this = this.upgrade().context("can't upgrade user store handle")?;
                    for contact in message.contacts {
                        updated_contacts
                            .push(Arc::new(Contact::from_proto(contact, &this, cx).await?));
                    }

                    let mut incoming_requests = Vec::new();
                    for request in message.incoming_requests {
                        incoming_requests.push({
                            this.update(cx, |this, cx| this.get_user(request.requester_id, cx))?
                                .await?
                        });
                    }

                    let mut outgoing_requests = Vec::new();
                    for requested_user_id in message.outgoing_requests {
                        outgoing_requests.push(
                            this.update(cx, |this, cx| this.get_user(requested_user_id, cx))?
                                .await?,
                        );
                    }

                    let removed_contacts =
                        HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
                    let removed_incoming_requests =
                        HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
                    let removed_outgoing_requests =
                        HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());

                    // Apply the whole delta inside one `update` call.
                    this.update(cx, |this, cx| {
                        // Remove contacts
                        this.contacts
                            .retain(|contact| !removed_contacts.contains(&contact.user.id));
                        // Update existing contacts and insert new ones
                        for updated_contact in updated_contacts {
                            // `Err(ix)` is the sorted insertion point, so the list
                            // stays ordered by GitHub login.
                            match this.contacts.binary_search_by_key(
                                &&updated_contact.user.github_login,
                                |contact| &contact.user.github_login,
                            ) {
                                Ok(ix) => this.contacts[ix] = updated_contact,
                                Err(ix) => this.contacts.insert(ix, updated_contact),
                            }
                        }

                        // Remove incoming contact requests
                        this.incoming_contact_requests.retain(|user| {
                            if removed_incoming_requests.contains(&user.id) {
                                // Report each removed incoming request as cancelled.
                                cx.emit(Event::Contact {
                                    user: user.clone(),
                                    kind: ContactEventKind::Cancelled,
                                });
                                false
                            } else {
                                true
                            }
                        });
                        // Update existing incoming requests and insert new ones
                        for user in incoming_requests {
                            match this
                                .incoming_contact_requests
                                .binary_search_by_key(&&user.github_login, |contact| {
                                    &contact.github_login
                                }) {
                                Ok(ix) => this.incoming_contact_requests[ix] = user,
                                Err(ix) => this.incoming_contact_requests.insert(ix, user),
                            }
                        }

                        // Remove outgoing contact requests
                        this.outgoing_contact_requests
                            .retain(|user| !removed_outgoing_requests.contains(&user.id));
                        // Update existing outgoing requests and insert new ones
                        for request in outgoing_requests {
                            match this
                                .outgoing_contact_requests
                                .binary_search_by_key(&&request.github_login, |contact| {
                                    &contact.github_login
                                }) {
                                Ok(ix) => this.outgoing_contact_requests[ix] = request,
                                Err(ix) => this.outgoing_contact_requests.insert(ix, request),
                            }
                        }

                        cx.notify();
                    })?;

                    Ok(())
                })
            }
        }
    }
485
486 pub fn contacts(&self) -> &[Arc<Contact>] {
487 &self.contacts
488 }
489
490 pub fn has_contact(&self, user: &Arc<User>) -> bool {
491 self.contacts
492 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
493 .is_ok()
494 }
495
496 pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
497 &self.incoming_contact_requests
498 }
499
500 pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
501 &self.outgoing_contact_requests
502 }
503
504 pub fn is_contact_request_pending(&self, user: &User) -> bool {
505 self.pending_contact_requests.contains_key(&user.id)
506 }
507
508 pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
509 if self
510 .contacts
511 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
512 .is_ok()
513 {
514 ContactRequestStatus::RequestAccepted
515 } else if self
516 .outgoing_contact_requests
517 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
518 .is_ok()
519 {
520 ContactRequestStatus::RequestSent
521 } else if self
522 .incoming_contact_requests
523 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
524 .is_ok()
525 {
526 ContactRequestStatus::RequestReceived
527 } else {
528 ContactRequestStatus::None
529 }
530 }
531
532 pub fn request_contact(
533 &mut self,
534 responder_id: u64,
535 cx: &mut Context<Self>,
536 ) -> Task<Result<()>> {
537 self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
538 }
539
540 pub fn remove_contact(&mut self, user_id: u64, cx: &mut Context<Self>) -> Task<Result<()>> {
541 self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
542 }
543
544 pub fn has_incoming_contact_request(&self, user_id: u64) -> bool {
545 self.incoming_contact_requests
546 .iter()
547 .any(|user| user.id == user_id)
548 }
549
550 pub fn respond_to_contact_request(
551 &mut self,
552 requester_id: u64,
553 accept: bool,
554 cx: &mut Context<Self>,
555 ) -> Task<Result<()>> {
556 self.perform_contact_request(
557 requester_id,
558 proto::RespondToContactRequest {
559 requester_id,
560 response: if accept {
561 proto::ContactRequestResponse::Accept
562 } else {
563 proto::ContactRequestResponse::Decline
564 } as i32,
565 },
566 cx,
567 )
568 }
569
570 pub fn dismiss_contact_request(
571 &self,
572 requester_id: u64,
573 cx: &Context<Self>,
574 ) -> Task<Result<()>> {
575 let client = self.client.upgrade();
576 cx.spawn(async move |_, _| {
577 client
578 .context("can't upgrade client reference")?
579 .request(proto::RespondToContactRequest {
580 requester_id,
581 response: proto::ContactRequestResponse::Dismiss as i32,
582 })
583 .await?;
584 Ok(())
585 })
586 }
587
588 fn perform_contact_request<T: RequestMessage>(
589 &mut self,
590 user_id: u64,
591 request: T,
592 cx: &mut Context<Self>,
593 ) -> Task<Result<()>> {
594 let client = self.client.upgrade();
595 *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
596 cx.notify();
597
598 cx.spawn(async move |this, cx| {
599 let response = client
600 .context("can't upgrade client reference")?
601 .request(request)
602 .await;
603 this.update(cx, |this, cx| {
604 if let Entry::Occupied(mut request_count) =
605 this.pending_contact_requests.entry(user_id)
606 {
607 *request_count.get_mut() -= 1;
608 if *request_count.get() == 0 {
609 request_count.remove();
610 }
611 }
612 cx.notify();
613 })?;
614 response?;
615 Ok(())
616 })
617 }
618
619 pub fn clear_contacts(&self) -> impl Future<Output = ()> + use<> {
620 let (tx, mut rx) = postage::barrier::channel();
621 self.update_contacts_tx
622 .unbounded_send(UpdateContacts::Clear(tx))
623 .unwrap();
624 async move {
625 rx.next().await;
626 }
627 }
628
629 pub fn contact_updates_done(&self) -> impl Future<Output = ()> {
630 let (tx, mut rx) = postage::barrier::channel();
631 self.update_contacts_tx
632 .unbounded_send(UpdateContacts::Wait(tx))
633 .unwrap();
634 async move {
635 rx.next().await;
636 }
637 }
638
639 pub fn get_users(
640 &self,
641 user_ids: Vec<u64>,
642 cx: &Context<Self>,
643 ) -> Task<Result<Vec<Arc<User>>>> {
644 let mut user_ids_to_fetch = user_ids.clone();
645 user_ids_to_fetch.retain(|id| !self.users.contains_key(id));
646
647 cx.spawn(async move |this, cx| {
648 if !user_ids_to_fetch.is_empty() {
649 this.update(cx, |this, cx| {
650 this.load_users(
651 proto::GetUsers {
652 user_ids: user_ids_to_fetch,
653 },
654 cx,
655 )
656 })?
657 .await?;
658 }
659
660 this.read_with(cx, |this, _| {
661 user_ids
662 .iter()
663 .map(|user_id| {
664 this.users
665 .get(user_id)
666 .cloned()
667 .with_context(|| format!("user {user_id} not found"))
668 })
669 .collect()
670 })?
671 })
672 }
673
674 pub fn fuzzy_search_users(
675 &self,
676 query: String,
677 cx: &Context<Self>,
678 ) -> Task<Result<Vec<Arc<User>>>> {
679 self.load_users(proto::FuzzySearchUsers { query }, cx)
680 }
681
682 pub fn get_cached_user(&self, user_id: u64) -> Option<Arc<User>> {
683 self.users.get(&user_id).cloned()
684 }
685
686 pub fn get_user_optimistic(&self, user_id: u64, cx: &Context<Self>) -> Option<Arc<User>> {
687 if let Some(user) = self.users.get(&user_id).cloned() {
688 return Some(user);
689 }
690
691 self.get_user(user_id, cx).detach_and_log_err(cx);
692 None
693 }
694
695 pub fn get_user(&self, user_id: u64, cx: &Context<Self>) -> Task<Result<Arc<User>>> {
696 if let Some(user) = self.users.get(&user_id).cloned() {
697 return Task::ready(Ok(user));
698 }
699
700 let load_users = self.get_users(vec![user_id], cx);
701 cx.spawn(async move |this, cx| {
702 load_users.await?;
703 this.read_with(cx, |this, _| {
704 this.users
705 .get(&user_id)
706 .cloned()
707 .context("server responded with no users")
708 })?
709 })
710 }
711
712 pub fn cached_user_by_github_login(&self, github_login: &str) -> Option<Arc<User>> {
713 self.by_github_login
714 .get(github_login)
715 .and_then(|id| self.users.get(id).cloned())
716 }
717
718 pub fn current_user(&self) -> Option<Arc<User>> {
719 self.current_user.borrow().clone()
720 }
721
722 pub fn current_plan(&self) -> Option<proto::Plan> {
723 #[cfg(debug_assertions)]
724 if let Ok(plan) = std::env::var("ZED_SIMULATE_PLAN").as_ref() {
725 return match plan.as_str() {
726 "free" => Some(proto::Plan::Free),
727 "trial" => Some(proto::Plan::ZedProTrial),
728 "pro" => Some(proto::Plan::ZedPro),
729 _ => {
730 panic!("ZED_SIMULATE_PLAN must be one of 'free', 'trial', or 'pro'");
731 }
732 };
733 }
734
735 self.current_plan
736 }
737
738 pub fn trial_started_at(&self) -> Option<DateTime<Utc>> {
739 self.trial_started_at
740 }
741
742 pub fn usage_based_billing_enabled(&self) -> Option<bool> {
743 self.is_usage_based_billing_enabled
744 }
745
746 pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
747 self.current_user.clone()
748 }
749
750 /// Returns whether the user's account is too new to use the service.
751 pub fn account_too_young(&self) -> bool {
752 self.account_too_young.unwrap_or(false)
753 }
754
755 pub fn current_user_has_accepted_terms(&self) -> Option<bool> {
756 self.accepted_tos_at
757 .map(|accepted_tos_at| accepted_tos_at.is_some())
758 }
759
760 pub fn accept_terms_of_service(&self, cx: &Context<Self>) -> Task<Result<()>> {
761 if self.current_user().is_none() {
762 return Task::ready(Err(anyhow!("no current user")));
763 };
764
765 let client = self.client.clone();
766 cx.spawn(async move |this, cx| -> anyhow::Result<()> {
767 let client = client.upgrade().context("client not found")?;
768 let response = client
769 .request(proto::AcceptTermsOfService {})
770 .await
771 .context("error accepting tos")?;
772 this.update(cx, |this, cx| {
773 this.set_current_user_accepted_tos_at(Some(response.accepted_tos_at));
774 cx.emit(Event::PrivateUserInfoUpdated);
775 })?;
776 Ok(())
777 })
778 }
779
780 fn set_current_user_accepted_tos_at(&mut self, accepted_tos_at: Option<u64>) {
781 self.accepted_tos_at = Some(
782 accepted_tos_at.and_then(|timestamp| DateTime::from_timestamp(timestamp as i64, 0)),
783 );
784 }
785
786 fn load_users(
787 &self,
788 request: impl RequestMessage<Response = UsersResponse>,
789 cx: &Context<Self>,
790 ) -> Task<Result<Vec<Arc<User>>>> {
791 let client = self.client.clone();
792 cx.spawn(async move |this, cx| {
793 if let Some(rpc) = client.upgrade() {
794 let response = rpc.request(request).await.context("error loading users")?;
795 let users = response.users;
796
797 this.update(cx, |this, _| this.insert(users))
798 } else {
799 Ok(Vec::new())
800 }
801 })
802 }
803
804 pub fn insert(&mut self, users: Vec<proto::User>) -> Vec<Arc<User>> {
805 let mut ret = Vec::with_capacity(users.len());
806 for user in users {
807 let user = User::new(user);
808 if let Some(old) = self.users.insert(user.id, user.clone()) {
809 if old.github_login != user.github_login {
810 self.by_github_login.remove(&old.github_login);
811 }
812 }
813 self.by_github_login
814 .insert(user.github_login.clone(), user.id);
815 ret.push(user)
816 }
817 ret
818 }
819
820 pub fn set_participant_indices(
821 &mut self,
822 participant_indices: HashMap<u64, ParticipantIndex>,
823 cx: &mut Context<Self>,
824 ) {
825 if participant_indices != self.participant_indices {
826 self.participant_indices = participant_indices;
827 cx.emit(Event::ParticipantIndicesChanged);
828 }
829 }
830
831 pub fn participant_indices(&self) -> &HashMap<u64, ParticipantIndex> {
832 &self.participant_indices
833 }
834
835 pub fn participant_names(
836 &self,
837 user_ids: impl Iterator<Item = u64>,
838 cx: &App,
839 ) -> HashMap<u64, SharedString> {
840 let mut ret = HashMap::default();
841 let mut missing_user_ids = Vec::new();
842 for id in user_ids {
843 if let Some(github_login) = self.get_cached_user(id).map(|u| u.github_login.clone()) {
844 ret.insert(id, github_login);
845 } else {
846 missing_user_ids.push(id)
847 }
848 }
849 if !missing_user_ids.is_empty() {
850 let this = self.weak_self.clone();
851 cx.spawn(async move |cx| {
852 this.update(cx, |this, cx| this.get_users(missing_user_ids, cx))?
853 .await
854 })
855 .detach_and_log_err(cx);
856 }
857 ret
858 }
859}
860
861impl User {
862 fn new(message: proto::User) -> Arc<Self> {
863 Arc::new(User {
864 id: message.id,
865 github_login: message.github_login.into(),
866 avatar_uri: message.avatar_url.into(),
867 name: message.name,
868 })
869 }
870}
871
872impl Contact {
873 async fn from_proto(
874 contact: proto::Contact,
875 user_store: &Entity<UserStore>,
876 cx: &mut AsyncApp,
877 ) -> Result<Self> {
878 let user = user_store
879 .update(cx, |user_store, cx| {
880 user_store.get_user(contact.user_id, cx)
881 })?
882 .await?;
883 Ok(Self {
884 user,
885 online: contact.online,
886 busy: contact.busy,
887 })
888 }
889}
890
891impl Collaborator {
892 pub fn from_proto(message: proto::Collaborator) -> Result<Self> {
893 Ok(Self {
894 peer_id: message.peer_id.context("invalid peer id")?,
895 replica_id: message.replica_id as ReplicaId,
896 user_id: message.user_id as UserId,
897 is_host: message.is_host,
898 committer_name: message.committer_name,
899 committer_email: message.committer_email,
900 })
901 }
902}
903
904impl RequestUsage {
905 pub fn over_limit(&self) -> bool {
906 match self.limit {
907 UsageLimit::Limited(limit) => self.amount >= limit,
908 UsageLimit::Unlimited => false,
909 }
910 }
911
912 pub fn from_proto(amount: u32, limit: proto::UsageLimit) -> Option<Self> {
913 let limit = match limit.variant? {
914 proto::usage_limit::Variant::Limited(limited) => {
915 UsageLimit::Limited(limited.limit as i32)
916 }
917 proto::usage_limit::Variant::Unlimited(_) => UsageLimit::Unlimited,
918 };
919 Some(RequestUsage {
920 limit,
921 amount: amount as i32,
922 })
923 }
924
925 fn from_headers(
926 limit_name: &str,
927 amount_name: &str,
928 headers: &HeaderMap<HeaderValue>,
929 ) -> Result<Self> {
930 let limit = headers
931 .get(limit_name)
932 .with_context(|| format!("missing {limit_name:?} header"))?;
933 let limit = UsageLimit::from_str(limit.to_str()?)?;
934
935 let amount = headers
936 .get(amount_name)
937 .with_context(|| format!("missing {amount_name:?} header"))?;
938 let amount = amount.to_str()?.parse::<i32>()?;
939
940 Ok(Self { limit, amount })
941 }
942}
943
944impl ModelRequestUsage {
945 pub fn from_headers(headers: &HeaderMap<HeaderValue>) -> Result<Self> {
946 Ok(Self(RequestUsage::from_headers(
947 MODEL_REQUESTS_USAGE_LIMIT_HEADER_NAME,
948 MODEL_REQUESTS_USAGE_AMOUNT_HEADER_NAME,
949 headers,
950 )?))
951 }
952}
953
954impl EditPredictionUsage {
955 pub fn from_headers(headers: &HeaderMap<HeaderValue>) -> Result<Self> {
956 Ok(Self(RequestUsage::from_headers(
957 EDIT_PREDICTIONS_USAGE_LIMIT_HEADER_NAME,
958 EDIT_PREDICTIONS_USAGE_AMOUNT_HEADER_NAME,
959 headers,
960 )?))
961 }
962}