user.rs

  1use super::{Client, Status, TypedEnvelope, proto};
  2use anyhow::{Context as _, Result, anyhow};
  3use chrono::{DateTime, Utc};
  4use collections::{HashMap, HashSet, hash_map::Entry};
  5use feature_flags::FeatureFlagAppExt;
  6use futures::{Future, StreamExt, channel::mpsc};
  7use gpui::{
  8    App, AsyncApp, Context, Entity, EventEmitter, SharedString, SharedUri, Task, WeakEntity,
  9};
 10use postage::{sink::Sink, watch};
 11use rpc::proto::{RequestMessage, UsersResponse};
 12use std::sync::{Arc, Weak};
 13use text::ReplicaId;
 14use util::{TryFutureExt as _, maybe};
 15
 16pub type UserId = u64;
 17
/// Newtype wrapper around the raw `u64` identifier of a channel.
///
/// The inner value is public; `Display` (implemented below) prints it as a plain integer.
#[derive(
    Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, serde::Serialize, serde::Deserialize,
)]
pub struct ChannelId(pub u64);
 22
 23impl std::fmt::Display for ChannelId {
 24    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
 25        self.0.fmt(f)
 26    }
 27}
 28
 29#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)]
 30pub struct ProjectId(pub u64);
 31
 32impl ProjectId {
 33    pub fn to_proto(&self) -> u64 {
 34        self.0
 35    }
 36}
 37
/// Newtype wrapper around a raw `u64` identifier.
///
/// NOTE(review): presumably identifies a dev-server project (inferred from the name only) —
/// nothing in this file constructs or reads it.
#[derive(
    Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, serde::Serialize, serde::Deserialize,
)]
pub struct DevServerProjectId(pub u64);
 42
/// Newtype wrapper around a `u32` index.
///
/// `UserStore` keeps one of these per user id in its `participant_indices` map.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ParticipantIndex(pub u32);
 45
/// A user as cached by `UserStore`, built from a `proto::User` message via `User::new`.
///
/// Equality and ordering are implemented by hand below rather than derived.
#[derive(Default, Debug)]
pub struct User {
    /// Numeric user id; the key of `UserStore::users`.
    pub id: UserId,
    /// GitHub login; the key of `UserStore::by_github_login` and the sort key of the contact lists.
    pub github_login: String,
    /// Avatar location, converted from the message's `avatar_url` field.
    pub avatar_uri: SharedUri,
    /// Optional name, copied verbatim from the message.
    pub name: Option<String>,
    /// Optional email, copied verbatim from the message.
    pub email: Option<String>,
}
 54
/// A collaborator record, built from a `proto::Collaborator` message via
/// `Collaborator::from_proto`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Collaborator {
    /// Peer id taken from the message; `from_proto` fails if the message has none.
    pub peer_id: proto::PeerId,
    /// Replica id, cast from the message's `replica_id` field.
    pub replica_id: ReplicaId,
    /// Id of the user this collaborator corresponds to.
    pub user_id: UserId,
    /// Copied from the message's `is_host` flag.
    pub is_host: bool,
}
 62
 63impl PartialOrd for User {
 64    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
 65        Some(self.cmp(other))
 66    }
 67}
 68
 69impl Ord for User {
 70    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
 71        self.github_login.cmp(&other.github_login)
 72    }
 73}
 74
 75impl PartialEq for User {
 76    fn eq(&self, other: &Self) -> bool {
 77        self.id == other.id && self.github_login == other.github_login
 78    }
 79}
 80
 81impl Eq for User {}
 82
/// A contact of the current user, built from a `proto::Contact` message via
/// `Contact::from_proto`.
#[derive(Debug, PartialEq)]
pub struct Contact {
    /// The contact's user record, resolved through the `UserStore`.
    pub user: Arc<User>,
    /// Copied from the message's `online` flag.
    pub online: bool,
    /// Copied from the message's `busy` flag.
    pub busy: bool,
}
 89
/// Relationship between the current user and another user, as reported by
/// `UserStore::contact_request_status`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ContactRequestStatus {
    /// The user appears in none of the store's contact lists.
    None,
    /// The user appears in the store's outgoing contact requests.
    RequestSent,
    /// The user appears in the store's incoming contact requests.
    RequestReceived,
    /// The user appears in the store's contacts.
    RequestAccepted,
}
 97
/// Client-side store of users, contacts, contact requests and per-account information
/// (plan, usage, terms-of-service acceptance) for the signed-in user.
pub struct UserStore {
    /// Cache of known users, keyed by user id; populated by `insert`.
    users: HashMap<u64, Arc<User>>,
    /// Index from GitHub login to user id, maintained alongside `users` by `insert`.
    by_github_login: HashMap<String, u64>,
    /// Participant index per user id; replaced wholesale by `set_participant_indices`.
    participant_indices: HashMap<u64, ParticipantIndex>,
    /// Sending half of the queue drained by the `_maintain_contacts` task.
    update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
    /// Plan from the most recent `proto::UpdateUserPlan` message, if any was received.
    current_plan: Option<proto::Plan>,
    /// `(started_at, ended_at)` of the subscription period from the last plan update.
    subscription_period: Option<(DateTime<Utc>, DateTime<Utc>)>,
    /// Trial start time from the last plan update.
    trial_started_at: Option<DateTime<Utc>>,
    /// Model-request usage amount from the last plan update that carried usage data.
    model_request_usage_amount: Option<u32>,
    /// Model-request usage limit from the last plan update that carried usage data.
    model_request_usage_limit: Option<proto::UsageLimit>,
    /// Edit-prediction usage amount from the last plan update that carried usage data.
    edit_predictions_usage_amount: Option<u32>,
    /// Edit-prediction usage limit from the last plan update that carried usage data.
    edit_predictions_usage_limit: Option<proto::UsageLimit>,
    /// Usage-based-billing flag from the last plan update.
    is_usage_based_billing_enabled: Option<bool>,
    /// Watch channel holding the signed-in user; written by the `_maintain_current_user` task.
    current_user: watch::Receiver<Option<Arc<User>>>,
    /// Outer `None`: not yet set (initial state, and after sign-out).
    /// `Some(None)`: set, but with no (valid) acceptance timestamp.
    /// `Some(Some(t))`: terms were accepted at `t`.
    accepted_tos_at: Option<Option<DateTime<Utc>>>,
    /// Current contacts, kept sorted by GitHub login (entries are placed via binary search).
    contacts: Vec<Arc<Contact>>,
    /// Users who sent us a contact request, kept sorted by GitHub login.
    incoming_contact_requests: Vec<Arc<User>>,
    /// Users we sent a contact request to, kept sorted by GitHub login.
    outgoing_contact_requests: Vec<Arc<User>>,
    /// Number of in-flight contact-related requests per user id; entries are removed at zero.
    pending_contact_requests: HashMap<u64, usize>,
    /// Invite URL and count from the most recent `proto::UpdateInviteInfo` message.
    invite_info: Option<InviteInfo>,
    /// Weak handle to the RPC client, upgraded on demand before each request.
    client: Weak<Client>,
    /// Task that applies queued `UpdateContacts` messages one at a time.
    _maintain_contacts: Task<()>,
    /// Task that reacts to client status changes (connected / signed out / connection lost).
    _maintain_current_user: Task<Result<()>>,
    /// Weak handle to this entity, used where only an `&App` is available.
    weak_self: WeakEntity<Self>,
}
123
/// Invite information received through a `proto::UpdateInviteInfo` message.
#[derive(Clone)]
pub struct InviteInfo {
    /// The `count` field of the message.
    pub count: u32,
    /// The `url` field of the message.
    pub url: Arc<str>,
}
129
/// Events emitted by `UserStore`.
pub enum Event {
    /// Something happened regarding a contact request involving `user`.
    /// In this file it is emitted with `ContactEventKind::Cancelled` when an incoming
    /// request is removed by a contacts update.
    Contact {
        user: Arc<User>,
        kind: ContactEventKind,
    },
    /// Emitted when a `proto::ShowContacts` message is received.
    ShowContacts,
    /// Emitted by `set_participant_indices` when the map actually changed.
    ParticipantIndicesChanged,
    /// Emitted after the terms-of-service acceptance state is set or reset.
    PrivateUserInfoUpdated,
}
139
/// Kind of change reported by `Event::Contact`.
///
/// NOTE(review): only `Cancelled` is emitted from this file; `Requested` and `Accepted` are
/// presumably produced or consumed elsewhere — confirm against callers.
#[derive(Clone, Copy)]
pub enum ContactEventKind {
    Requested,
    Accepted,
    Cancelled,
}
146
// Marker impl that lets `UserStore` emit `Event` values through `cx.emit`.
impl EventEmitter<Event> for UserStore {}
148
/// Messages processed, in order, by the `_maintain_contacts` task.
enum UpdateContacts {
    /// Apply a contacts update received from the server.
    Update(proto::UpdateContacts),
    /// Do nothing except drop the barrier sender, which lets the waiting side observe that
    /// every message queued before this one has been processed.
    Wait(postage::barrier::Sender),
    /// Clear contacts and both request lists, then drop the barrier sender.
    Clear(postage::barrier::Sender),
}
154
155impl UserStore {
    /// Creates the store, registers its RPC message handlers on `client`, and spawns the two
    /// background tasks that keep contacts and the current user up to date.
    ///
    /// Only a weak reference to `client` is retained by the store and by the status task.
    pub fn new(client: Arc<Client>, cx: &Context<Self>) -> Self {
        let (mut current_user_tx, current_user_rx) = watch::channel();
        let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
        let rpc_subscriptions = vec![
            client.add_message_handler(cx.weak_entity(), Self::handle_update_plan),
            client.add_message_handler(cx.weak_entity(), Self::handle_update_contacts),
            client.add_message_handler(cx.weak_entity(), Self::handle_update_invite_info),
            client.add_message_handler(cx.weak_entity(), Self::handle_show_contacts),
        ];
        Self {
            users: Default::default(),
            by_github_login: Default::default(),
            current_user: current_user_rx,
            current_plan: None,
            subscription_period: None,
            trial_started_at: None,
            model_request_usage_amount: None,
            model_request_usage_limit: None,
            edit_predictions_usage_amount: None,
            edit_predictions_usage_limit: None,
            is_usage_based_billing_enabled: None,
            accepted_tos_at: None,
            contacts: Default::default(),
            incoming_contact_requests: Default::default(),
            participant_indices: Default::default(),
            outgoing_contact_requests: Default::default(),
            invite_info: None,
            client: Arc::downgrade(&client),
            update_contacts_tx,
            _maintain_contacts: cx.spawn(async move |this, cx| {
                // The subscriptions are moved into the task so they live as long as it does.
                let _subscriptions = rpc_subscriptions;
                // Apply queued contact messages strictly one after another.
                while let Some(message) = update_contacts_rx.next().await {
                    if let Ok(task) = this.update(cx, |this, cx| this.update_contacts(message, cx))
                    {
                        task.log_err().await;
                    } else {
                        // The store could not be updated (entity gone); stop processing.
                        break;
                    }
                }
            }),
            _maintain_current_user: cx.spawn(async move |this, cx| {
                let mut status = client.status();
                // Hold only a weak reference while waiting so this task does not keep the
                // client alive.
                let weak = Arc::downgrade(&client);
                drop(client);
                while let Some(status) = status.next().await {
                    // if the client is dropped, the app is shutting down.
                    let Some(client) = weak.upgrade() else {
                        return Ok(());
                    };
                    match status {
                        Status::Connected { .. } => {
                            if let Some(user_id) = client.user_id() {
                                let fetch_user = if let Ok(fetch_user) =
                                    this.update(cx, |this, cx| this.get_user(user_id, cx).log_err())
                                {
                                    fetch_user
                                } else {
                                    break;
                                };
                                let fetch_private_user_info =
                                    client.request(proto::GetPrivateUserInfo {}).log_err();
                                // Fetch the public user record and the private info concurrently.
                                let (user, info) =
                                    futures::join!(fetch_user, fetch_private_user_info);

                                cx.update(|cx| {
                                    if let Some(info) = info {
                                        let staff =
                                            info.staff && !*feature_flags::ZED_DISABLE_STAFF;
                                        cx.update_flags(staff, info.flags);
                                        client.telemetry.set_authenticated_user_info(
                                            Some(info.metrics_id.clone()),
                                            staff,
                                        );

                                        this.update(cx, |this, cx| {
                                            let accepted_tos_at = {
                                                // Debug builds can pretend the terms were never
                                                // accepted by setting ZED_IGNORE_ACCEPTED_TOS.
                                                #[cfg(debug_assertions)]
                                                if std::env::var("ZED_IGNORE_ACCEPTED_TOS").is_ok()
                                                {
                                                    None
                                                } else {
                                                    info.accepted_tos_at
                                                }

                                                #[cfg(not(debug_assertions))]
                                                info.accepted_tos_at
                                            };

                                            this.set_current_user_accepted_tos_at(accepted_tos_at);
                                            cx.emit(Event::PrivateUserInfoUpdated);
                                        })
                                    } else {
                                        anyhow::Ok(())
                                    }
                                })??;

                                // Publish the user (or `None` if the fetch failed and was logged).
                                current_user_tx.send(user).await.ok();

                                this.update(cx, |_, cx| cx.notify())?;
                            }
                        }
                        Status::SignedOut => {
                            current_user_tx.send(None).await.ok();
                            // Reset the terms-of-service state and wait for contacts to clear.
                            this.update(cx, |this, cx| {
                                this.accepted_tos_at = None;
                                cx.emit(Event::PrivateUserInfoUpdated);
                                cx.notify();
                                this.clear_contacts()
                            })?
                            .await;
                        }
                        Status::ConnectionLost => {
                            // Keep the current user, but drop the contact lists.
                            this.update(cx, |this, cx| {
                                cx.notify();
                                this.clear_contacts()
                            })?
                            .await;
                        }
                        _ => {}
                    }
                }
                Ok(())
            }),
            pending_contact_requests: Default::default(),
            weak_self: cx.weak_entity(),
        }
    }
283
284    #[cfg(feature = "test-support")]
285    pub fn clear_cache(&mut self) {
286        self.users.clear();
287        self.by_github_login.clear();
288    }
289
290    async fn handle_update_invite_info(
291        this: Entity<Self>,
292        message: TypedEnvelope<proto::UpdateInviteInfo>,
293        mut cx: AsyncApp,
294    ) -> Result<()> {
295        this.update(&mut cx, |this, cx| {
296            this.invite_info = Some(InviteInfo {
297                url: Arc::from(message.payload.url),
298                count: message.payload.count,
299            });
300            cx.notify();
301        })?;
302        Ok(())
303    }
304
305    async fn handle_show_contacts(
306        this: Entity<Self>,
307        _: TypedEnvelope<proto::ShowContacts>,
308        mut cx: AsyncApp,
309    ) -> Result<()> {
310        this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts))?;
311        Ok(())
312    }
313
    /// Returns the most recently received invite info, or `None` if none has been received.
    pub fn invite_info(&self) -> Option<&InviteInfo> {
        self.invite_info.as_ref()
    }
317
318    async fn handle_update_contacts(
319        this: Entity<Self>,
320        message: TypedEnvelope<proto::UpdateContacts>,
321        mut cx: AsyncApp,
322    ) -> Result<()> {
323        this.update(&mut cx, |this, _| {
324            this.update_contacts_tx
325                .unbounded_send(UpdateContacts::Update(message.payload))
326                .unwrap();
327        })?;
328        Ok(())
329    }
330
331    async fn handle_update_plan(
332        this: Entity<Self>,
333        message: TypedEnvelope<proto::UpdateUserPlan>,
334        mut cx: AsyncApp,
335    ) -> Result<()> {
336        this.update(&mut cx, |this, cx| {
337            this.current_plan = Some(message.payload.plan());
338            this.subscription_period = maybe!({
339                let period = message.payload.subscription_period?;
340                let started_at = DateTime::from_timestamp(period.started_at as i64, 0)?;
341                let ended_at = DateTime::from_timestamp(period.ended_at as i64, 0)?;
342
343                Some((started_at, ended_at))
344            });
345            this.trial_started_at = message
346                .payload
347                .trial_started_at
348                .and_then(|trial_started_at| DateTime::from_timestamp(trial_started_at as i64, 0));
349            this.is_usage_based_billing_enabled = message.payload.is_usage_based_billing_enabled;
350
351            if let Some(usage) = message.payload.usage {
352                this.model_request_usage_amount = Some(usage.model_requests_usage_amount);
353                this.model_request_usage_limit = usage.model_requests_usage_limit;
354                this.edit_predictions_usage_amount = Some(usage.edit_predictions_usage_amount);
355                this.edit_predictions_usage_limit = usage.edit_predictions_usage_limit;
356            }
357
358            cx.notify();
359        })?;
360        Ok(())
361    }
362
    /// Applies one queued `UpdateContacts` message and returns a task that completes when the
    /// message has been fully processed.
    ///
    /// * `Wait` only drops its barrier sender.
    /// * `Clear` empties the contact list and both request lists, then drops its barrier sender.
    /// * `Update` loads every referenced user, then removes, updates and inserts entries so
    ///   that all three lists stay sorted by GitHub login.
    fn update_contacts(&mut self, message: UpdateContacts, cx: &Context<Self>) -> Task<Result<()>> {
        match message {
            UpdateContacts::Wait(barrier) => {
                drop(barrier);
                Task::ready(Ok(()))
            }
            UpdateContacts::Clear(barrier) => {
                self.contacts.clear();
                self.incoming_contact_requests.clear();
                self.outgoing_contact_requests.clear();
                drop(barrier);
                Task::ready(Ok(()))
            }
            UpdateContacts::Update(message) => {
                // Collect every user id mentioned by the update so they can be fetched at once.
                let mut user_ids = HashSet::default();
                for contact in &message.contacts {
                    user_ids.insert(contact.user_id);
                }
                user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
                user_ids.extend(message.outgoing_requests.iter());

                let load_users = self.get_users(user_ids.into_iter().collect(), cx);
                cx.spawn(async move |this, cx| {
                    load_users.await?;

                    // Users are fetched in parallel above and cached in call to get_users
                    // No need to parallelize here
                    let mut updated_contacts = Vec::new();
                    let this = this
                        .upgrade()
                        .ok_or_else(|| anyhow!("can't upgrade user store handle"))?;
                    for contact in message.contacts {
                        updated_contacts
                            .push(Arc::new(Contact::from_proto(contact, &this, cx).await?));
                    }

                    let mut incoming_requests = Vec::new();
                    for request in message.incoming_requests {
                        incoming_requests.push({
                            this.update(cx, |this, cx| this.get_user(request.requester_id, cx))?
                                .await?
                        });
                    }

                    let mut outgoing_requests = Vec::new();
                    for requested_user_id in message.outgoing_requests {
                        outgoing_requests.push(
                            this.update(cx, |this, cx| this.get_user(requested_user_id, cx))?
                                .await?,
                        );
                    }

                    // Sets of user ids to remove, for constant-time membership checks below.
                    let removed_contacts =
                        HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
                    let removed_incoming_requests =
                        HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
                    let removed_outgoing_requests =
                        HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());

                    this.update(cx, |this, cx| {
                        // Remove contacts
                        this.contacts
                            .retain(|contact| !removed_contacts.contains(&contact.user.id));
                        // Update existing contacts and insert new ones
                        for updated_contact in updated_contacts {
                            match this.contacts.binary_search_by_key(
                                &&updated_contact.user.github_login,
                                |contact| &contact.user.github_login,
                            ) {
                                Ok(ix) => this.contacts[ix] = updated_contact,
                                Err(ix) => this.contacts.insert(ix, updated_contact),
                            }
                        }

                        // Remove incoming contact requests
                        this.incoming_contact_requests.retain(|user| {
                            if removed_incoming_requests.contains(&user.id) {
                                // Let observers know this request is no longer pending.
                                cx.emit(Event::Contact {
                                    user: user.clone(),
                                    kind: ContactEventKind::Cancelled,
                                });
                                false
                            } else {
                                true
                            }
                        });
                        // Update existing incoming requests and insert new ones
                        for user in incoming_requests {
                            match this
                                .incoming_contact_requests
                                .binary_search_by_key(&&user.github_login, |contact| {
                                    &contact.github_login
                                }) {
                                Ok(ix) => this.incoming_contact_requests[ix] = user,
                                Err(ix) => this.incoming_contact_requests.insert(ix, user),
                            }
                        }

                        // Remove outgoing contact requests
                        this.outgoing_contact_requests
                            .retain(|user| !removed_outgoing_requests.contains(&user.id));
                        // Update existing outgoing requests and insert new ones
                        for request in outgoing_requests {
                            match this
                                .outgoing_contact_requests
                                .binary_search_by_key(&&request.github_login, |contact| {
                                    &contact.github_login
                                }) {
                                Ok(ix) => this.outgoing_contact_requests[ix] = request,
                                Err(ix) => this.outgoing_contact_requests.insert(ix, request),
                            }
                        }

                        cx.notify();
                    })?;

                    Ok(())
                })
            }
        }
    }
484
485    pub fn contacts(&self) -> &[Arc<Contact>] {
486        &self.contacts
487    }
488
489    pub fn has_contact(&self, user: &Arc<User>) -> bool {
490        self.contacts
491            .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
492            .is_ok()
493    }
494
495    pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
496        &self.incoming_contact_requests
497    }
498
499    pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
500        &self.outgoing_contact_requests
501    }
502
503    pub fn is_contact_request_pending(&self, user: &User) -> bool {
504        self.pending_contact_requests.contains_key(&user.id)
505    }
506
507    pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
508        if self
509            .contacts
510            .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
511            .is_ok()
512        {
513            ContactRequestStatus::RequestAccepted
514        } else if self
515            .outgoing_contact_requests
516            .binary_search_by_key(&&user.github_login, |user| &user.github_login)
517            .is_ok()
518        {
519            ContactRequestStatus::RequestSent
520        } else if self
521            .incoming_contact_requests
522            .binary_search_by_key(&&user.github_login, |user| &user.github_login)
523            .is_ok()
524        {
525            ContactRequestStatus::RequestReceived
526        } else {
527            ContactRequestStatus::None
528        }
529    }
530
531    pub fn request_contact(
532        &mut self,
533        responder_id: u64,
534        cx: &mut Context<Self>,
535    ) -> Task<Result<()>> {
536        self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
537    }
538
539    pub fn remove_contact(&mut self, user_id: u64, cx: &mut Context<Self>) -> Task<Result<()>> {
540        self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
541    }
542
543    pub fn has_incoming_contact_request(&self, user_id: u64) -> bool {
544        self.incoming_contact_requests
545            .iter()
546            .any(|user| user.id == user_id)
547    }
548
549    pub fn respond_to_contact_request(
550        &mut self,
551        requester_id: u64,
552        accept: bool,
553        cx: &mut Context<Self>,
554    ) -> Task<Result<()>> {
555        self.perform_contact_request(
556            requester_id,
557            proto::RespondToContactRequest {
558                requester_id,
559                response: if accept {
560                    proto::ContactRequestResponse::Accept
561                } else {
562                    proto::ContactRequestResponse::Decline
563                } as i32,
564            },
565            cx,
566        )
567    }
568
569    pub fn dismiss_contact_request(
570        &self,
571        requester_id: u64,
572        cx: &Context<Self>,
573    ) -> Task<Result<()>> {
574        let client = self.client.upgrade();
575        cx.spawn(async move |_, _| {
576            client
577                .ok_or_else(|| anyhow!("can't upgrade client reference"))?
578                .request(proto::RespondToContactRequest {
579                    requester_id,
580                    response: proto::ContactRequestResponse::Dismiss as i32,
581                })
582                .await?;
583            Ok(())
584        })
585    }
586
587    fn perform_contact_request<T: RequestMessage>(
588        &mut self,
589        user_id: u64,
590        request: T,
591        cx: &mut Context<Self>,
592    ) -> Task<Result<()>> {
593        let client = self.client.upgrade();
594        *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
595        cx.notify();
596
597        cx.spawn(async move |this, cx| {
598            let response = client
599                .ok_or_else(|| anyhow!("can't upgrade client reference"))?
600                .request(request)
601                .await;
602            this.update(cx, |this, cx| {
603                if let Entry::Occupied(mut request_count) =
604                    this.pending_contact_requests.entry(user_id)
605                {
606                    *request_count.get_mut() -= 1;
607                    if *request_count.get() == 0 {
608                        request_count.remove();
609                    }
610                }
611                cx.notify();
612            })?;
613            response?;
614            Ok(())
615        })
616    }
617
618    pub fn clear_contacts(&self) -> impl Future<Output = ()> + use<> {
619        let (tx, mut rx) = postage::barrier::channel();
620        self.update_contacts_tx
621            .unbounded_send(UpdateContacts::Clear(tx))
622            .unwrap();
623        async move {
624            rx.next().await;
625        }
626    }
627
628    pub fn contact_updates_done(&self) -> impl Future<Output = ()> {
629        let (tx, mut rx) = postage::barrier::channel();
630        self.update_contacts_tx
631            .unbounded_send(UpdateContacts::Wait(tx))
632            .unwrap();
633        async move {
634            rx.next().await;
635        }
636    }
637
638    pub fn get_users(
639        &self,
640        user_ids: Vec<u64>,
641        cx: &Context<Self>,
642    ) -> Task<Result<Vec<Arc<User>>>> {
643        let mut user_ids_to_fetch = user_ids.clone();
644        user_ids_to_fetch.retain(|id| !self.users.contains_key(id));
645
646        cx.spawn(async move |this, cx| {
647            if !user_ids_to_fetch.is_empty() {
648                this.update(cx, |this, cx| {
649                    this.load_users(
650                        proto::GetUsers {
651                            user_ids: user_ids_to_fetch,
652                        },
653                        cx,
654                    )
655                })?
656                .await?;
657            }
658
659            this.update(cx, |this, _| {
660                user_ids
661                    .iter()
662                    .map(|user_id| {
663                        this.users
664                            .get(user_id)
665                            .cloned()
666                            .ok_or_else(|| anyhow!("user {} not found", user_id))
667                    })
668                    .collect()
669            })?
670        })
671    }
672
673    pub fn fuzzy_search_users(
674        &self,
675        query: String,
676        cx: &Context<Self>,
677    ) -> Task<Result<Vec<Arc<User>>>> {
678        self.load_users(proto::FuzzySearchUsers { query }, cx)
679    }
680
681    pub fn get_cached_user(&self, user_id: u64) -> Option<Arc<User>> {
682        self.users.get(&user_id).cloned()
683    }
684
685    pub fn get_user_optimistic(&self, user_id: u64, cx: &Context<Self>) -> Option<Arc<User>> {
686        if let Some(user) = self.users.get(&user_id).cloned() {
687            return Some(user);
688        }
689
690        self.get_user(user_id, cx).detach_and_log_err(cx);
691        None
692    }
693
694    pub fn get_user(&self, user_id: u64, cx: &Context<Self>) -> Task<Result<Arc<User>>> {
695        if let Some(user) = self.users.get(&user_id).cloned() {
696            return Task::ready(Ok(user));
697        }
698
699        let load_users = self.get_users(vec![user_id], cx);
700        cx.spawn(async move |this, cx| {
701            load_users.await?;
702            this.update(cx, |this, _| {
703                this.users
704                    .get(&user_id)
705                    .cloned()
706                    .ok_or_else(|| anyhow!("server responded with no users"))
707            })?
708        })
709    }
710
711    pub fn cached_user_by_github_login(&self, github_login: &str) -> Option<Arc<User>> {
712        self.by_github_login
713            .get(github_login)
714            .and_then(|id| self.users.get(id).cloned())
715    }
716
717    pub fn current_user(&self) -> Option<Arc<User>> {
718        self.current_user.borrow().clone()
719    }
720
    /// Returns the plan from the most recent plan update, or `None` if none was received.
    pub fn current_plan(&self) -> Option<proto::Plan> {
        self.current_plan
    }
724
    /// Returns the `(started_at, ended_at)` subscription period from the most recent plan
    /// update, if it carried a valid one.
    pub fn subscription_period(&self) -> Option<(DateTime<Utc>, DateTime<Utc>)> {
        self.subscription_period
    }
728
    /// Returns the trial start time from the most recent plan update, if any.
    pub fn trial_started_at(&self) -> Option<DateTime<Utc>> {
        self.trial_started_at
    }
732
    /// Returns the usage-based-billing flag from the most recent plan update; `None` when no
    /// update was received or the update did not set it.
    pub fn usage_based_billing_enabled(&self) -> Option<bool> {
        self.is_usage_based_billing_enabled
    }
736
    /// Returns the model-request usage amount from the last plan update that included usage.
    pub fn model_request_usage_amount(&self) -> Option<u32> {
        self.model_request_usage_amount
    }
740
    /// Returns a clone of the model-request usage limit from the last plan update that
    /// included usage.
    pub fn model_request_usage_limit(&self) -> Option<proto::UsageLimit> {
        self.model_request_usage_limit.clone()
    }
744
    /// Returns the edit-prediction usage amount from the last plan update that included usage.
    pub fn edit_predictions_usage_amount(&self) -> Option<u32> {
        self.edit_predictions_usage_amount
    }
748
    /// Returns a clone of the edit-prediction usage limit from the last plan update that
    /// included usage.
    pub fn edit_predictions_usage_limit(&self) -> Option<proto::UsageLimit> {
        self.edit_predictions_usage_limit.clone()
    }
752
    /// Returns a new receiver on the current-user watch channel, for callers that want to
    /// observe sign-in and sign-out changes themselves.
    pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
        self.current_user.clone()
    }
756
757    pub fn current_user_has_accepted_terms(&self) -> Option<bool> {
758        self.accepted_tos_at
759            .map(|accepted_tos_at| accepted_tos_at.is_some())
760    }
761
762    pub fn accept_terms_of_service(&self, cx: &Context<Self>) -> Task<Result<()>> {
763        if self.current_user().is_none() {
764            return Task::ready(Err(anyhow!("no current user")));
765        };
766
767        let client = self.client.clone();
768        cx.spawn(async move |this, cx| {
769            if let Some(client) = client.upgrade() {
770                let response = client
771                    .request(proto::AcceptTermsOfService {})
772                    .await
773                    .context("error accepting tos")?;
774
775                this.update(cx, |this, cx| {
776                    this.set_current_user_accepted_tos_at(Some(response.accepted_tos_at));
777                    cx.emit(Event::PrivateUserInfoUpdated);
778                })
779            } else {
780                Err(anyhow!("client not found"))
781            }
782        })
783    }
784
785    fn set_current_user_accepted_tos_at(&mut self, accepted_tos_at: Option<u64>) {
786        self.accepted_tos_at = Some(
787            accepted_tos_at.and_then(|timestamp| DateTime::from_timestamp(timestamp as i64, 0)),
788        );
789    }
790
791    fn load_users(
792        &self,
793        request: impl RequestMessage<Response = UsersResponse>,
794        cx: &Context<Self>,
795    ) -> Task<Result<Vec<Arc<User>>>> {
796        let client = self.client.clone();
797        cx.spawn(async move |this, cx| {
798            if let Some(rpc) = client.upgrade() {
799                let response = rpc.request(request).await.context("error loading users")?;
800                let users = response.users;
801
802                this.update(cx, |this, _| this.insert(users))
803            } else {
804                Ok(Vec::new())
805            }
806        })
807    }
808
809    pub fn insert(&mut self, users: Vec<proto::User>) -> Vec<Arc<User>> {
810        let mut ret = Vec::with_capacity(users.len());
811        for user in users {
812            let user = User::new(user);
813            if let Some(old) = self.users.insert(user.id, user.clone()) {
814                if old.github_login != user.github_login {
815                    self.by_github_login.remove(&old.github_login);
816                }
817            }
818            self.by_github_login
819                .insert(user.github_login.clone(), user.id);
820            ret.push(user)
821        }
822        ret
823    }
824
825    pub fn set_participant_indices(
826        &mut self,
827        participant_indices: HashMap<u64, ParticipantIndex>,
828        cx: &mut Context<Self>,
829    ) {
830        if participant_indices != self.participant_indices {
831            self.participant_indices = participant_indices;
832            cx.emit(Event::ParticipantIndicesChanged);
833        }
834    }
835
    /// Returns the current participant index of each user, keyed by user id.
    pub fn participant_indices(&self) -> &HashMap<u64, ParticipantIndex> {
        &self.participant_indices
    }
839
840    pub fn participant_names(
841        &self,
842        user_ids: impl Iterator<Item = u64>,
843        cx: &App,
844    ) -> HashMap<u64, SharedString> {
845        let mut ret = HashMap::default();
846        let mut missing_user_ids = Vec::new();
847        for id in user_ids {
848            if let Some(github_login) = self.get_cached_user(id).map(|u| u.github_login.clone()) {
849                ret.insert(id, github_login.into());
850            } else {
851                missing_user_ids.push(id)
852            }
853        }
854        if !missing_user_ids.is_empty() {
855            let this = self.weak_self.clone();
856            cx.spawn(async move |cx| {
857                this.update(cx, |this, cx| this.get_users(missing_user_ids, cx))?
858                    .await
859            })
860            .detach_and_log_err(cx);
861        }
862        ret
863    }
864}
865
866impl User {
867    fn new(message: proto::User) -> Arc<Self> {
868        Arc::new(User {
869            id: message.id,
870            github_login: message.github_login,
871            avatar_uri: message.avatar_url.into(),
872            name: message.name,
873            email: message.email,
874        })
875    }
876}
877
878impl Contact {
879    async fn from_proto(
880        contact: proto::Contact,
881        user_store: &Entity<UserStore>,
882        cx: &mut AsyncApp,
883    ) -> Result<Self> {
884        let user = user_store
885            .update(cx, |user_store, cx| {
886                user_store.get_user(contact.user_id, cx)
887            })?
888            .await?;
889        Ok(Self {
890            user,
891            online: contact.online,
892            busy: contact.busy,
893        })
894    }
895}
896
897impl Collaborator {
898    pub fn from_proto(message: proto::Collaborator) -> Result<Self> {
899        Ok(Self {
900            peer_id: message.peer_id.ok_or_else(|| anyhow!("invalid peer id"))?,
901            replica_id: message.replica_id as ReplicaId,
902            user_id: message.user_id as UserId,
903            is_host: message.is_host,
904        })
905    }
906}