1use super::{Client, Status, TypedEnvelope, proto};
2use anyhow::{Context as _, Result};
3use chrono::{DateTime, Utc};
4use cloud_api_client::websocket_protocol::MessageToClient;
5use cloud_api_client::{GetAuthenticatedUserResponse, Organization, Plan, PlanInfo};
6use cloud_llm_client::{
7 EDIT_PREDICTIONS_USAGE_AMOUNT_HEADER_NAME, EDIT_PREDICTIONS_USAGE_LIMIT_HEADER_NAME, UsageLimit,
8};
9use collections::{HashMap, HashSet, hash_map::Entry};
10use derive_more::Deref;
11use feature_flags::FeatureFlagAppExt;
12use futures::{Future, StreamExt, channel::mpsc};
13use gpui::{
14 App, AsyncApp, Context, Entity, EventEmitter, SharedString, SharedUri, Task, WeakEntity,
15};
16use http_client::http::{HeaderMap, HeaderValue};
17use postage::{sink::Sink, watch};
18use rpc::proto::{RequestMessage, UsersResponse};
19use std::{
20 str::FromStr as _,
21 sync::{Arc, Weak},
22};
23use text::ReplicaId;
24use util::{ResultExt, TryFutureExt as _};
25
/// Numeric identifier of a user; an alias for `u64`.
pub type UserId = u64;
27
/// Newtype wrapper around a `u64` channel identifier.
#[derive(
    Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, serde::Serialize, serde::Deserialize,
)]
pub struct ChannelId(pub u64);
32
33impl std::fmt::Display for ChannelId {
34 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
35 self.0.fmt(f)
36 }
37}
38
/// Newtype wrapper around a `u64` project identifier.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)]
pub struct ProjectId(pub u64);

impl ProjectId {
    /// Returns the wrapped `u64`.
    pub fn to_proto(self) -> u64 {
        let Self(raw) = self;
        raw
    }
}
47
/// Newtype wrapper around a `u32` index assigned to a participant.
///
/// `UserStore` keeps a map from user id to this index
/// (see `UserStore::set_participant_indices`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ParticipantIndex(pub u32);
50
/// A user as cached by `UserStore`.
///
/// Equality compares `id` and `github_login`; ordering is defined by the
/// hand-written `Ord` impl further down in this file.
#[derive(Default, Debug)]
pub struct User {
    /// Numeric user id.
    pub id: UserId,
    /// GitHub login; also the key by which contact lists are kept sorted.
    pub github_login: SharedString,
    /// Avatar location, populated from the `avatar_url` field of server responses.
    pub avatar_uri: SharedUri,
    /// Optional display name.
    pub name: Option<String>,
}
58
/// A collaborator, built from a `proto::Collaborator` by `Collaborator::from_proto`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Collaborator {
    /// Peer id taken from the protobuf message (required there).
    pub peer_id: proto::PeerId,
    /// Replica id derived from the message's `replica_id`.
    pub replica_id: ReplicaId,
    /// Id of the user behind this collaborator.
    pub user_id: UserId,
    /// Copied from the message's `is_host` flag.
    pub is_host: bool,
    /// Optional committer name, copied from the message.
    pub committer_name: Option<String>,
    /// Optional committer email, copied from the message.
    pub committer_email: Option<String>,
}
68
69impl PartialOrd for User {
70 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
71 Some(self.cmp(other))
72 }
73}
74
75impl Ord for User {
76 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
77 self.github_login.cmp(&other.github_login)
78 }
79}
80
81impl PartialEq for User {
82 fn eq(&self, other: &Self) -> bool {
83 self.id == other.id && self.github_login == other.github_login
84 }
85}
86
87impl Eq for User {}
88
/// An entry in the contact list: a user plus presence flags.
#[derive(Debug, PartialEq)]
pub struct Contact {
    /// The user this contact refers to.
    pub user: Arc<User>,
    /// Copied from the `online` flag of the protobuf contact.
    pub online: bool,
    /// Copied from the `busy` flag of the protobuf contact.
    pub busy: bool,
}
95
/// Relationship between the current user and another user, as reported by
/// `UserStore::contact_request_status`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ContactRequestStatus {
    /// The user is in none of the contact lists.
    None,
    /// The user is in the outgoing contact-request list.
    RequestSent,
    /// The user is in the incoming contact-request list.
    RequestReceived,
    /// The user is already a contact.
    RequestAccepted,
}
103
/// Caches users, contacts, organizations and plan information for the signed-in user.
pub struct UserStore {
    /// Cache of known users keyed by user id.
    users: HashMap<u64, Arc<User>>,
    /// Maps a GitHub login to the id of the cached user with that login.
    by_github_login: HashMap<SharedString, u64>,
    /// User id to participant index; replaced wholesale by `set_participant_indices`.
    participant_indices: HashMap<u64, ParticipantIndex>,
    /// Sender for the queue drained by the `_maintain_contacts` task.
    update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
    /// Latest known edit-prediction usage; `None` until reported and after sign-out.
    edit_prediction_usage: Option<EditPredictionUsage>,
    /// Plan taken from the latest authenticated-user response; `None` after sign-out.
    plan_info: Option<PlanInfo>,
    /// Receiving side of the watch channel written by `_maintain_current_user`.
    current_user: watch::Receiver<Option<Arc<User>>>,
    /// Set to the first entry of `organizations` whenever the list is refreshed.
    current_organization: Option<Arc<Organization>>,
    /// Organizations taken from the latest authenticated-user response.
    organizations: Vec<Arc<Organization>>,
    /// Contacts, kept sorted by GitHub login so they can be binary-searched.
    contacts: Vec<Arc<Contact>>,
    /// Users who sent us a contact request, kept sorted by GitHub login.
    incoming_contact_requests: Vec<Arc<User>>,
    /// Users we sent a contact request to, kept sorted by GitHub login.
    outgoing_contact_requests: Vec<Arc<User>>,
    /// Number of in-flight contact RPCs per user id; entries are removed at zero.
    pending_contact_requests: HashMap<u64, usize>,
    /// Weak handle to the client so the store does not keep it alive.
    client: Weak<Client>,
    /// Applies queued `UpdateContacts` messages one at a time.
    _maintain_contacts: Task<()>,
    /// Reacts to client status changes (authentication, sign-out, connection loss).
    _maintain_current_user: Task<Result<()>>,
    /// Calls `Client::sign_out` whenever a sign-out is requested over the channel.
    _handle_sign_out: Task<()>,
    /// Weak handle to this entity, used where only `&App` is available.
    weak_self: WeakEntity<Self>,
}
124
/// An invite count paired with an invite URL.
// NOTE(review): not referenced anywhere else in this file; confirm the exact
// meaning of `count` against its producers before relying on it.
#[derive(Clone)]
pub struct InviteInfo {
    pub count: u32,
    pub url: Arc<str>,
}
130
/// Events emitted by `UserStore`.
pub enum Event {
    /// Something happened to a contact request involving `user`.
    Contact {
        user: Arc<User>,
        kind: ContactEventKind,
    },
    /// Emitted when a `proto::ShowContacts` message is received.
    ShowContacts,
    /// Emitted when `set_participant_indices` receives a map that differs from the current one.
    ParticipantIndicesChanged,
    /// Emitted after the authenticated user's info is refreshed, and on sign-out.
    PrivateUserInfoUpdated,
    // NOTE(review): never emitted in this file; presumably emitted elsewhere.
    PlanUpdated,
}
141
/// Kind of change carried by `Event::Contact`.
// NOTE(review): within this file only `Cancelled` is emitted (when an incoming
// request is removed); the other variants are presumably used by consumers.
#[derive(Clone, Copy)]
pub enum ContactEventKind {
    Requested,
    Accepted,
    Cancelled,
}
148
// Allows `UserStore` to emit `Event` values via `cx.emit`.
impl EventEmitter<Event> for UserStore {}
150
/// Messages processed in order by the `_maintain_contacts` task.
enum UpdateContacts {
    /// Apply a contact update received from the server.
    Update(proto::UpdateContacts),
    /// Does nothing except drop the barrier sender, which lets a waiter observe
    /// that every earlier message has been processed.
    Wait(postage::barrier::Sender),
    /// Clear contacts and both request lists, then drop the barrier sender.
    Clear(postage::barrier::Sender),
}
156
/// Edit-prediction usage; dereferences to the wrapped [`RequestUsage`].
#[derive(Debug, Clone, Copy, Deref)]
pub struct EditPredictionUsage(pub RequestUsage);
159
/// A usage amount together with the limit it is measured against.
#[derive(Debug, Clone, Copy)]
pub struct RequestUsage {
    /// The applicable limit (either a number or unlimited).
    pub limit: UsageLimit,
    /// The amount used so far.
    pub amount: i32,
}
165
166impl UserStore {
    /// Builds the store and spawns the three background tasks it owns.
    ///
    /// Side effects on `client`: registers RPC handlers for `UpdateContacts` and
    /// `ShowContacts` messages, installs the sender half of a sign-out channel in
    /// `client.sign_out_tx`, and registers a handler for `MessageToClient`
    /// messages. The store itself keeps only a `Weak` reference to the client.
    pub fn new(client: Arc<Client>, cx: &Context<Self>) -> Self {
        let (mut current_user_tx, current_user_rx) = watch::channel();
        let (sign_out_tx, mut sign_out_rx) = mpsc::unbounded();
        let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
        let rpc_subscriptions = vec![
            client.add_message_handler(cx.weak_entity(), Self::handle_update_contacts),
            client.add_message_handler(cx.weak_entity(), Self::handle_show_contacts),
        ];

        // Replace whatever sign-out sender the client had with ours; requests sent
        // through it are served by `_handle_sign_out` below.
        client.sign_out_tx.lock().replace(sign_out_tx);
        client.add_message_to_client_handler({
            let this = cx.weak_entity();
            move |message, cx| Self::handle_message_to_client(this.clone(), message, cx)
        });

        Self {
            users: Default::default(),
            by_github_login: Default::default(),
            current_user: current_user_rx,
            current_organization: None,
            organizations: Vec::new(),
            plan_info: None,
            edit_prediction_usage: None,
            contacts: Default::default(),
            incoming_contact_requests: Default::default(),
            participant_indices: Default::default(),
            outgoing_contact_requests: Default::default(),
            client: Arc::downgrade(&client),
            update_contacts_tx,
            _maintain_contacts: cx.spawn(async move |this, cx| {
                // Move the subscriptions into the task so they live as long as it
                // does (presumably dropping them would unregister the handlers).
                let _subscriptions = rpc_subscriptions;
                // Apply queued messages strictly one at a time: each update's task
                // is awaited before the next message is taken from the queue.
                while let Some(message) = update_contacts_rx.next().await {
                    if let Ok(task) = this.update(cx, |this, cx| this.update_contacts(message, cx))
                    {
                        task.log_err().await;
                    } else {
                        // The store could not be updated (it is gone); stop.
                        break;
                    }
                }
            }),
            _maintain_current_user: cx.spawn(async move |this, cx| {
                let mut status = client.status();
                // Hold the client only weakly while waiting for status changes so
                // this task does not keep it alive.
                let weak = Arc::downgrade(&client);
                drop(client);
                while let Some(status) = status.next().await {
                    // if the client is dropped, the app is shutting down.
                    let Some(client) = weak.upgrade() else {
                        return Ok(());
                    };
                    match status {
                        Status::Authenticated
                        | Status::Reauthenticated
                        | Status::Connected { .. } => {
                            if let Some(user_id) = client.user_id() {
                                // A failed fetch is logged and treated as "no
                                // current user" below.
                                let response = client
                                    .cloud_client()
                                    .get_authenticated_user()
                                    .await
                                    .log_err();

                                let current_user_and_response = if let Some(response) = response {
                                    let user = Arc::new(User {
                                        id: user_id,
                                        github_login: response.user.github_login.clone().into(),
                                        avatar_uri: response.user.avatar_url.clone().into(),
                                        name: response.user.name.clone(),
                                    });

                                    Some((user, response))
                                } else {
                                    None
                                };
                                // Publish the new current user (or `None`) to watchers;
                                // a send failure is ignored.
                                current_user_tx
                                    .send(
                                        current_user_and_response
                                            .as_ref()
                                            .map(|(user, _)| user.clone()),
                                    )
                                    .await
                                    .ok();

                                // Cache the user and apply plan, organization and
                                // feature-flag data from the response.
                                cx.update(|cx| {
                                    if let Some((user, response)) = current_user_and_response {
                                        this.update(cx, |this, cx| {
                                            this.by_github_login
                                                .insert(user.github_login.clone(), user_id);
                                            this.users.insert(user_id, user);
                                            this.update_authenticated_user(response, cx)
                                        })
                                    } else {
                                        anyhow::Ok(())
                                    }
                                })?;

                                this.update(cx, |_, cx| cx.notify())?;
                            }
                        }
                        Status::SignedOut => {
                            // Drop everything tied to the signed-in account, then
                            // wait until the contact lists have been cleared.
                            current_user_tx.send(None).await.ok();
                            this.update(cx, |this, cx| {
                                this.clear_organizations();
                                this.clear_plan_and_usage();
                                cx.emit(Event::PrivateUserInfoUpdated);
                                cx.notify();
                                this.clear_contacts()
                            })?
                            .await;
                        }
                        Status::ConnectionLost => {
                            // Only the contact lists are cleared here; the current
                            // user and plan information are left untouched.
                            this.update(cx, |this, cx| {
                                cx.notify();
                                this.clear_contacts()
                            })?
                            .await;
                        }
                        _ => {}
                    }
                }
                Ok(())
            }),
            _handle_sign_out: cx.spawn(async move |this, cx| {
                // Each message on the channel triggers one `Client::sign_out`; the
                // loop ends once the store or the client is gone.
                while let Some(()) = sign_out_rx.next().await {
                    let Some(client) = this
                        .read_with(cx, |this, _cx| this.client.upgrade())
                        .ok()
                        .flatten()
                    else {
                        break;
                    };

                    client.sign_out(cx).await;
                }
            }),
            pending_contact_requests: Default::default(),
            weak_self: cx.weak_entity(),
        }
    }
304
305 #[cfg(feature = "test-support")]
306 pub fn clear_cache(&mut self) {
307 self.users.clear();
308 self.by_github_login.clear();
309 }
310
311 async fn handle_show_contacts(
312 this: Entity<Self>,
313 _: TypedEnvelope<proto::ShowContacts>,
314 mut cx: AsyncApp,
315 ) -> Result<()> {
316 this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts));
317 Ok(())
318 }
319
320 async fn handle_update_contacts(
321 this: Entity<Self>,
322 message: TypedEnvelope<proto::UpdateContacts>,
323 cx: AsyncApp,
324 ) -> Result<()> {
325 this.read_with(&cx, |this, _| {
326 this.update_contacts_tx
327 .unbounded_send(UpdateContacts::Update(message.payload))
328 .unwrap();
329 });
330 Ok(())
331 }
332
    /// Applies one queued [`UpdateContacts`] message.
    ///
    /// `Wait` and `Clear` complete immediately; dropping their barrier sender is
    /// what signals the waiting side. `Update` first loads every referenced user,
    /// then merges the changes into the three sorted lists in a single store
    /// update and notifies observers.
    fn update_contacts(&mut self, message: UpdateContacts, cx: &Context<Self>) -> Task<Result<()>> {
        match message {
            UpdateContacts::Wait(barrier) => {
                drop(barrier);
                Task::ready(Ok(()))
            }
            UpdateContacts::Clear(barrier) => {
                self.contacts.clear();
                self.incoming_contact_requests.clear();
                self.outgoing_contact_requests.clear();
                drop(barrier);
                Task::ready(Ok(()))
            }
            UpdateContacts::Update(message) => {
                // Collect the ids of every user mentioned by the update so they can
                // be fetched with a single `get_users` call.
                let mut user_ids = HashSet::default();
                for contact in &message.contacts {
                    user_ids.insert(contact.user_id);
                }
                user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
                user_ids.extend(message.outgoing_requests.iter());

                let load_users = self.get_users(user_ids.into_iter().collect(), cx);
                cx.spawn(async move |this, cx| {
                    load_users.await?;

                    // Users are fetched in parallel above and cached in call to get_users
                    // No need to parallelize here
                    let mut updated_contacts = Vec::new();
                    let this = this.upgrade().context("can't upgrade user store handle")?;
                    for contact in message.contacts {
                        updated_contacts
                            .push(Arc::new(Contact::from_proto(contact, &this, cx).await?));
                    }

                    let mut incoming_requests = Vec::new();
                    for request in message.incoming_requests {
                        incoming_requests.push({
                            this.update(cx, |this, cx| this.get_user(request.requester_id, cx))
                                .await?
                        });
                    }

                    let mut outgoing_requests = Vec::new();
                    for requested_user_id in message.outgoing_requests {
                        outgoing_requests.push(
                            this.update(cx, |this, cx| this.get_user(requested_user_id, cx))
                                .await?,
                        );
                    }

                    let removed_contacts =
                        HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
                    let removed_incoming_requests =
                        HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
                    let removed_outgoing_requests =
                        HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());

                    // Each list is kept sorted by GitHub login: the binary search
                    // either finds the entry to overwrite or yields the insertion
                    // index that keeps the list sorted.
                    this.update(cx, |this, cx| {
                        // Remove contacts
                        this.contacts
                            .retain(|contact| !removed_contacts.contains(&contact.user.id));
                        // Update existing contacts and insert new ones
                        for updated_contact in updated_contacts {
                            match this.contacts.binary_search_by_key(
                                &&updated_contact.user.github_login,
                                |contact| &contact.user.github_login,
                            ) {
                                Ok(ix) => this.contacts[ix] = updated_contact,
                                Err(ix) => this.contacts.insert(ix, updated_contact),
                            }
                        }

                        // Remove incoming contact requests, emitting a `Cancelled`
                        // event for each one that is dropped
                        this.incoming_contact_requests.retain(|user| {
                            if removed_incoming_requests.contains(&user.id) {
                                cx.emit(Event::Contact {
                                    user: user.clone(),
                                    kind: ContactEventKind::Cancelled,
                                });
                                false
                            } else {
                                true
                            }
                        });
                        // Update existing incoming requests and insert new ones
                        for user in incoming_requests {
                            match this
                                .incoming_contact_requests
                                .binary_search_by_key(&&user.github_login, |contact| {
                                    &contact.github_login
                                }) {
                                Ok(ix) => this.incoming_contact_requests[ix] = user,
                                Err(ix) => this.incoming_contact_requests.insert(ix, user),
                            }
                        }

                        // Remove outgoing contact requests
                        this.outgoing_contact_requests
                            .retain(|user| !removed_outgoing_requests.contains(&user.id));
                        // Update existing outgoing requests and insert new ones
                        for request in outgoing_requests {
                            match this
                                .outgoing_contact_requests
                                .binary_search_by_key(&&request.github_login, |contact| {
                                    &contact.github_login
                                }) {
                                Ok(ix) => this.outgoing_contact_requests[ix] = request,
                                Err(ix) => this.outgoing_contact_requests.insert(ix, request),
                            }
                        }

                        cx.notify();
                    });

                    Ok(())
                })
            }
        }
    }
452
453 pub fn contacts(&self) -> &[Arc<Contact>] {
454 &self.contacts
455 }
456
457 pub fn has_contact(&self, user: &Arc<User>) -> bool {
458 self.contacts
459 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
460 .is_ok()
461 }
462
463 pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
464 &self.incoming_contact_requests
465 }
466
467 pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
468 &self.outgoing_contact_requests
469 }
470
471 pub fn is_contact_request_pending(&self, user: &User) -> bool {
472 self.pending_contact_requests.contains_key(&user.id)
473 }
474
475 pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
476 if self
477 .contacts
478 .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
479 .is_ok()
480 {
481 ContactRequestStatus::RequestAccepted
482 } else if self
483 .outgoing_contact_requests
484 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
485 .is_ok()
486 {
487 ContactRequestStatus::RequestSent
488 } else if self
489 .incoming_contact_requests
490 .binary_search_by_key(&&user.github_login, |user| &user.github_login)
491 .is_ok()
492 {
493 ContactRequestStatus::RequestReceived
494 } else {
495 ContactRequestStatus::None
496 }
497 }
498
499 pub fn request_contact(
500 &mut self,
501 responder_id: u64,
502 cx: &mut Context<Self>,
503 ) -> Task<Result<()>> {
504 self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
505 }
506
507 pub fn remove_contact(&mut self, user_id: u64, cx: &mut Context<Self>) -> Task<Result<()>> {
508 self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
509 }
510
511 pub fn has_incoming_contact_request(&self, user_id: u64) -> bool {
512 self.incoming_contact_requests
513 .iter()
514 .any(|user| user.id == user_id)
515 }
516
517 pub fn respond_to_contact_request(
518 &mut self,
519 requester_id: u64,
520 accept: bool,
521 cx: &mut Context<Self>,
522 ) -> Task<Result<()>> {
523 self.perform_contact_request(
524 requester_id,
525 proto::RespondToContactRequest {
526 requester_id,
527 response: if accept {
528 proto::ContactRequestResponse::Accept
529 } else {
530 proto::ContactRequestResponse::Decline
531 } as i32,
532 },
533 cx,
534 )
535 }
536
537 pub fn dismiss_contact_request(
538 &self,
539 requester_id: u64,
540 cx: &Context<Self>,
541 ) -> Task<Result<()>> {
542 let client = self.client.upgrade();
543 cx.spawn(async move |_, _| {
544 client
545 .context("can't upgrade client reference")?
546 .request(proto::RespondToContactRequest {
547 requester_id,
548 response: proto::ContactRequestResponse::Dismiss as i32,
549 })
550 .await?;
551 Ok(())
552 })
553 }
554
555 fn perform_contact_request<T: RequestMessage>(
556 &mut self,
557 user_id: u64,
558 request: T,
559 cx: &mut Context<Self>,
560 ) -> Task<Result<()>> {
561 let client = self.client.upgrade();
562 *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
563 cx.notify();
564
565 cx.spawn(async move |this, cx| {
566 let response = client
567 .context("can't upgrade client reference")?
568 .request(request)
569 .await;
570 this.update(cx, |this, cx| {
571 if let Entry::Occupied(mut request_count) =
572 this.pending_contact_requests.entry(user_id)
573 {
574 *request_count.get_mut() -= 1;
575 if *request_count.get() == 0 {
576 request_count.remove();
577 }
578 }
579 cx.notify();
580 })?;
581 response?;
582 Ok(())
583 })
584 }
585
586 pub fn clear_contacts(&self) -> impl Future<Output = ()> + use<> {
587 let (tx, mut rx) = postage::barrier::channel();
588 self.update_contacts_tx
589 .unbounded_send(UpdateContacts::Clear(tx))
590 .unwrap();
591 async move {
592 rx.next().await;
593 }
594 }
595
596 pub fn contact_updates_done(&self) -> impl Future<Output = ()> {
597 let (tx, mut rx) = postage::barrier::channel();
598 self.update_contacts_tx
599 .unbounded_send(UpdateContacts::Wait(tx))
600 .unwrap();
601 async move {
602 rx.next().await;
603 }
604 }
605
606 pub fn get_users(
607 &self,
608 user_ids: Vec<u64>,
609 cx: &Context<Self>,
610 ) -> Task<Result<Vec<Arc<User>>>> {
611 let mut user_ids_to_fetch = user_ids.clone();
612 user_ids_to_fetch.retain(|id| !self.users.contains_key(id));
613
614 cx.spawn(async move |this, cx| {
615 if !user_ids_to_fetch.is_empty() {
616 this.update(cx, |this, cx| {
617 this.load_users(
618 proto::GetUsers {
619 user_ids: user_ids_to_fetch,
620 },
621 cx,
622 )
623 })?
624 .await?;
625 }
626
627 this.read_with(cx, |this, _| {
628 user_ids
629 .iter()
630 .map(|user_id| {
631 this.users
632 .get(user_id)
633 .cloned()
634 .with_context(|| format!("user {user_id} not found"))
635 })
636 .collect()
637 })?
638 })
639 }
640
641 pub fn fuzzy_search_users(
642 &self,
643 query: String,
644 cx: &Context<Self>,
645 ) -> Task<Result<Vec<Arc<User>>>> {
646 self.load_users(proto::FuzzySearchUsers { query }, cx)
647 }
648
649 pub fn get_cached_user(&self, user_id: u64) -> Option<Arc<User>> {
650 self.users.get(&user_id).cloned()
651 }
652
653 pub fn get_user_optimistic(&self, user_id: u64, cx: &Context<Self>) -> Option<Arc<User>> {
654 if let Some(user) = self.users.get(&user_id).cloned() {
655 return Some(user);
656 }
657
658 self.get_user(user_id, cx).detach_and_log_err(cx);
659 None
660 }
661
662 pub fn get_user(&self, user_id: u64, cx: &Context<Self>) -> Task<Result<Arc<User>>> {
663 if let Some(user) = self.users.get(&user_id).cloned() {
664 return Task::ready(Ok(user));
665 }
666
667 let load_users = self.get_users(vec![user_id], cx);
668 cx.spawn(async move |this, cx| {
669 load_users.await?;
670 this.read_with(cx, |this, _| {
671 this.users
672 .get(&user_id)
673 .cloned()
674 .context("server responded with no users")
675 })?
676 })
677 }
678
679 pub fn cached_user_by_github_login(&self, github_login: &str) -> Option<Arc<User>> {
680 self.by_github_login
681 .get(github_login)
682 .and_then(|id| self.users.get(id).cloned())
683 }
684
685 pub fn current_user(&self) -> Option<Arc<User>> {
686 self.current_user.borrow().clone()
687 }
688
689 pub fn current_organization(&self) -> Option<Arc<Organization>> {
690 self.current_organization.clone()
691 }
692
693 pub fn plan(&self) -> Option<Plan> {
694 #[cfg(debug_assertions)]
695 if let Ok(plan) = std::env::var("ZED_SIMULATE_PLAN").as_ref() {
696 use cloud_api_client::Plan;
697
698 return match plan.as_str() {
699 "free" => Some(Plan::ZedFree),
700 "trial" => Some(Plan::ZedProTrial),
701 "pro" => Some(Plan::ZedPro),
702 _ => {
703 panic!("ZED_SIMULATE_PLAN must be one of 'free', 'trial', or 'pro'");
704 }
705 };
706 }
707
708 self.plan_info.as_ref().map(|info| info.plan())
709 }
710
711 pub fn subscription_period(&self) -> Option<(DateTime<Utc>, DateTime<Utc>)> {
712 self.plan_info
713 .as_ref()
714 .and_then(|plan| plan.subscription_period)
715 .map(|subscription_period| {
716 (
717 subscription_period.started_at.0,
718 subscription_period.ended_at.0,
719 )
720 })
721 }
722
723 pub fn trial_started_at(&self) -> Option<DateTime<Utc>> {
724 self.plan_info
725 .as_ref()
726 .and_then(|plan| plan.trial_started_at)
727 .map(|trial_started_at| trial_started_at.0)
728 }
729
730 /// Returns whether the user's account is too new to use the service.
731 pub fn account_too_young(&self) -> bool {
732 self.plan_info
733 .as_ref()
734 .map(|plan| plan.is_account_too_young)
735 .unwrap_or_default()
736 }
737
738 /// Returns whether the current user has overdue invoices and usage should be blocked.
739 pub fn has_overdue_invoices(&self) -> bool {
740 self.plan_info
741 .as_ref()
742 .map(|plan| plan.has_overdue_invoices)
743 .unwrap_or_default()
744 }
745
746 pub fn edit_prediction_usage(&self) -> Option<EditPredictionUsage> {
747 self.edit_prediction_usage
748 }
749
750 pub fn update_edit_prediction_usage(
751 &mut self,
752 usage: EditPredictionUsage,
753 cx: &mut Context<Self>,
754 ) {
755 self.edit_prediction_usage = Some(usage);
756 cx.notify();
757 }
758
759 pub fn clear_organizations(&mut self) {
760 self.organizations.clear();
761 self.current_organization = None;
762 }
763
764 pub fn clear_plan_and_usage(&mut self) {
765 self.plan_info = None;
766 self.edit_prediction_usage = None;
767 }
768
769 fn update_authenticated_user(
770 &mut self,
771 response: GetAuthenticatedUserResponse,
772 cx: &mut Context<Self>,
773 ) {
774 let staff = response.user.is_staff && !*feature_flags::ZED_DISABLE_STAFF;
775 cx.update_flags(staff, response.feature_flags);
776 if let Some(client) = self.client.upgrade() {
777 client
778 .telemetry
779 .set_authenticated_user_info(Some(response.user.metrics_id.clone()), staff);
780 }
781
782 self.organizations = response.organizations.into_iter().map(Arc::new).collect();
783 self.current_organization = self.organizations.first().cloned();
784
785 self.edit_prediction_usage = Some(EditPredictionUsage(RequestUsage {
786 limit: response.plan.usage.edit_predictions.limit,
787 amount: response.plan.usage.edit_predictions.used as i32,
788 }));
789 self.plan_info = Some(response.plan);
790 cx.emit(Event::PrivateUserInfoUpdated);
791 }
792
793 fn handle_message_to_client(this: WeakEntity<Self>, message: &MessageToClient, cx: &App) {
794 cx.spawn(async move |cx| {
795 match message {
796 MessageToClient::UserUpdated => {
797 let cloud_client = cx
798 .update(|cx| {
799 this.read_with(cx, |this, _cx| {
800 this.client.upgrade().map(|client| client.cloud_client())
801 })
802 })?
803 .ok_or(anyhow::anyhow!("Failed to get Cloud client"))?;
804
805 let response = cloud_client.get_authenticated_user().await?;
806 cx.update(|cx| {
807 this.update(cx, |this, cx| {
808 this.update_authenticated_user(response, cx);
809 })
810 })?;
811 }
812 }
813
814 anyhow::Ok(())
815 })
816 .detach_and_log_err(cx);
817 }
818
819 pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
820 self.current_user.clone()
821 }
822
823 fn load_users(
824 &self,
825 request: impl RequestMessage<Response = UsersResponse>,
826 cx: &Context<Self>,
827 ) -> Task<Result<Vec<Arc<User>>>> {
828 let client = self.client.clone();
829 cx.spawn(async move |this, cx| {
830 if let Some(rpc) = client.upgrade() {
831 let response = rpc.request(request).await.context("error loading users")?;
832 let users = response.users;
833
834 this.update(cx, |this, _| this.insert(users))
835 } else {
836 Ok(Vec::new())
837 }
838 })
839 }
840
841 pub fn insert(&mut self, users: Vec<proto::User>) -> Vec<Arc<User>> {
842 let mut ret = Vec::with_capacity(users.len());
843 for user in users {
844 let user = User::new(user);
845 if let Some(old) = self.users.insert(user.id, user.clone())
846 && old.github_login != user.github_login
847 {
848 self.by_github_login.remove(&old.github_login);
849 }
850 self.by_github_login
851 .insert(user.github_login.clone(), user.id);
852 ret.push(user)
853 }
854 ret
855 }
856
857 pub fn set_participant_indices(
858 &mut self,
859 participant_indices: HashMap<u64, ParticipantIndex>,
860 cx: &mut Context<Self>,
861 ) {
862 if participant_indices != self.participant_indices {
863 self.participant_indices = participant_indices;
864 cx.emit(Event::ParticipantIndicesChanged);
865 }
866 }
867
868 pub fn participant_indices(&self) -> &HashMap<u64, ParticipantIndex> {
869 &self.participant_indices
870 }
871
872 pub fn participant_names(
873 &self,
874 user_ids: impl Iterator<Item = u64>,
875 cx: &App,
876 ) -> HashMap<u64, SharedString> {
877 let mut ret = HashMap::default();
878 let mut missing_user_ids = Vec::new();
879 for id in user_ids {
880 if let Some(github_login) = self.get_cached_user(id).map(|u| u.github_login.clone()) {
881 ret.insert(id, github_login);
882 } else {
883 missing_user_ids.push(id)
884 }
885 }
886 if !missing_user_ids.is_empty() {
887 let this = self.weak_self.clone();
888 cx.spawn(async move |cx| {
889 this.update(cx, |this, cx| this.get_users(missing_user_ids, cx))?
890 .await
891 })
892 .detach_and_log_err(cx);
893 }
894 ret
895 }
896}
897
898impl User {
899 fn new(message: proto::User) -> Arc<Self> {
900 Arc::new(User {
901 id: message.id,
902 github_login: message.github_login.into(),
903 avatar_uri: message.avatar_url.into(),
904 name: message.name,
905 })
906 }
907}
908
909impl Contact {
910 async fn from_proto(
911 contact: proto::Contact,
912 user_store: &Entity<UserStore>,
913 cx: &mut AsyncApp,
914 ) -> Result<Self> {
915 let user = user_store
916 .update(cx, |user_store, cx| {
917 user_store.get_user(contact.user_id, cx)
918 })
919 .await?;
920 Ok(Self {
921 user,
922 online: contact.online,
923 busy: contact.busy,
924 })
925 }
926}
927
928impl Collaborator {
929 pub fn from_proto(message: proto::Collaborator) -> Result<Self> {
930 Ok(Self {
931 peer_id: message.peer_id.context("invalid peer id")?,
932 replica_id: ReplicaId::new(message.replica_id as u16),
933 user_id: message.user_id as UserId,
934 is_host: message.is_host,
935 committer_name: message.committer_name,
936 committer_email: message.committer_email,
937 })
938 }
939}
940
941impl RequestUsage {
942 pub fn over_limit(&self) -> bool {
943 match self.limit {
944 UsageLimit::Limited(limit) => self.amount >= limit,
945 UsageLimit::Unlimited => false,
946 }
947 }
948
949 fn from_headers(
950 limit_name: &str,
951 amount_name: &str,
952 headers: &HeaderMap<HeaderValue>,
953 ) -> Result<Self> {
954 let limit = headers
955 .get(limit_name)
956 .with_context(|| format!("missing {limit_name:?} header"))?;
957 let limit = UsageLimit::from_str(limit.to_str()?)?;
958
959 let amount = headers
960 .get(amount_name)
961 .with_context(|| format!("missing {amount_name:?} header"))?;
962 let amount = amount.to_str()?.parse::<i32>()?;
963
964 Ok(Self { limit, amount })
965 }
966}
967
968impl EditPredictionUsage {
969 pub fn from_headers(headers: &HeaderMap<HeaderValue>) -> Result<Self> {
970 Ok(Self(RequestUsage::from_headers(
971 EDIT_PREDICTIONS_USAGE_LIMIT_HEADER_NAME,
972 EDIT_PREDICTIONS_USAGE_AMOUNT_HEADER_NAME,
973 headers,
974 )?))
975 }
976}