user.rs

  1use super::{Client, Status, TypedEnvelope, proto};
  2use anyhow::{Context as _, Result, anyhow};
  3use chrono::{DateTime, Utc};
  4use collections::{HashMap, HashSet, hash_map::Entry};
  5use feature_flags::FeatureFlagAppExt;
  6use futures::{Future, StreamExt, channel::mpsc};
  7use gpui::{
  8    App, AsyncApp, Context, Entity, EventEmitter, SharedString, SharedUri, Task, WeakEntity,
  9};
 10use postage::{sink::Sink, watch};
 11use rpc::proto::{RequestMessage, UsersResponse};
 12use std::sync::{Arc, Weak};
 13use text::ReplicaId;
 14use util::{TryFutureExt as _, maybe};
 15
 16pub type UserId = u64;
 17
 18#[derive(
 19    Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, serde::Serialize, serde::Deserialize,
 20)]
 21pub struct ChannelId(pub u64);
 22
 23impl std::fmt::Display for ChannelId {
 24    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
 25        self.0.fmt(f)
 26    }
 27}
 28
 29#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)]
 30pub struct ProjectId(pub u64);
 31
 32impl ProjectId {
 33    pub fn to_proto(&self) -> u64 {
 34        self.0
 35    }
 36}
 37
 38#[derive(
 39    Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, serde::Serialize, serde::Deserialize,
 40)]
 41pub struct DevServerProjectId(pub u64);
 42
 43#[derive(Debug, Clone, Copy, PartialEq, Eq)]
 44pub struct ParticipantIndex(pub u32);
 45
 46#[derive(Default, Debug)]
 47pub struct User {
 48    pub id: UserId,
 49    pub github_login: String,
 50    pub avatar_uri: SharedUri,
 51    pub name: Option<String>,
 52}
 53
 54#[derive(Clone, Debug, PartialEq, Eq)]
 55pub struct Collaborator {
 56    pub peer_id: proto::PeerId,
 57    pub replica_id: ReplicaId,
 58    pub user_id: UserId,
 59    pub is_host: bool,
 60    pub committer_name: Option<String>,
 61    pub committer_email: Option<String>,
 62}
 63
 64impl PartialOrd for User {
 65    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
 66        Some(self.cmp(other))
 67    }
 68}
 69
 70impl Ord for User {
 71    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
 72        self.github_login.cmp(&other.github_login)
 73    }
 74}
 75
 76impl PartialEq for User {
 77    fn eq(&self, other: &Self) -> bool {
 78        self.id == other.id && self.github_login == other.github_login
 79    }
 80}
 81
 82impl Eq for User {}
 83
 84#[derive(Debug, PartialEq)]
 85pub struct Contact {
 86    pub user: Arc<User>,
 87    pub online: bool,
 88    pub busy: bool,
 89}
 90
 91#[derive(Debug, Clone, Copy, PartialEq, Eq)]
 92pub enum ContactRequestStatus {
 93    None,
 94    RequestSent,
 95    RequestReceived,
 96    RequestAccepted,
 97}
 98
 99pub struct UserStore {
100    users: HashMap<u64, Arc<User>>,
101    by_github_login: HashMap<String, u64>,
102    participant_indices: HashMap<u64, ParticipantIndex>,
103    update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
104    current_plan: Option<proto::Plan>,
105    subscription_period: Option<(DateTime<Utc>, DateTime<Utc>)>,
106    trial_started_at: Option<DateTime<Utc>>,
107    model_request_usage_amount: Option<u32>,
108    model_request_usage_limit: Option<proto::UsageLimit>,
109    edit_predictions_usage_amount: Option<u32>,
110    edit_predictions_usage_limit: Option<proto::UsageLimit>,
111    is_usage_based_billing_enabled: Option<bool>,
112    account_too_young: Option<bool>,
113    has_overdue_invoices: Option<bool>,
114    current_user: watch::Receiver<Option<Arc<User>>>,
115    accepted_tos_at: Option<Option<DateTime<Utc>>>,
116    contacts: Vec<Arc<Contact>>,
117    incoming_contact_requests: Vec<Arc<User>>,
118    outgoing_contact_requests: Vec<Arc<User>>,
119    pending_contact_requests: HashMap<u64, usize>,
120    invite_info: Option<InviteInfo>,
121    client: Weak<Client>,
122    _maintain_contacts: Task<()>,
123    _maintain_current_user: Task<Result<()>>,
124    weak_self: WeakEntity<Self>,
125}
126
127#[derive(Clone)]
128pub struct InviteInfo {
129    pub count: u32,
130    pub url: Arc<str>,
131}
132
133pub enum Event {
134    Contact {
135        user: Arc<User>,
136        kind: ContactEventKind,
137    },
138    ShowContacts,
139    ParticipantIndicesChanged,
140    PrivateUserInfoUpdated,
141}
142
143#[derive(Clone, Copy)]
144pub enum ContactEventKind {
145    Requested,
146    Accepted,
147    Cancelled,
148}
149
150impl EventEmitter<Event> for UserStore {}
151
152enum UpdateContacts {
153    Update(proto::UpdateContacts),
154    Wait(postage::barrier::Sender),
155    Clear(postage::barrier::Sender),
156}
157
158impl UserStore {
159    pub fn new(client: Arc<Client>, cx: &Context<Self>) -> Self {
160        let (mut current_user_tx, current_user_rx) = watch::channel();
161        let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
162        let rpc_subscriptions = vec![
163            client.add_message_handler(cx.weak_entity(), Self::handle_update_plan),
164            client.add_message_handler(cx.weak_entity(), Self::handle_update_contacts),
165            client.add_message_handler(cx.weak_entity(), Self::handle_update_invite_info),
166            client.add_message_handler(cx.weak_entity(), Self::handle_show_contacts),
167        ];
168        Self {
169            users: Default::default(),
170            by_github_login: Default::default(),
171            current_user: current_user_rx,
172            current_plan: None,
173            subscription_period: None,
174            trial_started_at: None,
175            model_request_usage_amount: None,
176            model_request_usage_limit: None,
177            edit_predictions_usage_amount: None,
178            edit_predictions_usage_limit: None,
179            is_usage_based_billing_enabled: None,
180            account_too_young: None,
181            has_overdue_invoices: None,
182            accepted_tos_at: None,
183            contacts: Default::default(),
184            incoming_contact_requests: Default::default(),
185            participant_indices: Default::default(),
186            outgoing_contact_requests: Default::default(),
187            invite_info: None,
188            client: Arc::downgrade(&client),
189            update_contacts_tx,
190            _maintain_contacts: cx.spawn(async move |this, cx| {
191                let _subscriptions = rpc_subscriptions;
192                while let Some(message) = update_contacts_rx.next().await {
193                    if let Ok(task) = this.update(cx, |this, cx| this.update_contacts(message, cx))
194                    {
195                        task.log_err().await;
196                    } else {
197                        break;
198                    }
199                }
200            }),
201            _maintain_current_user: cx.spawn(async move |this, cx| {
202                let mut status = client.status();
203                let weak = Arc::downgrade(&client);
204                drop(client);
205                while let Some(status) = status.next().await {
206                    // if the client is dropped, the app is shutting down.
207                    let Some(client) = weak.upgrade() else {
208                        return Ok(());
209                    };
210                    match status {
211                        Status::Connected { .. } => {
212                            if let Some(user_id) = client.user_id() {
213                                let fetch_user = if let Ok(fetch_user) =
214                                    this.update(cx, |this, cx| this.get_user(user_id, cx).log_err())
215                                {
216                                    fetch_user
217                                } else {
218                                    break;
219                                };
220                                let fetch_private_user_info =
221                                    client.request(proto::GetPrivateUserInfo {}).log_err();
222                                let (user, info) =
223                                    futures::join!(fetch_user, fetch_private_user_info);
224
225                                cx.update(|cx| {
226                                    if let Some(info) = info {
227                                        let staff =
228                                            info.staff && !*feature_flags::ZED_DISABLE_STAFF;
229                                        cx.update_flags(staff, info.flags);
230                                        client.telemetry.set_authenticated_user_info(
231                                            Some(info.metrics_id.clone()),
232                                            staff,
233                                        );
234
235                                        this.update(cx, |this, cx| {
236                                            let accepted_tos_at = {
237                                                #[cfg(debug_assertions)]
238                                                if std::env::var("ZED_IGNORE_ACCEPTED_TOS").is_ok()
239                                                {
240                                                    None
241                                                } else {
242                                                    info.accepted_tos_at
243                                                }
244
245                                                #[cfg(not(debug_assertions))]
246                                                info.accepted_tos_at
247                                            };
248
249                                            this.set_current_user_accepted_tos_at(accepted_tos_at);
250                                            cx.emit(Event::PrivateUserInfoUpdated);
251                                        })
252                                    } else {
253                                        anyhow::Ok(())
254                                    }
255                                })??;
256
257                                current_user_tx.send(user).await.ok();
258
259                                this.update(cx, |_, cx| cx.notify())?;
260                            }
261                        }
262                        Status::SignedOut => {
263                            current_user_tx.send(None).await.ok();
264                            this.update(cx, |this, cx| {
265                                this.accepted_tos_at = None;
266                                cx.emit(Event::PrivateUserInfoUpdated);
267                                cx.notify();
268                                this.clear_contacts()
269                            })?
270                            .await;
271                        }
272                        Status::ConnectionLost => {
273                            this.update(cx, |this, cx| {
274                                cx.notify();
275                                this.clear_contacts()
276                            })?
277                            .await;
278                        }
279                        _ => {}
280                    }
281                }
282                Ok(())
283            }),
284            pending_contact_requests: Default::default(),
285            weak_self: cx.weak_entity(),
286        }
287    }
288
289    #[cfg(feature = "test-support")]
290    pub fn clear_cache(&mut self) {
291        self.users.clear();
292        self.by_github_login.clear();
293    }
294
295    async fn handle_update_invite_info(
296        this: Entity<Self>,
297        message: TypedEnvelope<proto::UpdateInviteInfo>,
298        mut cx: AsyncApp,
299    ) -> Result<()> {
300        this.update(&mut cx, |this, cx| {
301            this.invite_info = Some(InviteInfo {
302                url: Arc::from(message.payload.url),
303                count: message.payload.count,
304            });
305            cx.notify();
306        })?;
307        Ok(())
308    }
309
310    async fn handle_show_contacts(
311        this: Entity<Self>,
312        _: TypedEnvelope<proto::ShowContacts>,
313        mut cx: AsyncApp,
314    ) -> Result<()> {
315        this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts))?;
316        Ok(())
317    }
318
319    pub fn invite_info(&self) -> Option<&InviteInfo> {
320        self.invite_info.as_ref()
321    }
322
323    async fn handle_update_contacts(
324        this: Entity<Self>,
325        message: TypedEnvelope<proto::UpdateContacts>,
326        mut cx: AsyncApp,
327    ) -> Result<()> {
328        this.read_with(&mut cx, |this, _| {
329            this.update_contacts_tx
330                .unbounded_send(UpdateContacts::Update(message.payload))
331                .unwrap();
332        })?;
333        Ok(())
334    }
335
336    async fn handle_update_plan(
337        this: Entity<Self>,
338        message: TypedEnvelope<proto::UpdateUserPlan>,
339        mut cx: AsyncApp,
340    ) -> Result<()> {
341        this.update(&mut cx, |this, cx| {
342            this.current_plan = Some(message.payload.plan());
343            this.subscription_period = maybe!({
344                let period = message.payload.subscription_period?;
345                let started_at = DateTime::from_timestamp(period.started_at as i64, 0)?;
346                let ended_at = DateTime::from_timestamp(period.ended_at as i64, 0)?;
347
348                Some((started_at, ended_at))
349            });
350            this.trial_started_at = message
351                .payload
352                .trial_started_at
353                .and_then(|trial_started_at| DateTime::from_timestamp(trial_started_at as i64, 0));
354            this.is_usage_based_billing_enabled = message.payload.is_usage_based_billing_enabled;
355            this.account_too_young = message.payload.account_too_young;
356            this.has_overdue_invoices = message.payload.has_overdue_invoices;
357
358            if let Some(usage) = message.payload.usage {
359                this.model_request_usage_amount = Some(usage.model_requests_usage_amount);
360                this.model_request_usage_limit = usage.model_requests_usage_limit;
361                this.edit_predictions_usage_amount = Some(usage.edit_predictions_usage_amount);
362                this.edit_predictions_usage_limit = usage.edit_predictions_usage_limit;
363            }
364
365            cx.notify();
366        })?;
367        Ok(())
368    }
369
370    fn update_contacts(&mut self, message: UpdateContacts, cx: &Context<Self>) -> Task<Result<()>> {
371        match message {
372            UpdateContacts::Wait(barrier) => {
373                drop(barrier);
374                Task::ready(Ok(()))
375            }
376            UpdateContacts::Clear(barrier) => {
377                self.contacts.clear();
378                self.incoming_contact_requests.clear();
379                self.outgoing_contact_requests.clear();
380                drop(barrier);
381                Task::ready(Ok(()))
382            }
383            UpdateContacts::Update(message) => {
384                let mut user_ids = HashSet::default();
385                for contact in &message.contacts {
386                    user_ids.insert(contact.user_id);
387                }
388                user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
389                user_ids.extend(message.outgoing_requests.iter());
390
391                let load_users = self.get_users(user_ids.into_iter().collect(), cx);
392                cx.spawn(async move |this, cx| {
393                    load_users.await?;
394
395                    // Users are fetched in parallel above and cached in call to get_users
396                    // No need to parallelize here
397                    let mut updated_contacts = Vec::new();
398                    let this = this.upgrade().context("can't upgrade user store handle")?;
399                    for contact in message.contacts {
400                        updated_contacts
401                            .push(Arc::new(Contact::from_proto(contact, &this, cx).await?));
402                    }
403
404                    let mut incoming_requests = Vec::new();
405                    for request in message.incoming_requests {
406                        incoming_requests.push({
407                            this.update(cx, |this, cx| this.get_user(request.requester_id, cx))?
408                                .await?
409                        });
410                    }
411
412                    let mut outgoing_requests = Vec::new();
413                    for requested_user_id in message.outgoing_requests {
414                        outgoing_requests.push(
415                            this.update(cx, |this, cx| this.get_user(requested_user_id, cx))?
416                                .await?,
417                        );
418                    }
419
420                    let removed_contacts =
421                        HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
422                    let removed_incoming_requests =
423                        HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
424                    let removed_outgoing_requests =
425                        HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());
426
427                    this.update(cx, |this, cx| {
428                        // Remove contacts
429                        this.contacts
430                            .retain(|contact| !removed_contacts.contains(&contact.user.id));
431                        // Update existing contacts and insert new ones
432                        for updated_contact in updated_contacts {
433                            match this.contacts.binary_search_by_key(
434                                &&updated_contact.user.github_login,
435                                |contact| &contact.user.github_login,
436                            ) {
437                                Ok(ix) => this.contacts[ix] = updated_contact,
438                                Err(ix) => this.contacts.insert(ix, updated_contact),
439                            }
440                        }
441
442                        // Remove incoming contact requests
443                        this.incoming_contact_requests.retain(|user| {
444                            if removed_incoming_requests.contains(&user.id) {
445                                cx.emit(Event::Contact {
446                                    user: user.clone(),
447                                    kind: ContactEventKind::Cancelled,
448                                });
449                                false
450                            } else {
451                                true
452                            }
453                        });
454                        // Update existing incoming requests and insert new ones
455                        for user in incoming_requests {
456                            match this
457                                .incoming_contact_requests
458                                .binary_search_by_key(&&user.github_login, |contact| {
459                                    &contact.github_login
460                                }) {
461                                Ok(ix) => this.incoming_contact_requests[ix] = user,
462                                Err(ix) => this.incoming_contact_requests.insert(ix, user),
463                            }
464                        }
465
466                        // Remove outgoing contact requests
467                        this.outgoing_contact_requests
468                            .retain(|user| !removed_outgoing_requests.contains(&user.id));
469                        // Update existing incoming requests and insert new ones
470                        for request in outgoing_requests {
471                            match this
472                                .outgoing_contact_requests
473                                .binary_search_by_key(&&request.github_login, |contact| {
474                                    &contact.github_login
475                                }) {
476                                Ok(ix) => this.outgoing_contact_requests[ix] = request,
477                                Err(ix) => this.outgoing_contact_requests.insert(ix, request),
478                            }
479                        }
480
481                        cx.notify();
482                    })?;
483
484                    Ok(())
485                })
486            }
487        }
488    }
489
490    pub fn contacts(&self) -> &[Arc<Contact>] {
491        &self.contacts
492    }
493
494    pub fn has_contact(&self, user: &Arc<User>) -> bool {
495        self.contacts
496            .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
497            .is_ok()
498    }
499
500    pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
501        &self.incoming_contact_requests
502    }
503
504    pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
505        &self.outgoing_contact_requests
506    }
507
508    pub fn is_contact_request_pending(&self, user: &User) -> bool {
509        self.pending_contact_requests.contains_key(&user.id)
510    }
511
512    pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
513        if self
514            .contacts
515            .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
516            .is_ok()
517        {
518            ContactRequestStatus::RequestAccepted
519        } else if self
520            .outgoing_contact_requests
521            .binary_search_by_key(&&user.github_login, |user| &user.github_login)
522            .is_ok()
523        {
524            ContactRequestStatus::RequestSent
525        } else if self
526            .incoming_contact_requests
527            .binary_search_by_key(&&user.github_login, |user| &user.github_login)
528            .is_ok()
529        {
530            ContactRequestStatus::RequestReceived
531        } else {
532            ContactRequestStatus::None
533        }
534    }
535
536    pub fn request_contact(
537        &mut self,
538        responder_id: u64,
539        cx: &mut Context<Self>,
540    ) -> Task<Result<()>> {
541        self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
542    }
543
544    pub fn remove_contact(&mut self, user_id: u64, cx: &mut Context<Self>) -> Task<Result<()>> {
545        self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
546    }
547
548    pub fn has_incoming_contact_request(&self, user_id: u64) -> bool {
549        self.incoming_contact_requests
550            .iter()
551            .any(|user| user.id == user_id)
552    }
553
554    pub fn respond_to_contact_request(
555        &mut self,
556        requester_id: u64,
557        accept: bool,
558        cx: &mut Context<Self>,
559    ) -> Task<Result<()>> {
560        self.perform_contact_request(
561            requester_id,
562            proto::RespondToContactRequest {
563                requester_id,
564                response: if accept {
565                    proto::ContactRequestResponse::Accept
566                } else {
567                    proto::ContactRequestResponse::Decline
568                } as i32,
569            },
570            cx,
571        )
572    }
573
574    pub fn dismiss_contact_request(
575        &self,
576        requester_id: u64,
577        cx: &Context<Self>,
578    ) -> Task<Result<()>> {
579        let client = self.client.upgrade();
580        cx.spawn(async move |_, _| {
581            client
582                .context("can't upgrade client reference")?
583                .request(proto::RespondToContactRequest {
584                    requester_id,
585                    response: proto::ContactRequestResponse::Dismiss as i32,
586                })
587                .await?;
588            Ok(())
589        })
590    }
591
592    fn perform_contact_request<T: RequestMessage>(
593        &mut self,
594        user_id: u64,
595        request: T,
596        cx: &mut Context<Self>,
597    ) -> Task<Result<()>> {
598        let client = self.client.upgrade();
599        *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
600        cx.notify();
601
602        cx.spawn(async move |this, cx| {
603            let response = client
604                .context("can't upgrade client reference")?
605                .request(request)
606                .await;
607            this.update(cx, |this, cx| {
608                if let Entry::Occupied(mut request_count) =
609                    this.pending_contact_requests.entry(user_id)
610                {
611                    *request_count.get_mut() -= 1;
612                    if *request_count.get() == 0 {
613                        request_count.remove();
614                    }
615                }
616                cx.notify();
617            })?;
618            response?;
619            Ok(())
620        })
621    }
622
623    pub fn clear_contacts(&self) -> impl Future<Output = ()> + use<> {
624        let (tx, mut rx) = postage::barrier::channel();
625        self.update_contacts_tx
626            .unbounded_send(UpdateContacts::Clear(tx))
627            .unwrap();
628        async move {
629            rx.next().await;
630        }
631    }
632
633    pub fn contact_updates_done(&self) -> impl Future<Output = ()> {
634        let (tx, mut rx) = postage::barrier::channel();
635        self.update_contacts_tx
636            .unbounded_send(UpdateContacts::Wait(tx))
637            .unwrap();
638        async move {
639            rx.next().await;
640        }
641    }
642
643    pub fn get_users(
644        &self,
645        user_ids: Vec<u64>,
646        cx: &Context<Self>,
647    ) -> Task<Result<Vec<Arc<User>>>> {
648        let mut user_ids_to_fetch = user_ids.clone();
649        user_ids_to_fetch.retain(|id| !self.users.contains_key(id));
650
651        cx.spawn(async move |this, cx| {
652            if !user_ids_to_fetch.is_empty() {
653                this.update(cx, |this, cx| {
654                    this.load_users(
655                        proto::GetUsers {
656                            user_ids: user_ids_to_fetch,
657                        },
658                        cx,
659                    )
660                })?
661                .await?;
662            }
663
664            this.read_with(cx, |this, _| {
665                user_ids
666                    .iter()
667                    .map(|user_id| {
668                        this.users
669                            .get(user_id)
670                            .cloned()
671                            .with_context(|| format!("user {user_id} not found"))
672                    })
673                    .collect()
674            })?
675        })
676    }
677
678    pub fn fuzzy_search_users(
679        &self,
680        query: String,
681        cx: &Context<Self>,
682    ) -> Task<Result<Vec<Arc<User>>>> {
683        self.load_users(proto::FuzzySearchUsers { query }, cx)
684    }
685
686    pub fn get_cached_user(&self, user_id: u64) -> Option<Arc<User>> {
687        self.users.get(&user_id).cloned()
688    }
689
690    pub fn get_user_optimistic(&self, user_id: u64, cx: &Context<Self>) -> Option<Arc<User>> {
691        if let Some(user) = self.users.get(&user_id).cloned() {
692            return Some(user);
693        }
694
695        self.get_user(user_id, cx).detach_and_log_err(cx);
696        None
697    }
698
699    pub fn get_user(&self, user_id: u64, cx: &Context<Self>) -> Task<Result<Arc<User>>> {
700        if let Some(user) = self.users.get(&user_id).cloned() {
701            return Task::ready(Ok(user));
702        }
703
704        let load_users = self.get_users(vec![user_id], cx);
705        cx.spawn(async move |this, cx| {
706            load_users.await?;
707            this.read_with(cx, |this, _| {
708                this.users
709                    .get(&user_id)
710                    .cloned()
711                    .context("server responded with no users")
712            })?
713        })
714    }
715
716    pub fn cached_user_by_github_login(&self, github_login: &str) -> Option<Arc<User>> {
717        self.by_github_login
718            .get(github_login)
719            .and_then(|id| self.users.get(id).cloned())
720    }
721
722    pub fn current_user(&self) -> Option<Arc<User>> {
723        self.current_user.borrow().clone()
724    }
725
726    pub fn current_plan(&self) -> Option<proto::Plan> {
727        self.current_plan
728    }
729
730    pub fn subscription_period(&self) -> Option<(DateTime<Utc>, DateTime<Utc>)> {
731        self.subscription_period
732    }
733
734    pub fn trial_started_at(&self) -> Option<DateTime<Utc>> {
735        self.trial_started_at
736    }
737
738    pub fn usage_based_billing_enabled(&self) -> Option<bool> {
739        self.is_usage_based_billing_enabled
740    }
741
742    pub fn model_request_usage_amount(&self) -> Option<u32> {
743        self.model_request_usage_amount
744    }
745
746    pub fn model_request_usage_limit(&self) -> Option<proto::UsageLimit> {
747        self.model_request_usage_limit.clone()
748    }
749
750    pub fn edit_predictions_usage_amount(&self) -> Option<u32> {
751        self.edit_predictions_usage_amount
752    }
753
754    pub fn edit_predictions_usage_limit(&self) -> Option<proto::UsageLimit> {
755        self.edit_predictions_usage_limit.clone()
756    }
757
758    pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
759        self.current_user.clone()
760    }
761
762    /// Returns whether the user's account is too new to use the service.
763    pub fn account_too_young(&self) -> bool {
764        self.account_too_young.unwrap_or(false)
765    }
766
767    /// Returns whether the current user has overdue invoices and usage should be blocked.
768    pub fn has_overdue_invoices(&self) -> bool {
769        self.has_overdue_invoices.unwrap_or(false)
770    }
771
772    pub fn current_user_has_accepted_terms(&self) -> Option<bool> {
773        self.accepted_tos_at
774            .map(|accepted_tos_at| accepted_tos_at.is_some())
775    }
776
777    pub fn accept_terms_of_service(&self, cx: &Context<Self>) -> Task<Result<()>> {
778        if self.current_user().is_none() {
779            return Task::ready(Err(anyhow!("no current user")));
780        };
781
782        let client = self.client.clone();
783        cx.spawn(async move |this, cx| -> anyhow::Result<()> {
784            let client = client.upgrade().context("client not found")?;
785            let response = client
786                .request(proto::AcceptTermsOfService {})
787                .await
788                .context("error accepting tos")?;
789            this.update(cx, |this, cx| {
790                this.set_current_user_accepted_tos_at(Some(response.accepted_tos_at));
791                cx.emit(Event::PrivateUserInfoUpdated);
792            })?;
793            Ok(())
794        })
795    }
796
797    fn set_current_user_accepted_tos_at(&mut self, accepted_tos_at: Option<u64>) {
798        self.accepted_tos_at = Some(
799            accepted_tos_at.and_then(|timestamp| DateTime::from_timestamp(timestamp as i64, 0)),
800        );
801    }
802
803    fn load_users(
804        &self,
805        request: impl RequestMessage<Response = UsersResponse>,
806        cx: &Context<Self>,
807    ) -> Task<Result<Vec<Arc<User>>>> {
808        let client = self.client.clone();
809        cx.spawn(async move |this, cx| {
810            if let Some(rpc) = client.upgrade() {
811                let response = rpc.request(request).await.context("error loading users")?;
812                let users = response.users;
813
814                this.update(cx, |this, _| this.insert(users))
815            } else {
816                Ok(Vec::new())
817            }
818        })
819    }
820
821    pub fn insert(&mut self, users: Vec<proto::User>) -> Vec<Arc<User>> {
822        let mut ret = Vec::with_capacity(users.len());
823        for user in users {
824            let user = User::new(user);
825            if let Some(old) = self.users.insert(user.id, user.clone()) {
826                if old.github_login != user.github_login {
827                    self.by_github_login.remove(&old.github_login);
828                }
829            }
830            self.by_github_login
831                .insert(user.github_login.clone(), user.id);
832            ret.push(user)
833        }
834        ret
835    }
836
837    pub fn set_participant_indices(
838        &mut self,
839        participant_indices: HashMap<u64, ParticipantIndex>,
840        cx: &mut Context<Self>,
841    ) {
842        if participant_indices != self.participant_indices {
843            self.participant_indices = participant_indices;
844            cx.emit(Event::ParticipantIndicesChanged);
845        }
846    }
847
848    pub fn participant_indices(&self) -> &HashMap<u64, ParticipantIndex> {
849        &self.participant_indices
850    }
851
852    pub fn participant_names(
853        &self,
854        user_ids: impl Iterator<Item = u64>,
855        cx: &App,
856    ) -> HashMap<u64, SharedString> {
857        let mut ret = HashMap::default();
858        let mut missing_user_ids = Vec::new();
859        for id in user_ids {
860            if let Some(github_login) = self.get_cached_user(id).map(|u| u.github_login.clone()) {
861                ret.insert(id, github_login.into());
862            } else {
863                missing_user_ids.push(id)
864            }
865        }
866        if !missing_user_ids.is_empty() {
867            let this = self.weak_self.clone();
868            cx.spawn(async move |cx| {
869                this.update(cx, |this, cx| this.get_users(missing_user_ids, cx))?
870                    .await
871            })
872            .detach_and_log_err(cx);
873        }
874        ret
875    }
876}
877
878impl User {
879    fn new(message: proto::User) -> Arc<Self> {
880        Arc::new(User {
881            id: message.id,
882            github_login: message.github_login,
883            avatar_uri: message.avatar_url.into(),
884            name: message.name,
885        })
886    }
887}
888
889impl Contact {
890    async fn from_proto(
891        contact: proto::Contact,
892        user_store: &Entity<UserStore>,
893        cx: &mut AsyncApp,
894    ) -> Result<Self> {
895        let user = user_store
896            .update(cx, |user_store, cx| {
897                user_store.get_user(contact.user_id, cx)
898            })?
899            .await?;
900        Ok(Self {
901            user,
902            online: contact.online,
903            busy: contact.busy,
904        })
905    }
906}
907
908impl Collaborator {
909    pub fn from_proto(message: proto::Collaborator) -> Result<Self> {
910        Ok(Self {
911            peer_id: message.peer_id.context("invalid peer id")?,
912            replica_id: message.replica_id as ReplicaId,
913            user_id: message.user_id as UserId,
914            is_host: message.is_host,
915            committer_name: message.committer_name,
916            committer_email: message.committer_email,
917        })
918    }
919}