1use super::{Client, Status, TypedEnvelope, proto};
2use anyhow::{Context as _, Result};
3use chrono::{DateTime, Utc};
4use cloud_api_client::websocket_protocol::MessageToClient;
5use cloud_api_client::{GetAuthenticatedUserResponse, Organization, Plan, PlanInfo};
6use cloud_llm_client::{
7 EDIT_PREDICTIONS_USAGE_AMOUNT_HEADER_NAME, EDIT_PREDICTIONS_USAGE_LIMIT_HEADER_NAME, UsageLimit,
8};
9use collections::{HashMap, HashSet, hash_map::Entry};
10use derive_more::Deref;
11use feature_flags::FeatureFlagAppExt;
12use futures::{Future, StreamExt, channel::mpsc};
13use gpui::{
14 App, AsyncApp, Context, Entity, EventEmitter, SharedString, SharedUri, Task, WeakEntity,
15};
16use http_client::http::{HeaderMap, HeaderValue};
17use postage::{sink::Sink, watch};
18use rpc::proto::{RequestMessage, UsersResponse};
19use std::{
20 str::FromStr as _,
21 sync::{Arc, Weak},
22};
23use text::ReplicaId;
24use util::{ResultExt, TryFutureExt as _};
25
/// Numeric identifier of a user (see [`User::id`]).
pub type UserId = u64;
27
/// Newtype wrapper around the `u64` that identifies a channel.
#[derive(
    Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, serde::Serialize, serde::Deserialize,
)]
pub struct ChannelId(pub u64);
32
33impl std::fmt::Display for ChannelId {
34 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
35 self.0.fmt(f)
36 }
37}
38
/// Newtype wrapper around the `u64` that identifies a project.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)]
pub struct ProjectId(pub u64);

impl ProjectId {
    /// Unwraps the identifier into its raw `u64` value.
    pub fn to_proto(self) -> u64 {
        let Self(raw) = self;
        raw
    }
}
47
/// Newtype wrapper around a participant's `u32` index.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ParticipantIndex(pub u32);
50
/// A user as cached by [`UserStore`].
///
/// Equality compares `id` and `github_login`; ordering is implemented by hand
/// in the `Ord` impl in this file.
#[derive(Default, Debug)]
pub struct User {
    /// Numeric identifier of the user.
    pub id: UserId,
    /// GitHub login of the user.
    pub github_login: SharedString,
    /// URI of the user's avatar image.
    pub avatar_uri: SharedUri,
    /// Display name, when one is known.
    pub name: Option<String>,
}
58
/// A collaborator, built from a `proto::Collaborator` message by
/// [`Collaborator::from_proto`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Collaborator {
    /// Peer id taken from the protobuf message.
    pub peer_id: proto::PeerId,
    /// Replica id taken from the protobuf message.
    pub replica_id: ReplicaId,
    /// Id of the user behind this collaborator.
    pub user_id: UserId,
    /// Host flag taken from the protobuf message.
    pub is_host: bool,
    /// Committer name, if the message carried one.
    pub committer_name: Option<String>,
    /// Committer email, if the message carried one.
    pub committer_email: Option<String>,
}
68
69impl PartialOrd for User {
70 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
71 Some(self.cmp(other))
72 }
73}
74
75impl Ord for User {
76 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
77 self.github_login.cmp(&other.github_login)
78 }
79}
80
81impl PartialEq for User {
82 fn eq(&self, other: &Self) -> bool {
83 self.id == other.id && self.github_login == other.github_login
84 }
85}
86
// Marker impl: the hand-written `PartialEq` for `User` is a full equivalence relation.
impl Eq for User {}
88
/// A contact of the current user, built from a `proto::Contact` message.
#[derive(Debug, PartialEq)]
pub struct Contact {
    /// The user this contact refers to.
    pub user: Arc<User>,
    /// `online` flag copied from the protobuf message.
    pub online: bool,
    /// `busy` flag copied from the protobuf message.
    pub busy: bool,
}
95
/// Relationship between the current user and another user, as reported by
/// `UserStore::contact_request_status`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ContactRequestStatus {
    /// The user is neither a contact nor part of any incoming or outgoing request.
    None,
    /// The user appears in the outgoing contact requests.
    RequestSent,
    /// The user appears in the incoming contact requests.
    RequestReceived,
    /// The user appears in the contact list.
    RequestAccepted,
}
103
/// Caches users, contacts, organizations and plan information for the
/// signed-in account and keeps them up to date from client messages.
pub struct UserStore {
    /// Cached users keyed by user id.
    users: HashMap<u64, Arc<User>>,
    /// Index from GitHub login to user id for the cached users.
    by_github_login: HashMap<SharedString, u64>,
    /// Participant index per user id; replaced wholesale by `set_participant_indices`.
    participant_indices: HashMap<u64, ParticipantIndex>,
    /// Sender side of the queue drained by the `_maintain_contacts` task.
    update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
    /// Most recently recorded edit prediction usage, if any.
    edit_prediction_usage: Option<EditPredictionUsage>,
    /// Plan information from the last authenticated-user response, if any.
    plan_info: Option<PlanInfo>,
    /// Watch channel holding the currently signed-in user (`None` when there is none).
    current_user: watch::Receiver<Option<Arc<User>>>,
    /// Set to the first entry of `organizations` when the authenticated user is updated.
    current_organization: Option<Arc<Organization>>,
    /// Organizations from the last authenticated-user response.
    organizations: Vec<Arc<Organization>>,
    /// Contacts, kept sorted by GitHub login so they can be binary-searched.
    contacts: Vec<Arc<Contact>>,
    /// Users who sent a contact request, kept sorted by GitHub login.
    incoming_contact_requests: Vec<Arc<User>>,
    /// Users who were sent a contact request, kept sorted by GitHub login.
    outgoing_contact_requests: Vec<Arc<User>>,
    /// Number of in-flight contact requests per user id.
    pending_contact_requests: HashMap<u64, usize>,
    /// Weak handle to the client, so the store does not keep it alive.
    client: Weak<Client>,
    /// Background task that applies queued `UpdateContacts` messages in order.
    _maintain_contacts: Task<()>,
    /// Background task that reacts to client status changes.
    _maintain_current_user: Task<Result<()>>,
    /// Weak handle to this entity, used to spawn work from `&self` methods.
    weak_self: WeakEntity<Self>,
}
123
/// An invite count together with an invite URL.
// NOTE(review): nothing in this file constructs or reads this type; the exact
// meaning of `count` should be confirmed against its users.
#[derive(Clone)]
pub struct InviteInfo {
    pub count: u32,
    pub url: Arc<str>,
}
129
/// Events emitted by [`UserStore`].
pub enum Event {
    /// A contact-related change concerning `user`; `kind` says what happened.
    Contact {
        user: Arc<User>,
        kind: ContactEventKind,
    },
    /// Emitted when a `proto::ShowContacts` message is received.
    ShowContacts,
    /// Emitted when `set_participant_indices` installs a different map.
    ParticipantIndicesChanged,
    /// Emitted after the authenticated user's info is updated or cleared on sign-out.
    PrivateUserInfoUpdated,
    // NOTE(review): not emitted anywhere in this file; presumably emitted elsewhere.
    PlanUpdated,
}
140
/// What happened in an [`Event::Contact`].
#[derive(Clone, Copy)]
pub enum ContactEventKind {
    Requested,
    Accepted,
    /// Emitted when an incoming contact request is removed by a contacts update.
    Cancelled,
}
147
// Allows `UserStore` to emit `Event`s through its `Context`.
impl EventEmitter<Event> for UserStore {}
149
/// Work items processed one at a time by the `_maintain_contacts` task.
enum UpdateContacts {
    /// Apply a contacts update received from the server.
    Update(proto::UpdateContacts),
    /// Do nothing except drop the barrier, signalling that all earlier items were processed.
    Wait(postage::barrier::Sender),
    /// Clear contacts and both request lists, then drop the barrier.
    Clear(postage::barrier::Sender),
}
155
/// Usage of edit predictions; dereferences to the wrapped [`RequestUsage`].
#[derive(Debug, Clone, Copy, Deref)]
pub struct EditPredictionUsage(pub RequestUsage);
158
/// An amount used together with the limit it counts against.
#[derive(Debug, Clone, Copy)]
pub struct RequestUsage {
    /// The limit, either a concrete number or unlimited.
    pub limit: UsageLimit,
    /// The amount used so far.
    pub amount: i32,
}
164
165impl UserStore {
    /// Creates the store and wires it to `client`.
    ///
    /// Registers handlers for `proto::UpdateContacts` and `proto::ShowContacts`
    /// messages and for messages-to-client, then spawns two background tasks:
    ///
    /// * `_maintain_contacts` drains the contacts queue, applying one
    ///   [`UpdateContacts`] item at a time.
    /// * `_maintain_current_user` follows the client's status stream, loading
    ///   the authenticated user on connect and clearing state on sign-out or
    ///   connection loss.
    pub fn new(client: Arc<Client>, cx: &Context<Self>) -> Self {
        let (mut current_user_tx, current_user_rx) = watch::channel();
        let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
        let rpc_subscriptions = vec![
            client.add_message_handler(cx.weak_entity(), Self::handle_update_contacts),
            client.add_message_handler(cx.weak_entity(), Self::handle_show_contacts),
        ];

        client.add_message_to_client_handler({
            let this = cx.weak_entity();
            move |message, cx| Self::handle_message_to_client(this.clone(), message, cx)
        });

        Self {
            users: Default::default(),
            by_github_login: Default::default(),
            current_user: current_user_rx,
            current_organization: None,
            organizations: Vec::new(),
            plan_info: None,
            edit_prediction_usage: None,
            contacts: Default::default(),
            incoming_contact_requests: Default::default(),
            participant_indices: Default::default(),
            outgoing_contact_requests: Default::default(),
            client: Arc::downgrade(&client),
            update_contacts_tx,
            _maintain_contacts: cx.spawn(async move |this, cx| {
                // Keep the RPC subscriptions alive for as long as this task runs.
                let _subscriptions = rpc_subscriptions;
                while let Some(message) = update_contacts_rx.next().await {
                    // Each update is awaited before the next one is taken, so
                    // queued items are applied strictly in order.
                    if let Ok(task) = this.update(cx, |this, cx| this.update_contacts(message, cx))
                    {
                        task.log_err().await;
                    } else {
                        // The store could not be updated any more; stop.
                        break;
                    }
                }
            }),
            _maintain_current_user: cx.spawn(async move |this, cx| {
                let mut status = client.status();
                // Hold only a weak reference so this task does not keep the client alive.
                let weak = Arc::downgrade(&client);
                drop(client);
                while let Some(status) = status.next().await {
                    // if the client is dropped, the app is shutting down.
                    let Some(client) = weak.upgrade() else {
                        return Ok(());
                    };
                    match status {
                        Status::Authenticated
                        | Status::Reauthenticated
                        | Status::Connected { .. } => {
                            if let Some(user_id) = client.user_id() {
                                // A failed fetch is logged and treated as "no user".
                                let response = client
                                    .cloud_client()
                                    .get_authenticated_user()
                                    .await
                                    .log_err();

                                let current_user_and_response = if let Some(response) = response {
                                    let user = Arc::new(User {
                                        id: user_id,
                                        github_login: response.user.github_login.clone().into(),
                                        avatar_uri: response.user.avatar_url.clone().into(),
                                        name: response.user.name.clone(),
                                    });

                                    Some((user, response))
                                } else {
                                    None
                                };
                                // Publish the new current user (or `None`) to watchers.
                                current_user_tx
                                    .send(
                                        current_user_and_response
                                            .as_ref()
                                            .map(|(user, _)| user.clone()),
                                    )
                                    .await
                                    .ok();

                                // Cache the user and apply the rest of the response.
                                cx.update(|cx| {
                                    if let Some((user, response)) = current_user_and_response {
                                        this.update(cx, |this, cx| {
                                            this.by_github_login
                                                .insert(user.github_login.clone(), user_id);
                                            this.users.insert(user_id, user);
                                            this.update_authenticated_user(response, cx)
                                        })
                                    } else {
                                        anyhow::Ok(())
                                    }
                                })?;

                                this.update(cx, |_, cx| cx.notify())?;
                            }
                        }
                        Status::SignedOut => {
                            // Forget the user and all account-specific state,
                            // then wait for the contacts to be cleared.
                            current_user_tx.send(None).await.ok();
                            this.update(cx, |this, cx| {
                                this.clear_organizations();
                                this.clear_plan_and_usage();
                                cx.emit(Event::PrivateUserInfoUpdated);
                                cx.notify();
                                this.clear_contacts()
                            })?
                            .await;
                        }
                        Status::ConnectionLost => {
                            // Only the contacts are cleared; user and plan are kept.
                            this.update(cx, |this, cx| {
                                cx.notify();
                                this.clear_contacts()
                            })?
                            .await;
                        }
                        _ => {}
                    }
                }
                Ok(())
            }),
            pending_contact_requests: Default::default(),
            weak_self: cx.weak_entity(),
        }
    }
288
289 #[cfg(feature = "test-support")]
290 pub fn clear_cache(&mut self) {
291 self.users.clear();
292 self.by_github_login.clear();
293 }
294
295 async fn handle_show_contacts(
296 this: Entity<Self>,
297 _: TypedEnvelope<proto::ShowContacts>,
298 mut cx: AsyncApp,
299 ) -> Result<()> {
300 this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts));
301 Ok(())
302 }
303
304 async fn handle_update_contacts(
305 this: Entity<Self>,
306 message: TypedEnvelope<proto::UpdateContacts>,
307 cx: AsyncApp,
308 ) -> Result<()> {
309 this.read_with(&cx, |this, _| {
310 this.update_contacts_tx
311 .unbounded_send(UpdateContacts::Update(message.payload))
312 .unwrap();
313 });
314 Ok(())
315 }
316
    /// Applies one queued [`UpdateContacts`] item.
    ///
    /// * `Wait` drops the barrier and does nothing else.
    /// * `Clear` empties the contacts and both request lists, then drops the barrier.
    /// * `Update` loads every user the message refers to, then removes and
    ///   upserts entries in the contacts and request lists. All three lists are
    ///   kept sorted by GitHub login, which the binary searches below rely on.
    ///   An [`Event::Contact`] with `Cancelled` is emitted for each removed
    ///   incoming request.
    fn update_contacts(&mut self, message: UpdateContacts, cx: &Context<Self>) -> Task<Result<()>> {
        match message {
            UpdateContacts::Wait(barrier) => {
                drop(barrier);
                Task::ready(Ok(()))
            }
            UpdateContacts::Clear(barrier) => {
                self.contacts.clear();
                self.incoming_contact_requests.clear();
                self.outgoing_contact_requests.clear();
                drop(barrier);
                Task::ready(Ok(()))
            }
            UpdateContacts::Update(message) => {
                // Collect every user id mentioned by the message so they can
                // be fetched in a single request.
                let mut user_ids = HashSet::default();
                for contact in &message.contacts {
                    user_ids.insert(contact.user_id);
                }
                user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
                user_ids.extend(message.outgoing_requests.iter());

                let load_users = self.get_users(user_ids.into_iter().collect(), cx);
                cx.spawn(async move |this, cx| {
                    load_users.await?;

                    // Users are fetched in parallel above and cached in call to get_users
                    // No need to parallelize here
                    let mut updated_contacts = Vec::new();
                    let this = this.upgrade().context("can't upgrade user store handle")?;
                    for contact in message.contacts {
                        updated_contacts
                            .push(Arc::new(Contact::from_proto(contact, &this, cx).await?));
                    }

                    let mut incoming_requests = Vec::new();
                    for request in message.incoming_requests {
                        incoming_requests.push({
                            this.update(cx, |this, cx| this.get_user(request.requester_id, cx))
                                .await?
                        });
                    }

                    let mut outgoing_requests = Vec::new();
                    for requested_user_id in message.outgoing_requests {
                        outgoing_requests.push(
                            this.update(cx, |this, cx| this.get_user(requested_user_id, cx))
                                .await?,
                        );
                    }

                    let removed_contacts =
                        HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
                    let removed_incoming_requests =
                        HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
                    let removed_outgoing_requests =
                        HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());

                    this.update(cx, |this, cx| {
                        // Remove contacts
                        this.contacts
                            .retain(|contact| !removed_contacts.contains(&contact.user.id));
                        // Update existing contacts and insert new ones
                        for updated_contact in updated_contacts {
                            match this.contacts.binary_search_by_key(
                                &&updated_contact.user.github_login,
                                |contact| &contact.user.github_login,
                            ) {
                                Ok(ix) => this.contacts[ix] = updated_contact,
                                Err(ix) => this.contacts.insert(ix, updated_contact),
                            }
                        }

                        // Remove incoming contact requests
                        this.incoming_contact_requests.retain(|user| {
                            if removed_incoming_requests.contains(&user.id) {
                                cx.emit(Event::Contact {
                                    user: user.clone(),
                                    kind: ContactEventKind::Cancelled,
                                });
                                false
                            } else {
                                true
                            }
                        });
                        // Update existing incoming requests and insert new ones
                        for user in incoming_requests {
                            match this
                                .incoming_contact_requests
                                .binary_search_by_key(&&user.github_login, |contact| {
                                    &contact.github_login
                                }) {
                                Ok(ix) => this.incoming_contact_requests[ix] = user,
                                Err(ix) => this.incoming_contact_requests.insert(ix, user),
                            }
                        }

                        // Remove outgoing contact requests
                        this.outgoing_contact_requests
                            .retain(|user| !removed_outgoing_requests.contains(&user.id));
                        // Update existing outgoing requests and insert new ones
                        for request in outgoing_requests {
                            match this
                                .outgoing_contact_requests
                                .binary_search_by_key(&&request.github_login, |contact| {
                                    &contact.github_login
                                }) {
                                Ok(ix) => this.outgoing_contact_requests[ix] = request,
                                Err(ix) => this.outgoing_contact_requests.insert(ix, request),
                            }
                        }

                        cx.notify();
                    });

                    Ok(())
                })
            }
        }
    }
436
437 pub fn contacts(&self) -> &[Arc<Contact>] {
438 &self.contacts
439 }
440
441 pub fn has_contact(&self, user: &Arc<User>) -> bool {
442 self.contacts
443 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
444 .is_ok()
445 }
446
447 pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
448 &self.incoming_contact_requests
449 }
450
451 pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
452 &self.outgoing_contact_requests
453 }
454
455 pub fn is_contact_request_pending(&self, user: &User) -> bool {
456 self.pending_contact_requests.contains_key(&user.id)
457 }
458
459 pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
460 if self
461 .contacts
462 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
463 .is_ok()
464 {
465 ContactRequestStatus::RequestAccepted
466 } else if self
467 .outgoing_contact_requests
468 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
469 .is_ok()
470 {
471 ContactRequestStatus::RequestSent
472 } else if self
473 .incoming_contact_requests
474 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
475 .is_ok()
476 {
477 ContactRequestStatus::RequestReceived
478 } else {
479 ContactRequestStatus::None
480 }
481 }
482
483 pub fn request_contact(
484 &mut self,
485 responder_id: u64,
486 cx: &mut Context<Self>,
487 ) -> Task<Result<()>> {
488 self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
489 }
490
491 pub fn remove_contact(&mut self, user_id: u64, cx: &mut Context<Self>) -> Task<Result<()>> {
492 self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
493 }
494
495 pub fn has_incoming_contact_request(&self, user_id: u64) -> bool {
496 self.incoming_contact_requests
497 .iter()
498 .any(|user| user.id == user_id)
499 }
500
501 pub fn respond_to_contact_request(
502 &mut self,
503 requester_id: u64,
504 accept: bool,
505 cx: &mut Context<Self>,
506 ) -> Task<Result<()>> {
507 self.perform_contact_request(
508 requester_id,
509 proto::RespondToContactRequest {
510 requester_id,
511 response: if accept {
512 proto::ContactRequestResponse::Accept
513 } else {
514 proto::ContactRequestResponse::Decline
515 } as i32,
516 },
517 cx,
518 )
519 }
520
521 pub fn dismiss_contact_request(
522 &self,
523 requester_id: u64,
524 cx: &Context<Self>,
525 ) -> Task<Result<()>> {
526 let client = self.client.upgrade();
527 cx.spawn(async move |_, _| {
528 client
529 .context("can't upgrade client reference")?
530 .request(proto::RespondToContactRequest {
531 requester_id,
532 response: proto::ContactRequestResponse::Dismiss as i32,
533 })
534 .await?;
535 Ok(())
536 })
537 }
538
539 fn perform_contact_request<T: RequestMessage>(
540 &mut self,
541 user_id: u64,
542 request: T,
543 cx: &mut Context<Self>,
544 ) -> Task<Result<()>> {
545 let client = self.client.upgrade();
546 *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
547 cx.notify();
548
549 cx.spawn(async move |this, cx| {
550 let response = client
551 .context("can't upgrade client reference")?
552 .request(request)
553 .await;
554 this.update(cx, |this, cx| {
555 if let Entry::Occupied(mut request_count) =
556 this.pending_contact_requests.entry(user_id)
557 {
558 *request_count.get_mut() -= 1;
559 if *request_count.get() == 0 {
560 request_count.remove();
561 }
562 }
563 cx.notify();
564 })?;
565 response?;
566 Ok(())
567 })
568 }
569
570 pub fn clear_contacts(&self) -> impl Future<Output = ()> + use<> {
571 let (tx, mut rx) = postage::barrier::channel();
572 self.update_contacts_tx
573 .unbounded_send(UpdateContacts::Clear(tx))
574 .unwrap();
575 async move {
576 rx.next().await;
577 }
578 }
579
580 pub fn contact_updates_done(&self) -> impl Future<Output = ()> {
581 let (tx, mut rx) = postage::barrier::channel();
582 self.update_contacts_tx
583 .unbounded_send(UpdateContacts::Wait(tx))
584 .unwrap();
585 async move {
586 rx.next().await;
587 }
588 }
589
590 pub fn get_users(
591 &self,
592 user_ids: Vec<u64>,
593 cx: &Context<Self>,
594 ) -> Task<Result<Vec<Arc<User>>>> {
595 let mut user_ids_to_fetch = user_ids.clone();
596 user_ids_to_fetch.retain(|id| !self.users.contains_key(id));
597
598 cx.spawn(async move |this, cx| {
599 if !user_ids_to_fetch.is_empty() {
600 this.update(cx, |this, cx| {
601 this.load_users(
602 proto::GetUsers {
603 user_ids: user_ids_to_fetch,
604 },
605 cx,
606 )
607 })?
608 .await?;
609 }
610
611 this.read_with(cx, |this, _| {
612 user_ids
613 .iter()
614 .map(|user_id| {
615 this.users
616 .get(user_id)
617 .cloned()
618 .with_context(|| format!("user {user_id} not found"))
619 })
620 .collect()
621 })?
622 })
623 }
624
625 pub fn fuzzy_search_users(
626 &self,
627 query: String,
628 cx: &Context<Self>,
629 ) -> Task<Result<Vec<Arc<User>>>> {
630 self.load_users(proto::FuzzySearchUsers { query }, cx)
631 }
632
633 pub fn get_cached_user(&self, user_id: u64) -> Option<Arc<User>> {
634 self.users.get(&user_id).cloned()
635 }
636
637 pub fn get_user_optimistic(&self, user_id: u64, cx: &Context<Self>) -> Option<Arc<User>> {
638 if let Some(user) = self.users.get(&user_id).cloned() {
639 return Some(user);
640 }
641
642 self.get_user(user_id, cx).detach_and_log_err(cx);
643 None
644 }
645
646 pub fn get_user(&self, user_id: u64, cx: &Context<Self>) -> Task<Result<Arc<User>>> {
647 if let Some(user) = self.users.get(&user_id).cloned() {
648 return Task::ready(Ok(user));
649 }
650
651 let load_users = self.get_users(vec![user_id], cx);
652 cx.spawn(async move |this, cx| {
653 load_users.await?;
654 this.read_with(cx, |this, _| {
655 this.users
656 .get(&user_id)
657 .cloned()
658 .context("server responded with no users")
659 })?
660 })
661 }
662
663 pub fn cached_user_by_github_login(&self, github_login: &str) -> Option<Arc<User>> {
664 self.by_github_login
665 .get(github_login)
666 .and_then(|id| self.users.get(id).cloned())
667 }
668
669 pub fn current_user(&self) -> Option<Arc<User>> {
670 self.current_user.borrow().clone()
671 }
672
673 pub fn current_organization(&self) -> Option<Arc<Organization>> {
674 self.current_organization.clone()
675 }
676
677 pub fn plan(&self) -> Option<Plan> {
678 #[cfg(debug_assertions)]
679 if let Ok(plan) = std::env::var("ZED_SIMULATE_PLAN").as_ref() {
680 use cloud_api_client::Plan;
681
682 return match plan.as_str() {
683 "free" => Some(Plan::ZedFree),
684 "trial" => Some(Plan::ZedProTrial),
685 "pro" => Some(Plan::ZedPro),
686 _ => {
687 panic!("ZED_SIMULATE_PLAN must be one of 'free', 'trial', or 'pro'");
688 }
689 };
690 }
691
692 self.plan_info.as_ref().map(|info| info.plan())
693 }
694
695 pub fn subscription_period(&self) -> Option<(DateTime<Utc>, DateTime<Utc>)> {
696 self.plan_info
697 .as_ref()
698 .and_then(|plan| plan.subscription_period)
699 .map(|subscription_period| {
700 (
701 subscription_period.started_at.0,
702 subscription_period.ended_at.0,
703 )
704 })
705 }
706
707 pub fn trial_started_at(&self) -> Option<DateTime<Utc>> {
708 self.plan_info
709 .as_ref()
710 .and_then(|plan| plan.trial_started_at)
711 .map(|trial_started_at| trial_started_at.0)
712 }
713
714 /// Returns whether the user's account is too new to use the service.
715 pub fn account_too_young(&self) -> bool {
716 self.plan_info
717 .as_ref()
718 .map(|plan| plan.is_account_too_young)
719 .unwrap_or_default()
720 }
721
722 /// Returns whether the current user has overdue invoices and usage should be blocked.
723 pub fn has_overdue_invoices(&self) -> bool {
724 self.plan_info
725 .as_ref()
726 .map(|plan| plan.has_overdue_invoices)
727 .unwrap_or_default()
728 }
729
730 pub fn edit_prediction_usage(&self) -> Option<EditPredictionUsage> {
731 self.edit_prediction_usage
732 }
733
734 pub fn update_edit_prediction_usage(
735 &mut self,
736 usage: EditPredictionUsage,
737 cx: &mut Context<Self>,
738 ) {
739 self.edit_prediction_usage = Some(usage);
740 cx.notify();
741 }
742
743 pub fn clear_organizations(&mut self) {
744 self.organizations.clear();
745 self.current_organization = None;
746 }
747
748 pub fn clear_plan_and_usage(&mut self) {
749 self.plan_info = None;
750 self.edit_prediction_usage = None;
751 }
752
753 fn update_authenticated_user(
754 &mut self,
755 response: GetAuthenticatedUserResponse,
756 cx: &mut Context<Self>,
757 ) {
758 let staff = response.user.is_staff && !*feature_flags::ZED_DISABLE_STAFF;
759 cx.update_flags(staff, response.feature_flags);
760 if let Some(client) = self.client.upgrade() {
761 client
762 .telemetry
763 .set_authenticated_user_info(Some(response.user.metrics_id.clone()), staff);
764 }
765
766 self.organizations = response.organizations.into_iter().map(Arc::new).collect();
767 self.current_organization = self.organizations.first().cloned();
768
769 self.edit_prediction_usage = Some(EditPredictionUsage(RequestUsage {
770 limit: response.plan.usage.edit_predictions.limit,
771 amount: response.plan.usage.edit_predictions.used as i32,
772 }));
773 self.plan_info = Some(response.plan);
774 cx.emit(Event::PrivateUserInfoUpdated);
775 }
776
777 fn handle_message_to_client(this: WeakEntity<Self>, message: &MessageToClient, cx: &App) {
778 cx.spawn(async move |cx| {
779 match message {
780 MessageToClient::UserUpdated => {
781 let cloud_client = cx
782 .update(|cx| {
783 this.read_with(cx, |this, _cx| {
784 this.client.upgrade().map(|client| client.cloud_client())
785 })
786 })?
787 .ok_or(anyhow::anyhow!("Failed to get Cloud client"))?;
788
789 let response = cloud_client.get_authenticated_user().await?;
790 cx.update(|cx| {
791 this.update(cx, |this, cx| {
792 this.update_authenticated_user(response, cx);
793 })
794 })?;
795 }
796 }
797
798 anyhow::Ok(())
799 })
800 .detach_and_log_err(cx);
801 }
802
803 pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
804 self.current_user.clone()
805 }
806
807 fn load_users(
808 &self,
809 request: impl RequestMessage<Response = UsersResponse>,
810 cx: &Context<Self>,
811 ) -> Task<Result<Vec<Arc<User>>>> {
812 let client = self.client.clone();
813 cx.spawn(async move |this, cx| {
814 if let Some(rpc) = client.upgrade() {
815 let response = rpc.request(request).await.context("error loading users")?;
816 let users = response.users;
817
818 this.update(cx, |this, _| this.insert(users))
819 } else {
820 Ok(Vec::new())
821 }
822 })
823 }
824
825 pub fn insert(&mut self, users: Vec<proto::User>) -> Vec<Arc<User>> {
826 let mut ret = Vec::with_capacity(users.len());
827 for user in users {
828 let user = User::new(user);
829 if let Some(old) = self.users.insert(user.id, user.clone())
830 && old.github_login != user.github_login
831 {
832 self.by_github_login.remove(&old.github_login);
833 }
834 self.by_github_login
835 .insert(user.github_login.clone(), user.id);
836 ret.push(user)
837 }
838 ret
839 }
840
841 pub fn set_participant_indices(
842 &mut self,
843 participant_indices: HashMap<u64, ParticipantIndex>,
844 cx: &mut Context<Self>,
845 ) {
846 if participant_indices != self.participant_indices {
847 self.participant_indices = participant_indices;
848 cx.emit(Event::ParticipantIndicesChanged);
849 }
850 }
851
852 pub fn participant_indices(&self) -> &HashMap<u64, ParticipantIndex> {
853 &self.participant_indices
854 }
855
856 pub fn participant_names(
857 &self,
858 user_ids: impl Iterator<Item = u64>,
859 cx: &App,
860 ) -> HashMap<u64, SharedString> {
861 let mut ret = HashMap::default();
862 let mut missing_user_ids = Vec::new();
863 for id in user_ids {
864 if let Some(github_login) = self.get_cached_user(id).map(|u| u.github_login.clone()) {
865 ret.insert(id, github_login);
866 } else {
867 missing_user_ids.push(id)
868 }
869 }
870 if !missing_user_ids.is_empty() {
871 let this = self.weak_self.clone();
872 cx.spawn(async move |cx| {
873 this.update(cx, |this, cx| this.get_users(missing_user_ids, cx))?
874 .await
875 })
876 .detach_and_log_err(cx);
877 }
878 ret
879 }
880}
881
882impl User {
883 fn new(message: proto::User) -> Arc<Self> {
884 Arc::new(User {
885 id: message.id,
886 github_login: message.github_login.into(),
887 avatar_uri: message.avatar_url.into(),
888 name: message.name,
889 })
890 }
891}
892
893impl Contact {
894 async fn from_proto(
895 contact: proto::Contact,
896 user_store: &Entity<UserStore>,
897 cx: &mut AsyncApp,
898 ) -> Result<Self> {
899 let user = user_store
900 .update(cx, |user_store, cx| {
901 user_store.get_user(contact.user_id, cx)
902 })
903 .await?;
904 Ok(Self {
905 user,
906 online: contact.online,
907 busy: contact.busy,
908 })
909 }
910}
911
912impl Collaborator {
913 pub fn from_proto(message: proto::Collaborator) -> Result<Self> {
914 Ok(Self {
915 peer_id: message.peer_id.context("invalid peer id")?,
916 replica_id: ReplicaId::new(message.replica_id as u16),
917 user_id: message.user_id as UserId,
918 is_host: message.is_host,
919 committer_name: message.committer_name,
920 committer_email: message.committer_email,
921 })
922 }
923}
924
925impl RequestUsage {
926 pub fn over_limit(&self) -> bool {
927 match self.limit {
928 UsageLimit::Limited(limit) => self.amount >= limit,
929 UsageLimit::Unlimited => false,
930 }
931 }
932
933 fn from_headers(
934 limit_name: &str,
935 amount_name: &str,
936 headers: &HeaderMap<HeaderValue>,
937 ) -> Result<Self> {
938 let limit = headers
939 .get(limit_name)
940 .with_context(|| format!("missing {limit_name:?} header"))?;
941 let limit = UsageLimit::from_str(limit.to_str()?)?;
942
943 let amount = headers
944 .get(amount_name)
945 .with_context(|| format!("missing {amount_name:?} header"))?;
946 let amount = amount.to_str()?.parse::<i32>()?;
947
948 Ok(Self { limit, amount })
949 }
950}
951
952impl EditPredictionUsage {
953 pub fn from_headers(headers: &HeaderMap<HeaderValue>) -> Result<Self> {
954 Ok(Self(RequestUsage::from_headers(
955 EDIT_PREDICTIONS_USAGE_LIMIT_HEADER_NAME,
956 EDIT_PREDICTIONS_USAGE_AMOUNT_HEADER_NAME,
957 headers,
958 )?))
959 }
960}