user.rs

  1use super::{Client, Status, TypedEnvelope, proto};
  2use anyhow::{Context as _, Result, anyhow};
  3use chrono::{DateTime, Utc};
  4use collections::{HashMap, HashSet, hash_map::Entry};
  5use feature_flags::FeatureFlagAppExt;
  6use futures::{Future, StreamExt, channel::mpsc};
  7use gpui::{
  8    App, AsyncApp, Context, Entity, EventEmitter, SharedString, SharedUri, Task, WeakEntity,
  9};
 10use postage::{sink::Sink, watch};
 11use rpc::proto::{RequestMessage, UsersResponse};
 12use std::sync::{Arc, Weak};
 13use text::ReplicaId;
 14use util::{TryFutureExt as _, maybe};
 15
 16pub type UserId = u64;
 17
 18#[derive(
 19    Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, serde::Serialize, serde::Deserialize,
 20)]
 21pub struct ChannelId(pub u64);
 22
 23impl std::fmt::Display for ChannelId {
 24    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
 25        self.0.fmt(f)
 26    }
 27}
 28
 29#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)]
 30pub struct ProjectId(pub u64);
 31
 32impl ProjectId {
 33    pub fn to_proto(&self) -> u64 {
 34        self.0
 35    }
 36}
 37
 38#[derive(
 39    Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, serde::Serialize, serde::Deserialize,
 40)]
 41pub struct DevServerProjectId(pub u64);
 42
 43#[derive(Debug, Clone, Copy, PartialEq, Eq)]
 44pub struct ParticipantIndex(pub u32);
 45
 46#[derive(Default, Debug)]
 47pub struct User {
 48    pub id: UserId,
 49    pub github_login: String,
 50    pub avatar_uri: SharedUri,
 51    pub name: Option<String>,
 52    pub email: Option<String>,
 53}
 54
 55#[derive(Clone, Debug, PartialEq, Eq)]
 56pub struct Collaborator {
 57    pub peer_id: proto::PeerId,
 58    pub replica_id: ReplicaId,
 59    pub user_id: UserId,
 60    pub is_host: bool,
 61}
 62
 63impl PartialOrd for User {
 64    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
 65        Some(self.cmp(other))
 66    }
 67}
 68
 69impl Ord for User {
 70    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
 71        self.github_login.cmp(&other.github_login)
 72    }
 73}
 74
 75impl PartialEq for User {
 76    fn eq(&self, other: &Self) -> bool {
 77        self.id == other.id && self.github_login == other.github_login
 78    }
 79}
 80
 81impl Eq for User {}
 82
 83#[derive(Debug, PartialEq)]
 84pub struct Contact {
 85    pub user: Arc<User>,
 86    pub online: bool,
 87    pub busy: bool,
 88}
 89
 90#[derive(Debug, Clone, Copy, PartialEq, Eq)]
 91pub enum ContactRequestStatus {
 92    None,
 93    RequestSent,
 94    RequestReceived,
 95    RequestAccepted,
 96}
 97
 98pub struct UserStore {
 99    users: HashMap<u64, Arc<User>>,
100    by_github_login: HashMap<String, u64>,
101    participant_indices: HashMap<u64, ParticipantIndex>,
102    update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
103    current_plan: Option<proto::Plan>,
104    subscription_period: Option<(DateTime<Utc>, DateTime<Utc>)>,
105    trial_started_at: Option<DateTime<Utc>>,
106    model_request_usage_amount: Option<u32>,
107    model_request_usage_limit: Option<proto::UsageLimit>,
108    edit_predictions_usage_amount: Option<u32>,
109    edit_predictions_usage_limit: Option<proto::UsageLimit>,
110    is_usage_based_billing_enabled: Option<bool>,
111    account_too_young: Option<bool>,
112    has_overdue_invoices: Option<bool>,
113    current_user: watch::Receiver<Option<Arc<User>>>,
114    accepted_tos_at: Option<Option<DateTime<Utc>>>,
115    contacts: Vec<Arc<Contact>>,
116    incoming_contact_requests: Vec<Arc<User>>,
117    outgoing_contact_requests: Vec<Arc<User>>,
118    pending_contact_requests: HashMap<u64, usize>,
119    invite_info: Option<InviteInfo>,
120    client: Weak<Client>,
121    _maintain_contacts: Task<()>,
122    _maintain_current_user: Task<Result<()>>,
123    weak_self: WeakEntity<Self>,
124}
125
126#[derive(Clone)]
127pub struct InviteInfo {
128    pub count: u32,
129    pub url: Arc<str>,
130}
131
132pub enum Event {
133    Contact {
134        user: Arc<User>,
135        kind: ContactEventKind,
136    },
137    ShowContacts,
138    ParticipantIndicesChanged,
139    PrivateUserInfoUpdated,
140}
141
142#[derive(Clone, Copy)]
143pub enum ContactEventKind {
144    Requested,
145    Accepted,
146    Cancelled,
147}
148
149impl EventEmitter<Event> for UserStore {}
150
151enum UpdateContacts {
152    Update(proto::UpdateContacts),
153    Wait(postage::barrier::Sender),
154    Clear(postage::barrier::Sender),
155}
156
157impl UserStore {
158    pub fn new(client: Arc<Client>, cx: &Context<Self>) -> Self {
159        let (mut current_user_tx, current_user_rx) = watch::channel();
160        let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
161        let rpc_subscriptions = vec![
162            client.add_message_handler(cx.weak_entity(), Self::handle_update_plan),
163            client.add_message_handler(cx.weak_entity(), Self::handle_update_contacts),
164            client.add_message_handler(cx.weak_entity(), Self::handle_update_invite_info),
165            client.add_message_handler(cx.weak_entity(), Self::handle_show_contacts),
166        ];
167        Self {
168            users: Default::default(),
169            by_github_login: Default::default(),
170            current_user: current_user_rx,
171            current_plan: None,
172            subscription_period: None,
173            trial_started_at: None,
174            model_request_usage_amount: None,
175            model_request_usage_limit: None,
176            edit_predictions_usage_amount: None,
177            edit_predictions_usage_limit: None,
178            is_usage_based_billing_enabled: None,
179            account_too_young: None,
180            has_overdue_invoices: None,
181            accepted_tos_at: None,
182            contacts: Default::default(),
183            incoming_contact_requests: Default::default(),
184            participant_indices: Default::default(),
185            outgoing_contact_requests: Default::default(),
186            invite_info: None,
187            client: Arc::downgrade(&client),
188            update_contacts_tx,
189            _maintain_contacts: cx.spawn(async move |this, cx| {
190                let _subscriptions = rpc_subscriptions;
191                while let Some(message) = update_contacts_rx.next().await {
192                    if let Ok(task) = this.update(cx, |this, cx| this.update_contacts(message, cx))
193                    {
194                        task.log_err().await;
195                    } else {
196                        break;
197                    }
198                }
199            }),
200            _maintain_current_user: cx.spawn(async move |this, cx| {
201                let mut status = client.status();
202                let weak = Arc::downgrade(&client);
203                drop(client);
204                while let Some(status) = status.next().await {
205                    // if the client is dropped, the app is shutting down.
206                    let Some(client) = weak.upgrade() else {
207                        return Ok(());
208                    };
209                    match status {
210                        Status::Connected { .. } => {
211                            if let Some(user_id) = client.user_id() {
212                                let fetch_user = if let Ok(fetch_user) =
213                                    this.update(cx, |this, cx| this.get_user(user_id, cx).log_err())
214                                {
215                                    fetch_user
216                                } else {
217                                    break;
218                                };
219                                let fetch_private_user_info =
220                                    client.request(proto::GetPrivateUserInfo {}).log_err();
221                                let (user, info) =
222                                    futures::join!(fetch_user, fetch_private_user_info);
223
224                                cx.update(|cx| {
225                                    if let Some(info) = info {
226                                        let staff =
227                                            info.staff && !*feature_flags::ZED_DISABLE_STAFF;
228                                        cx.update_flags(staff, info.flags);
229                                        client.telemetry.set_authenticated_user_info(
230                                            Some(info.metrics_id.clone()),
231                                            staff,
232                                        );
233
234                                        this.update(cx, |this, cx| {
235                                            let accepted_tos_at = {
236                                                #[cfg(debug_assertions)]
237                                                if std::env::var("ZED_IGNORE_ACCEPTED_TOS").is_ok()
238                                                {
239                                                    None
240                                                } else {
241                                                    info.accepted_tos_at
242                                                }
243
244                                                #[cfg(not(debug_assertions))]
245                                                info.accepted_tos_at
246                                            };
247
248                                            this.set_current_user_accepted_tos_at(accepted_tos_at);
249                                            cx.emit(Event::PrivateUserInfoUpdated);
250                                        })
251                                    } else {
252                                        anyhow::Ok(())
253                                    }
254                                })??;
255
256                                current_user_tx.send(user).await.ok();
257
258                                this.update(cx, |_, cx| cx.notify())?;
259                            }
260                        }
261                        Status::SignedOut => {
262                            current_user_tx.send(None).await.ok();
263                            this.update(cx, |this, cx| {
264                                this.accepted_tos_at = None;
265                                cx.emit(Event::PrivateUserInfoUpdated);
266                                cx.notify();
267                                this.clear_contacts()
268                            })?
269                            .await;
270                        }
271                        Status::ConnectionLost => {
272                            this.update(cx, |this, cx| {
273                                cx.notify();
274                                this.clear_contacts()
275                            })?
276                            .await;
277                        }
278                        _ => {}
279                    }
280                }
281                Ok(())
282            }),
283            pending_contact_requests: Default::default(),
284            weak_self: cx.weak_entity(),
285        }
286    }
287
288    #[cfg(feature = "test-support")]
289    pub fn clear_cache(&mut self) {
290        self.users.clear();
291        self.by_github_login.clear();
292    }
293
294    async fn handle_update_invite_info(
295        this: Entity<Self>,
296        message: TypedEnvelope<proto::UpdateInviteInfo>,
297        mut cx: AsyncApp,
298    ) -> Result<()> {
299        this.update(&mut cx, |this, cx| {
300            this.invite_info = Some(InviteInfo {
301                url: Arc::from(message.payload.url),
302                count: message.payload.count,
303            });
304            cx.notify();
305        })?;
306        Ok(())
307    }
308
309    async fn handle_show_contacts(
310        this: Entity<Self>,
311        _: TypedEnvelope<proto::ShowContacts>,
312        mut cx: AsyncApp,
313    ) -> Result<()> {
314        this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts))?;
315        Ok(())
316    }
317
318    pub fn invite_info(&self) -> Option<&InviteInfo> {
319        self.invite_info.as_ref()
320    }
321
322    async fn handle_update_contacts(
323        this: Entity<Self>,
324        message: TypedEnvelope<proto::UpdateContacts>,
325        mut cx: AsyncApp,
326    ) -> Result<()> {
327        this.read_with(&mut cx, |this, _| {
328            this.update_contacts_tx
329                .unbounded_send(UpdateContacts::Update(message.payload))
330                .unwrap();
331        })?;
332        Ok(())
333    }
334
335    async fn handle_update_plan(
336        this: Entity<Self>,
337        message: TypedEnvelope<proto::UpdateUserPlan>,
338        mut cx: AsyncApp,
339    ) -> Result<()> {
340        this.update(&mut cx, |this, cx| {
341            this.current_plan = Some(message.payload.plan());
342            this.subscription_period = maybe!({
343                let period = message.payload.subscription_period?;
344                let started_at = DateTime::from_timestamp(period.started_at as i64, 0)?;
345                let ended_at = DateTime::from_timestamp(period.ended_at as i64, 0)?;
346
347                Some((started_at, ended_at))
348            });
349            this.trial_started_at = message
350                .payload
351                .trial_started_at
352                .and_then(|trial_started_at| DateTime::from_timestamp(trial_started_at as i64, 0));
353            this.is_usage_based_billing_enabled = message.payload.is_usage_based_billing_enabled;
354            this.account_too_young = message.payload.account_too_young;
355            this.has_overdue_invoices = message.payload.has_overdue_invoices;
356
357            if let Some(usage) = message.payload.usage {
358                this.model_request_usage_amount = Some(usage.model_requests_usage_amount);
359                this.model_request_usage_limit = usage.model_requests_usage_limit;
360                this.edit_predictions_usage_amount = Some(usage.edit_predictions_usage_amount);
361                this.edit_predictions_usage_limit = usage.edit_predictions_usage_limit;
362            }
363
364            cx.notify();
365        })?;
366        Ok(())
367    }
368
369    fn update_contacts(&mut self, message: UpdateContacts, cx: &Context<Self>) -> Task<Result<()>> {
370        match message {
371            UpdateContacts::Wait(barrier) => {
372                drop(barrier);
373                Task::ready(Ok(()))
374            }
375            UpdateContacts::Clear(barrier) => {
376                self.contacts.clear();
377                self.incoming_contact_requests.clear();
378                self.outgoing_contact_requests.clear();
379                drop(barrier);
380                Task::ready(Ok(()))
381            }
382            UpdateContacts::Update(message) => {
383                let mut user_ids = HashSet::default();
384                for contact in &message.contacts {
385                    user_ids.insert(contact.user_id);
386                }
387                user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
388                user_ids.extend(message.outgoing_requests.iter());
389
390                let load_users = self.get_users(user_ids.into_iter().collect(), cx);
391                cx.spawn(async move |this, cx| {
392                    load_users.await?;
393
394                    // Users are fetched in parallel above and cached in call to get_users
395                    // No need to parallelize here
396                    let mut updated_contacts = Vec::new();
397                    let this = this.upgrade().context("can't upgrade user store handle")?;
398                    for contact in message.contacts {
399                        updated_contacts
400                            .push(Arc::new(Contact::from_proto(contact, &this, cx).await?));
401                    }
402
403                    let mut incoming_requests = Vec::new();
404                    for request in message.incoming_requests {
405                        incoming_requests.push({
406                            this.update(cx, |this, cx| this.get_user(request.requester_id, cx))?
407                                .await?
408                        });
409                    }
410
411                    let mut outgoing_requests = Vec::new();
412                    for requested_user_id in message.outgoing_requests {
413                        outgoing_requests.push(
414                            this.update(cx, |this, cx| this.get_user(requested_user_id, cx))?
415                                .await?,
416                        );
417                    }
418
419                    let removed_contacts =
420                        HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
421                    let removed_incoming_requests =
422                        HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
423                    let removed_outgoing_requests =
424                        HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());
425
426                    this.update(cx, |this, cx| {
427                        // Remove contacts
428                        this.contacts
429                            .retain(|contact| !removed_contacts.contains(&contact.user.id));
430                        // Update existing contacts and insert new ones
431                        for updated_contact in updated_contacts {
432                            match this.contacts.binary_search_by_key(
433                                &&updated_contact.user.github_login,
434                                |contact| &contact.user.github_login,
435                            ) {
436                                Ok(ix) => this.contacts[ix] = updated_contact,
437                                Err(ix) => this.contacts.insert(ix, updated_contact),
438                            }
439                        }
440
441                        // Remove incoming contact requests
442                        this.incoming_contact_requests.retain(|user| {
443                            if removed_incoming_requests.contains(&user.id) {
444                                cx.emit(Event::Contact {
445                                    user: user.clone(),
446                                    kind: ContactEventKind::Cancelled,
447                                });
448                                false
449                            } else {
450                                true
451                            }
452                        });
453                        // Update existing incoming requests and insert new ones
454                        for user in incoming_requests {
455                            match this
456                                .incoming_contact_requests
457                                .binary_search_by_key(&&user.github_login, |contact| {
458                                    &contact.github_login
459                                }) {
460                                Ok(ix) => this.incoming_contact_requests[ix] = user,
461                                Err(ix) => this.incoming_contact_requests.insert(ix, user),
462                            }
463                        }
464
465                        // Remove outgoing contact requests
466                        this.outgoing_contact_requests
467                            .retain(|user| !removed_outgoing_requests.contains(&user.id));
468                        // Update existing incoming requests and insert new ones
469                        for request in outgoing_requests {
470                            match this
471                                .outgoing_contact_requests
472                                .binary_search_by_key(&&request.github_login, |contact| {
473                                    &contact.github_login
474                                }) {
475                                Ok(ix) => this.outgoing_contact_requests[ix] = request,
476                                Err(ix) => this.outgoing_contact_requests.insert(ix, request),
477                            }
478                        }
479
480                        cx.notify();
481                    })?;
482
483                    Ok(())
484                })
485            }
486        }
487    }
488
489    pub fn contacts(&self) -> &[Arc<Contact>] {
490        &self.contacts
491    }
492
493    pub fn has_contact(&self, user: &Arc<User>) -> bool {
494        self.contacts
495            .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
496            .is_ok()
497    }
498
499    pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
500        &self.incoming_contact_requests
501    }
502
503    pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
504        &self.outgoing_contact_requests
505    }
506
507    pub fn is_contact_request_pending(&self, user: &User) -> bool {
508        self.pending_contact_requests.contains_key(&user.id)
509    }
510
511    pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
512        if self
513            .contacts
514            .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
515            .is_ok()
516        {
517            ContactRequestStatus::RequestAccepted
518        } else if self
519            .outgoing_contact_requests
520            .binary_search_by_key(&&user.github_login, |user| &user.github_login)
521            .is_ok()
522        {
523            ContactRequestStatus::RequestSent
524        } else if self
525            .incoming_contact_requests
526            .binary_search_by_key(&&user.github_login, |user| &user.github_login)
527            .is_ok()
528        {
529            ContactRequestStatus::RequestReceived
530        } else {
531            ContactRequestStatus::None
532        }
533    }
534
535    pub fn request_contact(
536        &mut self,
537        responder_id: u64,
538        cx: &mut Context<Self>,
539    ) -> Task<Result<()>> {
540        self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
541    }
542
543    pub fn remove_contact(&mut self, user_id: u64, cx: &mut Context<Self>) -> Task<Result<()>> {
544        self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
545    }
546
547    pub fn has_incoming_contact_request(&self, user_id: u64) -> bool {
548        self.incoming_contact_requests
549            .iter()
550            .any(|user| user.id == user_id)
551    }
552
553    pub fn respond_to_contact_request(
554        &mut self,
555        requester_id: u64,
556        accept: bool,
557        cx: &mut Context<Self>,
558    ) -> Task<Result<()>> {
559        self.perform_contact_request(
560            requester_id,
561            proto::RespondToContactRequest {
562                requester_id,
563                response: if accept {
564                    proto::ContactRequestResponse::Accept
565                } else {
566                    proto::ContactRequestResponse::Decline
567                } as i32,
568            },
569            cx,
570        )
571    }
572
573    pub fn dismiss_contact_request(
574        &self,
575        requester_id: u64,
576        cx: &Context<Self>,
577    ) -> Task<Result<()>> {
578        let client = self.client.upgrade();
579        cx.spawn(async move |_, _| {
580            client
581                .context("can't upgrade client reference")?
582                .request(proto::RespondToContactRequest {
583                    requester_id,
584                    response: proto::ContactRequestResponse::Dismiss as i32,
585                })
586                .await?;
587            Ok(())
588        })
589    }
590
591    fn perform_contact_request<T: RequestMessage>(
592        &mut self,
593        user_id: u64,
594        request: T,
595        cx: &mut Context<Self>,
596    ) -> Task<Result<()>> {
597        let client = self.client.upgrade();
598        *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
599        cx.notify();
600
601        cx.spawn(async move |this, cx| {
602            let response = client
603                .context("can't upgrade client reference")?
604                .request(request)
605                .await;
606            this.update(cx, |this, cx| {
607                if let Entry::Occupied(mut request_count) =
608                    this.pending_contact_requests.entry(user_id)
609                {
610                    *request_count.get_mut() -= 1;
611                    if *request_count.get() == 0 {
612                        request_count.remove();
613                    }
614                }
615                cx.notify();
616            })?;
617            response?;
618            Ok(())
619        })
620    }
621
622    pub fn clear_contacts(&self) -> impl Future<Output = ()> + use<> {
623        let (tx, mut rx) = postage::barrier::channel();
624        self.update_contacts_tx
625            .unbounded_send(UpdateContacts::Clear(tx))
626            .unwrap();
627        async move {
628            rx.next().await;
629        }
630    }
631
632    pub fn contact_updates_done(&self) -> impl Future<Output = ()> {
633        let (tx, mut rx) = postage::barrier::channel();
634        self.update_contacts_tx
635            .unbounded_send(UpdateContacts::Wait(tx))
636            .unwrap();
637        async move {
638            rx.next().await;
639        }
640    }
641
642    pub fn get_users(
643        &self,
644        user_ids: Vec<u64>,
645        cx: &Context<Self>,
646    ) -> Task<Result<Vec<Arc<User>>>> {
647        let mut user_ids_to_fetch = user_ids.clone();
648        user_ids_to_fetch.retain(|id| !self.users.contains_key(id));
649
650        cx.spawn(async move |this, cx| {
651            if !user_ids_to_fetch.is_empty() {
652                this.update(cx, |this, cx| {
653                    this.load_users(
654                        proto::GetUsers {
655                            user_ids: user_ids_to_fetch,
656                        },
657                        cx,
658                    )
659                })?
660                .await?;
661            }
662
663            this.read_with(cx, |this, _| {
664                user_ids
665                    .iter()
666                    .map(|user_id| {
667                        this.users
668                            .get(user_id)
669                            .cloned()
670                            .with_context(|| format!("user {user_id} not found"))
671                    })
672                    .collect()
673            })?
674        })
675    }
676
677    pub fn fuzzy_search_users(
678        &self,
679        query: String,
680        cx: &Context<Self>,
681    ) -> Task<Result<Vec<Arc<User>>>> {
682        self.load_users(proto::FuzzySearchUsers { query }, cx)
683    }
684
685    pub fn get_cached_user(&self, user_id: u64) -> Option<Arc<User>> {
686        self.users.get(&user_id).cloned()
687    }
688
689    pub fn get_user_optimistic(&self, user_id: u64, cx: &Context<Self>) -> Option<Arc<User>> {
690        if let Some(user) = self.users.get(&user_id).cloned() {
691            return Some(user);
692        }
693
694        self.get_user(user_id, cx).detach_and_log_err(cx);
695        None
696    }
697
698    pub fn get_user(&self, user_id: u64, cx: &Context<Self>) -> Task<Result<Arc<User>>> {
699        if let Some(user) = self.users.get(&user_id).cloned() {
700            return Task::ready(Ok(user));
701        }
702
703        let load_users = self.get_users(vec![user_id], cx);
704        cx.spawn(async move |this, cx| {
705            load_users.await?;
706            this.read_with(cx, |this, _| {
707                this.users
708                    .get(&user_id)
709                    .cloned()
710                    .context("server responded with no users")
711            })?
712        })
713    }
714
715    pub fn cached_user_by_github_login(&self, github_login: &str) -> Option<Arc<User>> {
716        self.by_github_login
717            .get(github_login)
718            .and_then(|id| self.users.get(id).cloned())
719    }
720
721    pub fn current_user(&self) -> Option<Arc<User>> {
722        self.current_user.borrow().clone()
723    }
724
725    pub fn current_plan(&self) -> Option<proto::Plan> {
726        self.current_plan
727    }
728
729    pub fn subscription_period(&self) -> Option<(DateTime<Utc>, DateTime<Utc>)> {
730        self.subscription_period
731    }
732
733    pub fn trial_started_at(&self) -> Option<DateTime<Utc>> {
734        self.trial_started_at
735    }
736
737    pub fn usage_based_billing_enabled(&self) -> Option<bool> {
738        self.is_usage_based_billing_enabled
739    }
740
741    pub fn model_request_usage_amount(&self) -> Option<u32> {
742        self.model_request_usage_amount
743    }
744
745    pub fn model_request_usage_limit(&self) -> Option<proto::UsageLimit> {
746        self.model_request_usage_limit.clone()
747    }
748
749    pub fn edit_predictions_usage_amount(&self) -> Option<u32> {
750        self.edit_predictions_usage_amount
751    }
752
753    pub fn edit_predictions_usage_limit(&self) -> Option<proto::UsageLimit> {
754        self.edit_predictions_usage_limit.clone()
755    }
756
757    pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
758        self.current_user.clone()
759    }
760
761    /// Returns whether the user's account is too new to use the service.
762    pub fn account_too_young(&self) -> bool {
763        self.account_too_young.unwrap_or(false)
764    }
765
766    /// Returns whether the current user has overdue invoices and usage should be blocked.
767    pub fn has_overdue_invoices(&self) -> bool {
768        self.has_overdue_invoices.unwrap_or(false)
769    }
770
771    pub fn current_user_has_accepted_terms(&self) -> Option<bool> {
772        self.accepted_tos_at
773            .map(|accepted_tos_at| accepted_tos_at.is_some())
774    }
775
776    pub fn accept_terms_of_service(&self, cx: &Context<Self>) -> Task<Result<()>> {
777        if self.current_user().is_none() {
778            return Task::ready(Err(anyhow!("no current user")));
779        };
780
781        let client = self.client.clone();
782        cx.spawn(async move |this, cx| -> anyhow::Result<()> {
783            let client = client.upgrade().context("client not found")?;
784            let response = client
785                .request(proto::AcceptTermsOfService {})
786                .await
787                .context("error accepting tos")?;
788            this.update(cx, |this, cx| {
789                this.set_current_user_accepted_tos_at(Some(response.accepted_tos_at));
790                cx.emit(Event::PrivateUserInfoUpdated);
791            })?;
792            Ok(())
793        })
794    }
795
796    fn set_current_user_accepted_tos_at(&mut self, accepted_tos_at: Option<u64>) {
797        self.accepted_tos_at = Some(
798            accepted_tos_at.and_then(|timestamp| DateTime::from_timestamp(timestamp as i64, 0)),
799        );
800    }
801
802    fn load_users(
803        &self,
804        request: impl RequestMessage<Response = UsersResponse>,
805        cx: &Context<Self>,
806    ) -> Task<Result<Vec<Arc<User>>>> {
807        let client = self.client.clone();
808        cx.spawn(async move |this, cx| {
809            if let Some(rpc) = client.upgrade() {
810                let response = rpc.request(request).await.context("error loading users")?;
811                let users = response.users;
812
813                this.update(cx, |this, _| this.insert(users))
814            } else {
815                Ok(Vec::new())
816            }
817        })
818    }
819
820    pub fn insert(&mut self, users: Vec<proto::User>) -> Vec<Arc<User>> {
821        let mut ret = Vec::with_capacity(users.len());
822        for user in users {
823            let user = User::new(user);
824            if let Some(old) = self.users.insert(user.id, user.clone()) {
825                if old.github_login != user.github_login {
826                    self.by_github_login.remove(&old.github_login);
827                }
828            }
829            self.by_github_login
830                .insert(user.github_login.clone(), user.id);
831            ret.push(user)
832        }
833        ret
834    }
835
836    pub fn set_participant_indices(
837        &mut self,
838        participant_indices: HashMap<u64, ParticipantIndex>,
839        cx: &mut Context<Self>,
840    ) {
841        if participant_indices != self.participant_indices {
842            self.participant_indices = participant_indices;
843            cx.emit(Event::ParticipantIndicesChanged);
844        }
845    }
846
847    pub fn participant_indices(&self) -> &HashMap<u64, ParticipantIndex> {
848        &self.participant_indices
849    }
850
851    pub fn participant_names(
852        &self,
853        user_ids: impl Iterator<Item = u64>,
854        cx: &App,
855    ) -> HashMap<u64, SharedString> {
856        let mut ret = HashMap::default();
857        let mut missing_user_ids = Vec::new();
858        for id in user_ids {
859            if let Some(github_login) = self.get_cached_user(id).map(|u| u.github_login.clone()) {
860                ret.insert(id, github_login.into());
861            } else {
862                missing_user_ids.push(id)
863            }
864        }
865        if !missing_user_ids.is_empty() {
866            let this = self.weak_self.clone();
867            cx.spawn(async move |cx| {
868                this.update(cx, |this, cx| this.get_users(missing_user_ids, cx))?
869                    .await
870            })
871            .detach_and_log_err(cx);
872        }
873        ret
874    }
875}
876
877impl User {
878    fn new(message: proto::User) -> Arc<Self> {
879        Arc::new(User {
880            id: message.id,
881            github_login: message.github_login,
882            avatar_uri: message.avatar_url.into(),
883            name: message.name,
884            email: message.email,
885        })
886    }
887}
888
889impl Contact {
890    async fn from_proto(
891        contact: proto::Contact,
892        user_store: &Entity<UserStore>,
893        cx: &mut AsyncApp,
894    ) -> Result<Self> {
895        let user = user_store
896            .update(cx, |user_store, cx| {
897                user_store.get_user(contact.user_id, cx)
898            })?
899            .await?;
900        Ok(Self {
901            user,
902            online: contact.online,
903            busy: contact.busy,
904        })
905    }
906}
907
908impl Collaborator {
909    pub fn from_proto(message: proto::Collaborator) -> Result<Self> {
910        Ok(Self {
911            peer_id: message.peer_id.context("invalid peer id")?,
912            replica_id: message.replica_id as ReplicaId,
913            user_id: message.user_id as UserId,
914            is_host: message.is_host,
915        })
916    }
917}