1use super::{Client, Status, TypedEnvelope, proto};
2use anyhow::{Context as _, Result};
3use chrono::{DateTime, Utc};
4use cloud_api_client::websocket_protocol::MessageToClient;
5use cloud_api_client::{GetAuthenticatedUserResponse, PlanInfo};
6use cloud_llm_client::{
7 EDIT_PREDICTIONS_USAGE_AMOUNT_HEADER_NAME, EDIT_PREDICTIONS_USAGE_LIMIT_HEADER_NAME, Plan,
8 UsageLimit,
9};
10use collections::{HashMap, HashSet, hash_map::Entry};
11use derive_more::Deref;
12use feature_flags::FeatureFlagAppExt;
13use futures::{Future, StreamExt, channel::mpsc};
14use gpui::{
15 App, AsyncApp, Context, Entity, EventEmitter, SharedString, SharedUri, Task, WeakEntity,
16};
17use http_client::http::{HeaderMap, HeaderValue};
18use postage::{sink::Sink, watch};
19use rpc::proto::{RequestMessage, UsersResponse};
20use std::{
21 str::FromStr as _,
22 sync::{Arc, Weak},
23};
24use text::ReplicaId;
25use util::{ResultExt, TryFutureExt as _};
26
/// Numeric identifier of a user; a plain alias for `u64`.
pub type UserId = u64;
28
29#[derive(
30 Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, serde::Serialize, serde::Deserialize,
31)]
32pub struct ChannelId(pub u64);
33
34impl std::fmt::Display for ChannelId {
35 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
36 self.0.fmt(f)
37 }
38}
39
/// Identifier of a project, wrapping the raw `u64` value.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)]
pub struct ProjectId(pub u64);

impl ProjectId {
    /// Returns the raw `u64` carried by this id, as used in protocol messages.
    pub fn to_proto(self) -> u64 {
        let Self(raw) = self;
        raw
    }
}
48
/// Index assigned to a participant, wrapping a raw `u32`.
///
/// `UserStore` keeps one of these per user id (see `UserStore::participant_indices`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ParticipantIndex(pub u32);
51
/// A user as cached by `UserStore`.
///
/// Equality compares `id` and `github_login`; ordering is implemented by hand
/// further down in this file rather than derived.
#[derive(Default, Debug)]
pub struct User {
    /// Unique id of the user.
    pub id: UserId,
    /// The user's GitHub login; also the key the store's contact lists are sorted by.
    pub github_login: SharedString,
    /// URI of the user's avatar (filled from the `avatar_url` of server messages).
    pub avatar_uri: SharedUri,
    /// Optional name, as reported by the server.
    pub name: Option<String>,
}
59
/// A collaborator decoded from a `proto::Collaborator` message
/// (see `Collaborator::from_proto`).
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Collaborator {
    /// Peer id taken from the message; decoding fails when it is absent.
    pub peer_id: proto::PeerId,
    /// Replica id built from the message's `replica_id`.
    pub replica_id: ReplicaId,
    /// Id of the collaborating user.
    pub user_id: UserId,
    /// The message's `is_host` flag.
    pub is_host: bool,
    /// Committer name from the message, if any.
    pub committer_name: Option<String>,
    /// Committer email from the message, if any.
    pub committer_email: Option<String>,
}
69
70impl PartialOrd for User {
71 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
72 Some(self.cmp(other))
73 }
74}
75
76impl Ord for User {
77 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
78 self.github_login.cmp(&other.github_login)
79 }
80}
81
82impl PartialEq for User {
83 fn eq(&self, other: &Self) -> bool {
84 self.id == other.id && self.github_login == other.github_login
85 }
86}
87
88impl Eq for User {}
89
/// A contact of the current user, decoded from a `proto::Contact`
/// (see `Contact::from_proto`).
#[derive(Debug, PartialEq)]
pub struct Contact {
    /// The user this contact refers to.
    pub user: Arc<User>,
    /// The `online` flag from the server message.
    pub online: bool,
    /// The `busy` flag from the server message.
    pub busy: bool,
}
96
/// Relationship between the current user and another user, as computed by
/// `UserStore::contact_request_status`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ContactRequestStatus {
    /// The user is neither a contact nor part of any contact request.
    None,
    /// The user appears in the outgoing contact requests.
    RequestSent,
    /// The user appears in the incoming contact requests.
    RequestReceived,
    /// The user is already a contact.
    RequestAccepted,
}
104
/// Caches users, tracks the current user's contacts and contact requests, and
/// holds the current user's plan and edit-prediction usage.
pub struct UserStore {
    /// Cache of known users, keyed by user id.
    users: HashMap<u64, Arc<User>>,
    /// Index from GitHub login to user id, for users present in `users`.
    by_github_login: HashMap<SharedString, u64>,
    /// Participant index per user id, replaced wholesale by `set_participant_indices`.
    participant_indices: HashMap<u64, ParticipantIndex>,
    /// Sending half of the queue drained by the `_maintain_contacts` task.
    update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
    /// Latest known edit-prediction usage; `None` until known or after sign-out.
    edit_prediction_usage: Option<EditPredictionUsage>,
    /// Plan information of the authenticated user; `None` until known or after sign-out.
    plan_info: Option<PlanInfo>,
    /// Watch channel carrying the currently signed-in user, if any.
    current_user: watch::Receiver<Option<Arc<User>>>,
    /// Contacts, kept sorted by GitHub login (lookups use binary search).
    contacts: Vec<Arc<Contact>>,
    /// Users who sent us a contact request, kept sorted by GitHub login.
    incoming_contact_requests: Vec<Arc<User>>,
    /// Users we sent a contact request to, kept sorted by GitHub login.
    outgoing_contact_requests: Vec<Arc<User>>,
    /// Number of in-flight contact requests per user id.
    pending_contact_requests: HashMap<u64, usize>,
    /// Weak handle to the client, so the store does not keep it alive.
    client: Weak<Client>,
    /// Background task applying queued contact updates; also owns the RPC subscriptions.
    _maintain_contacts: Task<()>,
    /// Background task following the client's status to maintain the current user.
    _maintain_current_user: Task<Result<()>>,
    /// Weak handle to this store's own entity, for spawning work from `&App` contexts.
    weak_self: WeakEntity<Self>,
}
122
/// Invite information: a count and a URL.
// NOTE(review): nothing in this part of the file constructs or reads this type;
// the exact meaning of `count` (presumably remaining invites) should be confirmed.
#[derive(Clone)]
pub struct InviteInfo {
    pub count: u32,
    pub url: Arc<str>,
}
128
/// Events emitted by `UserStore`.
pub enum Event {
    /// Something happened to a contact request involving `user`; `kind` says what.
    Contact {
        user: Arc<User>,
        kind: ContactEventKind,
    },
    /// Emitted when the server sends a `ShowContacts` message.
    ShowContacts,
    /// Emitted when `set_participant_indices` actually changes the stored indices.
    ParticipantIndicesChanged,
    /// Emitted when the authenticated user's info is refreshed and on sign-out.
    PrivateUserInfoUpdated,
    // NOTE(review): not emitted anywhere in the visible part of this file.
    PlanUpdated,
}
139
/// What happened to a contact request, carried by `Event::Contact`.
///
/// In the visible part of this file only `Cancelled` is emitted, when an
/// incoming contact request is removed by a server update.
#[derive(Clone, Copy)]
pub enum ContactEventKind {
    Requested,
    Accepted,
    Cancelled,
}
146
// Lets `UserStore` emit `Event` values through its `Context` (`cx.emit(...)`).
impl EventEmitter<Event> for UserStore {}
148
/// Messages queued on `UserStore::update_contacts_tx` and processed one at a
/// time, in order, by the contact-maintenance task.
enum UpdateContacts {
    /// Apply a contacts update received from the server.
    Update(proto::UpdateContacts),
    /// Do nothing but drop the barrier sender, signalling that everything
    /// queued before this message has been processed.
    Wait(postage::barrier::Sender),
    /// Clear contacts and both request lists, then drop the barrier sender.
    Clear(postage::barrier::Sender),
}
154
/// Usage of edit predictions; a newtype over `RequestUsage` that derefs to it.
#[derive(Debug, Clone, Copy, Deref)]
pub struct EditPredictionUsage(pub RequestUsage);
157
/// A usage amount together with the limit it is measured against.
#[derive(Debug, Clone, Copy)]
pub struct RequestUsage {
    /// The allowed usage: either a concrete limit or unlimited.
    pub limit: UsageLimit,
    /// The amount used so far.
    pub amount: i32,
}
163
164impl UserStore {
165 pub fn new(client: Arc<Client>, cx: &Context<Self>) -> Self {
166 let (mut current_user_tx, current_user_rx) = watch::channel();
167 let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
168 let rpc_subscriptions = vec![
169 client.add_message_handler(cx.weak_entity(), Self::handle_update_contacts),
170 client.add_message_handler(cx.weak_entity(), Self::handle_show_contacts),
171 ];
172
173 client.add_message_to_client_handler({
174 let this = cx.weak_entity();
175 move |message, cx| Self::handle_message_to_client(this.clone(), message, cx)
176 });
177
178 Self {
179 users: Default::default(),
180 by_github_login: Default::default(),
181 current_user: current_user_rx,
182 plan_info: None,
183 edit_prediction_usage: None,
184 contacts: Default::default(),
185 incoming_contact_requests: Default::default(),
186 participant_indices: Default::default(),
187 outgoing_contact_requests: Default::default(),
188 client: Arc::downgrade(&client),
189 update_contacts_tx,
190 _maintain_contacts: cx.spawn(async move |this, cx| {
191 let _subscriptions = rpc_subscriptions;
192 while let Some(message) = update_contacts_rx.next().await {
193 if let Ok(task) = this.update(cx, |this, cx| this.update_contacts(message, cx))
194 {
195 task.log_err().await;
196 } else {
197 break;
198 }
199 }
200 }),
201 _maintain_current_user: cx.spawn(async move |this, cx| {
202 let mut status = client.status();
203 let weak = Arc::downgrade(&client);
204 drop(client);
205 while let Some(status) = status.next().await {
206 // if the client is dropped, the app is shutting down.
207 let Some(client) = weak.upgrade() else {
208 return Ok(());
209 };
210 match status {
211 Status::Authenticated
212 | Status::Reauthenticated
213 | Status::Connected { .. } => {
214 if let Some(user_id) = client.user_id() {
215 let response = client
216 .cloud_client()
217 .get_authenticated_user()
218 .await
219 .log_err();
220
221 let current_user_and_response = if let Some(response) = response {
222 let user = Arc::new(User {
223 id: user_id,
224 github_login: response.user.github_login.clone().into(),
225 avatar_uri: response.user.avatar_url.clone().into(),
226 name: response.user.name.clone(),
227 });
228
229 Some((user, response))
230 } else {
231 None
232 };
233 current_user_tx
234 .send(
235 current_user_and_response
236 .as_ref()
237 .map(|(user, _)| user.clone()),
238 )
239 .await
240 .ok();
241
242 cx.update(|cx| {
243 if let Some((user, response)) = current_user_and_response {
244 this.update(cx, |this, cx| {
245 this.by_github_login
246 .insert(user.github_login.clone(), user_id);
247 this.users.insert(user_id, user);
248 this.update_authenticated_user(response, cx)
249 })
250 } else {
251 anyhow::Ok(())
252 }
253 })?;
254
255 this.update(cx, |_, cx| cx.notify())?;
256 }
257 }
258 Status::SignedOut => {
259 current_user_tx.send(None).await.ok();
260 this.update(cx, |this, cx| {
261 this.clear_plan_and_usage();
262 cx.emit(Event::PrivateUserInfoUpdated);
263 cx.notify();
264 this.clear_contacts()
265 })?
266 .await;
267 }
268 Status::ConnectionLost => {
269 this.update(cx, |this, cx| {
270 cx.notify();
271 this.clear_contacts()
272 })?
273 .await;
274 }
275 _ => {}
276 }
277 }
278 Ok(())
279 }),
280 pending_contact_requests: Default::default(),
281 weak_self: cx.weak_entity(),
282 }
283 }
284
285 #[cfg(feature = "test-support")]
286 pub fn clear_cache(&mut self) {
287 self.users.clear();
288 self.by_github_login.clear();
289 }
290
291 async fn handle_show_contacts(
292 this: Entity<Self>,
293 _: TypedEnvelope<proto::ShowContacts>,
294 mut cx: AsyncApp,
295 ) -> Result<()> {
296 this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts));
297 Ok(())
298 }
299
300 async fn handle_update_contacts(
301 this: Entity<Self>,
302 message: TypedEnvelope<proto::UpdateContacts>,
303 cx: AsyncApp,
304 ) -> Result<()> {
305 this.read_with(&cx, |this, _| {
306 this.update_contacts_tx
307 .unbounded_send(UpdateContacts::Update(message.payload))
308 .unwrap();
309 });
310 Ok(())
311 }
312
313 fn update_contacts(&mut self, message: UpdateContacts, cx: &Context<Self>) -> Task<Result<()>> {
314 match message {
315 UpdateContacts::Wait(barrier) => {
316 drop(barrier);
317 Task::ready(Ok(()))
318 }
319 UpdateContacts::Clear(barrier) => {
320 self.contacts.clear();
321 self.incoming_contact_requests.clear();
322 self.outgoing_contact_requests.clear();
323 drop(barrier);
324 Task::ready(Ok(()))
325 }
326 UpdateContacts::Update(message) => {
327 let mut user_ids = HashSet::default();
328 for contact in &message.contacts {
329 user_ids.insert(contact.user_id);
330 }
331 user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
332 user_ids.extend(message.outgoing_requests.iter());
333
334 let load_users = self.get_users(user_ids.into_iter().collect(), cx);
335 cx.spawn(async move |this, cx| {
336 load_users.await?;
337
338 // Users are fetched in parallel above and cached in call to get_users
339 // No need to parallelize here
340 let mut updated_contacts = Vec::new();
341 let this = this.upgrade().context("can't upgrade user store handle")?;
342 for contact in message.contacts {
343 updated_contacts
344 .push(Arc::new(Contact::from_proto(contact, &this, cx).await?));
345 }
346
347 let mut incoming_requests = Vec::new();
348 for request in message.incoming_requests {
349 incoming_requests.push({
350 this.update(cx, |this, cx| this.get_user(request.requester_id, cx))
351 .await?
352 });
353 }
354
355 let mut outgoing_requests = Vec::new();
356 for requested_user_id in message.outgoing_requests {
357 outgoing_requests.push(
358 this.update(cx, |this, cx| this.get_user(requested_user_id, cx))
359 .await?,
360 );
361 }
362
363 let removed_contacts =
364 HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
365 let removed_incoming_requests =
366 HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
367 let removed_outgoing_requests =
368 HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());
369
370 this.update(cx, |this, cx| {
371 // Remove contacts
372 this.contacts
373 .retain(|contact| !removed_contacts.contains(&contact.user.id));
374 // Update existing contacts and insert new ones
375 for updated_contact in updated_contacts {
376 match this.contacts.binary_search_by_key(
377 &&updated_contact.user.github_login,
378 |contact| &contact.user.github_login,
379 ) {
380 Ok(ix) => this.contacts[ix] = updated_contact,
381 Err(ix) => this.contacts.insert(ix, updated_contact),
382 }
383 }
384
385 // Remove incoming contact requests
386 this.incoming_contact_requests.retain(|user| {
387 if removed_incoming_requests.contains(&user.id) {
388 cx.emit(Event::Contact {
389 user: user.clone(),
390 kind: ContactEventKind::Cancelled,
391 });
392 false
393 } else {
394 true
395 }
396 });
397 // Update existing incoming requests and insert new ones
398 for user in incoming_requests {
399 match this
400 .incoming_contact_requests
401 .binary_search_by_key(&&user.github_login, |contact| {
402 &contact.github_login
403 }) {
404 Ok(ix) => this.incoming_contact_requests[ix] = user,
405 Err(ix) => this.incoming_contact_requests.insert(ix, user),
406 }
407 }
408
409 // Remove outgoing contact requests
410 this.outgoing_contact_requests
411 .retain(|user| !removed_outgoing_requests.contains(&user.id));
412 // Update existing incoming requests and insert new ones
413 for request in outgoing_requests {
414 match this
415 .outgoing_contact_requests
416 .binary_search_by_key(&&request.github_login, |contact| {
417 &contact.github_login
418 }) {
419 Ok(ix) => this.outgoing_contact_requests[ix] = request,
420 Err(ix) => this.outgoing_contact_requests.insert(ix, request),
421 }
422 }
423
424 cx.notify();
425 });
426
427 Ok(())
428 })
429 }
430 }
431 }
432
433 pub fn contacts(&self) -> &[Arc<Contact>] {
434 &self.contacts
435 }
436
437 pub fn has_contact(&self, user: &Arc<User>) -> bool {
438 self.contacts
439 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
440 .is_ok()
441 }
442
443 pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
444 &self.incoming_contact_requests
445 }
446
447 pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
448 &self.outgoing_contact_requests
449 }
450
451 pub fn is_contact_request_pending(&self, user: &User) -> bool {
452 self.pending_contact_requests.contains_key(&user.id)
453 }
454
455 pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
456 if self
457 .contacts
458 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
459 .is_ok()
460 {
461 ContactRequestStatus::RequestAccepted
462 } else if self
463 .outgoing_contact_requests
464 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
465 .is_ok()
466 {
467 ContactRequestStatus::RequestSent
468 } else if self
469 .incoming_contact_requests
470 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
471 .is_ok()
472 {
473 ContactRequestStatus::RequestReceived
474 } else {
475 ContactRequestStatus::None
476 }
477 }
478
479 pub fn request_contact(
480 &mut self,
481 responder_id: u64,
482 cx: &mut Context<Self>,
483 ) -> Task<Result<()>> {
484 self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
485 }
486
487 pub fn remove_contact(&mut self, user_id: u64, cx: &mut Context<Self>) -> Task<Result<()>> {
488 self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
489 }
490
491 pub fn has_incoming_contact_request(&self, user_id: u64) -> bool {
492 self.incoming_contact_requests
493 .iter()
494 .any(|user| user.id == user_id)
495 }
496
497 pub fn respond_to_contact_request(
498 &mut self,
499 requester_id: u64,
500 accept: bool,
501 cx: &mut Context<Self>,
502 ) -> Task<Result<()>> {
503 self.perform_contact_request(
504 requester_id,
505 proto::RespondToContactRequest {
506 requester_id,
507 response: if accept {
508 proto::ContactRequestResponse::Accept
509 } else {
510 proto::ContactRequestResponse::Decline
511 } as i32,
512 },
513 cx,
514 )
515 }
516
517 pub fn dismiss_contact_request(
518 &self,
519 requester_id: u64,
520 cx: &Context<Self>,
521 ) -> Task<Result<()>> {
522 let client = self.client.upgrade();
523 cx.spawn(async move |_, _| {
524 client
525 .context("can't upgrade client reference")?
526 .request(proto::RespondToContactRequest {
527 requester_id,
528 response: proto::ContactRequestResponse::Dismiss as i32,
529 })
530 .await?;
531 Ok(())
532 })
533 }
534
535 fn perform_contact_request<T: RequestMessage>(
536 &mut self,
537 user_id: u64,
538 request: T,
539 cx: &mut Context<Self>,
540 ) -> Task<Result<()>> {
541 let client = self.client.upgrade();
542 *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
543 cx.notify();
544
545 cx.spawn(async move |this, cx| {
546 let response = client
547 .context("can't upgrade client reference")?
548 .request(request)
549 .await;
550 this.update(cx, |this, cx| {
551 if let Entry::Occupied(mut request_count) =
552 this.pending_contact_requests.entry(user_id)
553 {
554 *request_count.get_mut() -= 1;
555 if *request_count.get() == 0 {
556 request_count.remove();
557 }
558 }
559 cx.notify();
560 })?;
561 response?;
562 Ok(())
563 })
564 }
565
566 pub fn clear_contacts(&self) -> impl Future<Output = ()> + use<> {
567 let (tx, mut rx) = postage::barrier::channel();
568 self.update_contacts_tx
569 .unbounded_send(UpdateContacts::Clear(tx))
570 .unwrap();
571 async move {
572 rx.next().await;
573 }
574 }
575
576 pub fn contact_updates_done(&self) -> impl Future<Output = ()> {
577 let (tx, mut rx) = postage::barrier::channel();
578 self.update_contacts_tx
579 .unbounded_send(UpdateContacts::Wait(tx))
580 .unwrap();
581 async move {
582 rx.next().await;
583 }
584 }
585
586 pub fn get_users(
587 &self,
588 user_ids: Vec<u64>,
589 cx: &Context<Self>,
590 ) -> Task<Result<Vec<Arc<User>>>> {
591 let mut user_ids_to_fetch = user_ids.clone();
592 user_ids_to_fetch.retain(|id| !self.users.contains_key(id));
593
594 cx.spawn(async move |this, cx| {
595 if !user_ids_to_fetch.is_empty() {
596 this.update(cx, |this, cx| {
597 this.load_users(
598 proto::GetUsers {
599 user_ids: user_ids_to_fetch,
600 },
601 cx,
602 )
603 })?
604 .await?;
605 }
606
607 this.read_with(cx, |this, _| {
608 user_ids
609 .iter()
610 .map(|user_id| {
611 this.users
612 .get(user_id)
613 .cloned()
614 .with_context(|| format!("user {user_id} not found"))
615 })
616 .collect()
617 })?
618 })
619 }
620
621 pub fn fuzzy_search_users(
622 &self,
623 query: String,
624 cx: &Context<Self>,
625 ) -> Task<Result<Vec<Arc<User>>>> {
626 self.load_users(proto::FuzzySearchUsers { query }, cx)
627 }
628
629 pub fn get_cached_user(&self, user_id: u64) -> Option<Arc<User>> {
630 self.users.get(&user_id).cloned()
631 }
632
633 pub fn get_user_optimistic(&self, user_id: u64, cx: &Context<Self>) -> Option<Arc<User>> {
634 if let Some(user) = self.users.get(&user_id).cloned() {
635 return Some(user);
636 }
637
638 self.get_user(user_id, cx).detach_and_log_err(cx);
639 None
640 }
641
642 pub fn get_user(&self, user_id: u64, cx: &Context<Self>) -> Task<Result<Arc<User>>> {
643 if let Some(user) = self.users.get(&user_id).cloned() {
644 return Task::ready(Ok(user));
645 }
646
647 let load_users = self.get_users(vec![user_id], cx);
648 cx.spawn(async move |this, cx| {
649 load_users.await?;
650 this.read_with(cx, |this, _| {
651 this.users
652 .get(&user_id)
653 .cloned()
654 .context("server responded with no users")
655 })?
656 })
657 }
658
659 pub fn cached_user_by_github_login(&self, github_login: &str) -> Option<Arc<User>> {
660 self.by_github_login
661 .get(github_login)
662 .and_then(|id| self.users.get(id).cloned())
663 }
664
665 pub fn current_user(&self) -> Option<Arc<User>> {
666 self.current_user.borrow().clone()
667 }
668
669 pub fn plan(&self) -> Option<Plan> {
670 #[cfg(debug_assertions)]
671 if let Ok(plan) = std::env::var("ZED_SIMULATE_PLAN").as_ref() {
672 use cloud_llm_client::PlanV2;
673
674 return match plan.as_str() {
675 "free" => Some(Plan::V2(PlanV2::ZedFree)),
676 "trial" => Some(Plan::V2(PlanV2::ZedProTrial)),
677 "pro" => Some(Plan::V2(PlanV2::ZedPro)),
678 _ => {
679 panic!("ZED_SIMULATE_PLAN must be one of 'free', 'trial', or 'pro'");
680 }
681 };
682 }
683
684 self.plan_info.as_ref().map(|info| info.plan())
685 }
686
687 pub fn subscription_period(&self) -> Option<(DateTime<Utc>, DateTime<Utc>)> {
688 self.plan_info
689 .as_ref()
690 .and_then(|plan| plan.subscription_period)
691 .map(|subscription_period| {
692 (
693 subscription_period.started_at.0,
694 subscription_period.ended_at.0,
695 )
696 })
697 }
698
699 pub fn trial_started_at(&self) -> Option<DateTime<Utc>> {
700 self.plan_info
701 .as_ref()
702 .and_then(|plan| plan.trial_started_at)
703 .map(|trial_started_at| trial_started_at.0)
704 }
705
706 /// Returns whether the user's account is too new to use the service.
707 pub fn account_too_young(&self) -> bool {
708 self.plan_info
709 .as_ref()
710 .map(|plan| plan.is_account_too_young)
711 .unwrap_or_default()
712 }
713
714 /// Returns whether the current user has overdue invoices and usage should be blocked.
715 pub fn has_overdue_invoices(&self) -> bool {
716 self.plan_info
717 .as_ref()
718 .map(|plan| plan.has_overdue_invoices)
719 .unwrap_or_default()
720 }
721
722 pub fn edit_prediction_usage(&self) -> Option<EditPredictionUsage> {
723 self.edit_prediction_usage
724 }
725
726 pub fn update_edit_prediction_usage(
727 &mut self,
728 usage: EditPredictionUsage,
729 cx: &mut Context<Self>,
730 ) {
731 self.edit_prediction_usage = Some(usage);
732 cx.notify();
733 }
734
735 pub fn clear_plan_and_usage(&mut self) {
736 self.plan_info = None;
737 self.edit_prediction_usage = None;
738 }
739
740 fn update_authenticated_user(
741 &mut self,
742 response: GetAuthenticatedUserResponse,
743 cx: &mut Context<Self>,
744 ) {
745 let staff = response.user.is_staff && !*feature_flags::ZED_DISABLE_STAFF;
746 cx.update_flags(staff, response.feature_flags);
747 if let Some(client) = self.client.upgrade() {
748 client
749 .telemetry
750 .set_authenticated_user_info(Some(response.user.metrics_id.clone()), staff);
751 }
752
753 self.edit_prediction_usage = Some(EditPredictionUsage(RequestUsage {
754 limit: response.plan.usage.edit_predictions.limit,
755 amount: response.plan.usage.edit_predictions.used as i32,
756 }));
757 self.plan_info = Some(response.plan);
758 cx.emit(Event::PrivateUserInfoUpdated);
759 }
760
761 fn handle_message_to_client(this: WeakEntity<Self>, message: &MessageToClient, cx: &App) {
762 cx.spawn(async move |cx| {
763 match message {
764 MessageToClient::UserUpdated => {
765 let cloud_client = cx
766 .update(|cx| {
767 this.read_with(cx, |this, _cx| {
768 this.client.upgrade().map(|client| client.cloud_client())
769 })
770 })?
771 .ok_or(anyhow::anyhow!("Failed to get Cloud client"))?;
772
773 let response = cloud_client.get_authenticated_user().await?;
774 cx.update(|cx| {
775 this.update(cx, |this, cx| {
776 this.update_authenticated_user(response, cx);
777 })
778 })?;
779 }
780 }
781
782 anyhow::Ok(())
783 })
784 .detach_and_log_err(cx);
785 }
786
787 pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
788 self.current_user.clone()
789 }
790
791 fn load_users(
792 &self,
793 request: impl RequestMessage<Response = UsersResponse>,
794 cx: &Context<Self>,
795 ) -> Task<Result<Vec<Arc<User>>>> {
796 let client = self.client.clone();
797 cx.spawn(async move |this, cx| {
798 if let Some(rpc) = client.upgrade() {
799 let response = rpc.request(request).await.context("error loading users")?;
800 let users = response.users;
801
802 this.update(cx, |this, _| this.insert(users))
803 } else {
804 Ok(Vec::new())
805 }
806 })
807 }
808
809 pub fn insert(&mut self, users: Vec<proto::User>) -> Vec<Arc<User>> {
810 let mut ret = Vec::with_capacity(users.len());
811 for user in users {
812 let user = User::new(user);
813 if let Some(old) = self.users.insert(user.id, user.clone())
814 && old.github_login != user.github_login
815 {
816 self.by_github_login.remove(&old.github_login);
817 }
818 self.by_github_login
819 .insert(user.github_login.clone(), user.id);
820 ret.push(user)
821 }
822 ret
823 }
824
825 pub fn set_participant_indices(
826 &mut self,
827 participant_indices: HashMap<u64, ParticipantIndex>,
828 cx: &mut Context<Self>,
829 ) {
830 if participant_indices != self.participant_indices {
831 self.participant_indices = participant_indices;
832 cx.emit(Event::ParticipantIndicesChanged);
833 }
834 }
835
836 pub fn participant_indices(&self) -> &HashMap<u64, ParticipantIndex> {
837 &self.participant_indices
838 }
839
840 pub fn participant_names(
841 &self,
842 user_ids: impl Iterator<Item = u64>,
843 cx: &App,
844 ) -> HashMap<u64, SharedString> {
845 let mut ret = HashMap::default();
846 let mut missing_user_ids = Vec::new();
847 for id in user_ids {
848 if let Some(github_login) = self.get_cached_user(id).map(|u| u.github_login.clone()) {
849 ret.insert(id, github_login);
850 } else {
851 missing_user_ids.push(id)
852 }
853 }
854 if !missing_user_ids.is_empty() {
855 let this = self.weak_self.clone();
856 cx.spawn(async move |cx| {
857 this.update(cx, |this, cx| this.get_users(missing_user_ids, cx))?
858 .await
859 })
860 .detach_and_log_err(cx);
861 }
862 ret
863 }
864}
865
866impl User {
867 fn new(message: proto::User) -> Arc<Self> {
868 Arc::new(User {
869 id: message.id,
870 github_login: message.github_login.into(),
871 avatar_uri: message.avatar_url.into(),
872 name: message.name,
873 })
874 }
875}
876
877impl Contact {
878 async fn from_proto(
879 contact: proto::Contact,
880 user_store: &Entity<UserStore>,
881 cx: &mut AsyncApp,
882 ) -> Result<Self> {
883 let user = user_store
884 .update(cx, |user_store, cx| {
885 user_store.get_user(contact.user_id, cx)
886 })
887 .await?;
888 Ok(Self {
889 user,
890 online: contact.online,
891 busy: contact.busy,
892 })
893 }
894}
895
896impl Collaborator {
897 pub fn from_proto(message: proto::Collaborator) -> Result<Self> {
898 Ok(Self {
899 peer_id: message.peer_id.context("invalid peer id")?,
900 replica_id: ReplicaId::new(message.replica_id as u16),
901 user_id: message.user_id as UserId,
902 is_host: message.is_host,
903 committer_name: message.committer_name,
904 committer_email: message.committer_email,
905 })
906 }
907}
908
909impl RequestUsage {
910 pub fn over_limit(&self) -> bool {
911 match self.limit {
912 UsageLimit::Limited(limit) => self.amount >= limit,
913 UsageLimit::Unlimited => false,
914 }
915 }
916
917 fn from_headers(
918 limit_name: &str,
919 amount_name: &str,
920 headers: &HeaderMap<HeaderValue>,
921 ) -> Result<Self> {
922 let limit = headers
923 .get(limit_name)
924 .with_context(|| format!("missing {limit_name:?} header"))?;
925 let limit = UsageLimit::from_str(limit.to_str()?)?;
926
927 let amount = headers
928 .get(amount_name)
929 .with_context(|| format!("missing {amount_name:?} header"))?;
930 let amount = amount.to_str()?.parse::<i32>()?;
931
932 Ok(Self { limit, amount })
933 }
934}
935
936impl EditPredictionUsage {
937 pub fn from_headers(headers: &HeaderMap<HeaderValue>) -> Result<Self> {
938 Ok(Self(RequestUsage::from_headers(
939 EDIT_PREDICTIONS_USAGE_LIMIT_HEADER_NAME,
940 EDIT_PREDICTIONS_USAGE_AMOUNT_HEADER_NAME,
941 headers,
942 )?))
943 }
944}