user.rs

   1use super::{Client, Status, TypedEnvelope, proto};
   2use anyhow::{Context as _, Result, anyhow};
   3use chrono::{DateTime, Utc};
   4use cloud_llm_client::{
   5    EDIT_PREDICTIONS_USAGE_AMOUNT_HEADER_NAME, EDIT_PREDICTIONS_USAGE_LIMIT_HEADER_NAME,
   6    MODEL_REQUESTS_USAGE_AMOUNT_HEADER_NAME, MODEL_REQUESTS_USAGE_LIMIT_HEADER_NAME, UsageLimit,
   7};
   8use collections::{HashMap, HashSet, hash_map::Entry};
   9use derive_more::Deref;
  10use feature_flags::FeatureFlagAppExt;
  11use futures::{Future, StreamExt, channel::mpsc};
  12use gpui::{
  13    App, AsyncApp, Context, Entity, EventEmitter, SharedString, SharedUri, Task, WeakEntity,
  14};
  15use http_client::http::{HeaderMap, HeaderValue};
  16use postage::{sink::Sink, watch};
  17use rpc::proto::{RequestMessage, UsersResponse};
  18use std::{
  19    str::FromStr as _,
  20    sync::{Arc, Weak},
  21};
  22use text::ReplicaId;
  23use util::{TryFutureExt as _, maybe};
  24
  25pub type UserId = u64;
  26
  27#[derive(
  28    Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, serde::Serialize, serde::Deserialize,
  29)]
  30pub struct ChannelId(pub u64);
  31
  32impl std::fmt::Display for ChannelId {
  33    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
  34        self.0.fmt(f)
  35    }
  36}
  37
  38#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)]
  39pub struct ProjectId(pub u64);
  40
  41impl ProjectId {
  42    pub fn to_proto(&self) -> u64 {
  43        self.0
  44    }
  45}
  46
  47#[derive(
  48    Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, serde::Serialize, serde::Deserialize,
  49)]
  50pub struct DevServerProjectId(pub u64);
  51
  52#[derive(Debug, Clone, Copy, PartialEq, Eq)]
  53pub struct ParticipantIndex(pub u32);
  54
  55#[derive(Default, Debug)]
  56pub struct User {
  57    pub id: UserId,
  58    pub github_login: String,
  59    pub avatar_uri: SharedUri,
  60    pub name: Option<String>,
  61}
  62
  63#[derive(Clone, Debug, PartialEq, Eq)]
  64pub struct Collaborator {
  65    pub peer_id: proto::PeerId,
  66    pub replica_id: ReplicaId,
  67    pub user_id: UserId,
  68    pub is_host: bool,
  69    pub committer_name: Option<String>,
  70    pub committer_email: Option<String>,
  71}
  72
  73impl PartialOrd for User {
  74    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
  75        Some(self.cmp(other))
  76    }
  77}
  78
  79impl Ord for User {
  80    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
  81        self.github_login.cmp(&other.github_login)
  82    }
  83}
  84
  85impl PartialEq for User {
  86    fn eq(&self, other: &Self) -> bool {
  87        self.id == other.id && self.github_login == other.github_login
  88    }
  89}
  90
  91impl Eq for User {}
  92
  93#[derive(Debug, PartialEq)]
  94pub struct Contact {
  95    pub user: Arc<User>,
  96    pub online: bool,
  97    pub busy: bool,
  98}
  99
 100#[derive(Debug, Clone, Copy, PartialEq, Eq)]
 101pub enum ContactRequestStatus {
 102    None,
 103    RequestSent,
 104    RequestReceived,
 105    RequestAccepted,
 106}
 107
 108pub struct UserStore {
 109    users: HashMap<u64, Arc<User>>,
 110    by_github_login: HashMap<String, u64>,
 111    participant_indices: HashMap<u64, ParticipantIndex>,
 112    update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
 113    current_plan: Option<proto::Plan>,
 114    subscription_period: Option<(DateTime<Utc>, DateTime<Utc>)>,
 115    trial_started_at: Option<DateTime<Utc>>,
 116    model_request_usage: Option<ModelRequestUsage>,
 117    edit_prediction_usage: Option<EditPredictionUsage>,
 118    is_usage_based_billing_enabled: Option<bool>,
 119    account_too_young: Option<bool>,
 120    has_overdue_invoices: Option<bool>,
 121    current_user: watch::Receiver<Option<Arc<User>>>,
 122    accepted_tos_at: Option<Option<DateTime<Utc>>>,
 123    contacts: Vec<Arc<Contact>>,
 124    incoming_contact_requests: Vec<Arc<User>>,
 125    outgoing_contact_requests: Vec<Arc<User>>,
 126    pending_contact_requests: HashMap<u64, usize>,
 127    invite_info: Option<InviteInfo>,
 128    client: Weak<Client>,
 129    _maintain_contacts: Task<()>,
 130    _maintain_current_user: Task<Result<()>>,
 131    weak_self: WeakEntity<Self>,
 132}
 133
 134#[derive(Clone)]
 135pub struct InviteInfo {
 136    pub count: u32,
 137    pub url: Arc<str>,
 138}
 139
 140pub enum Event {
 141    Contact {
 142        user: Arc<User>,
 143        kind: ContactEventKind,
 144    },
 145    ShowContacts,
 146    ParticipantIndicesChanged,
 147    PrivateUserInfoUpdated,
 148    PlanUpdated,
 149}
 150
 151#[derive(Clone, Copy)]
 152pub enum ContactEventKind {
 153    Requested,
 154    Accepted,
 155    Cancelled,
 156}
 157
 158impl EventEmitter<Event> for UserStore {}
 159
 160enum UpdateContacts {
 161    Update(proto::UpdateContacts),
 162    Wait(postage::barrier::Sender),
 163    Clear(postage::barrier::Sender),
 164}
 165
 166#[derive(Debug, Clone, Copy, Deref)]
 167pub struct ModelRequestUsage(pub RequestUsage);
 168
 169#[derive(Debug, Clone, Copy, Deref)]
 170pub struct EditPredictionUsage(pub RequestUsage);
 171
 172#[derive(Debug, Clone, Copy)]
 173pub struct RequestUsage {
 174    pub limit: UsageLimit,
 175    pub amount: i32,
 176}
 177
 178impl UserStore {
 179    pub fn new(client: Arc<Client>, cx: &Context<Self>) -> Self {
 180        let (mut current_user_tx, current_user_rx) = watch::channel();
 181        let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
 182        let rpc_subscriptions = vec![
 183            client.add_message_handler(cx.weak_entity(), Self::handle_update_plan),
 184            client.add_message_handler(cx.weak_entity(), Self::handle_update_contacts),
 185            client.add_message_handler(cx.weak_entity(), Self::handle_update_invite_info),
 186            client.add_message_handler(cx.weak_entity(), Self::handle_show_contacts),
 187        ];
 188        Self {
 189            users: Default::default(),
 190            by_github_login: Default::default(),
 191            current_user: current_user_rx,
 192            current_plan: None,
 193            subscription_period: None,
 194            trial_started_at: None,
 195            model_request_usage: None,
 196            edit_prediction_usage: None,
 197            is_usage_based_billing_enabled: None,
 198            account_too_young: None,
 199            has_overdue_invoices: None,
 200            accepted_tos_at: None,
 201            contacts: Default::default(),
 202            incoming_contact_requests: Default::default(),
 203            participant_indices: Default::default(),
 204            outgoing_contact_requests: Default::default(),
 205            invite_info: None,
 206            client: Arc::downgrade(&client),
 207            update_contacts_tx,
 208            _maintain_contacts: cx.spawn(async move |this, cx| {
 209                let _subscriptions = rpc_subscriptions;
 210                while let Some(message) = update_contacts_rx.next().await {
 211                    if let Ok(task) = this.update(cx, |this, cx| this.update_contacts(message, cx))
 212                    {
 213                        task.log_err().await;
 214                    } else {
 215                        break;
 216                    }
 217                }
 218            }),
 219            _maintain_current_user: cx.spawn(async move |this, cx| {
 220                let mut status = client.status();
 221                let weak = Arc::downgrade(&client);
 222                drop(client);
 223                while let Some(status) = status.next().await {
 224                    // if the client is dropped, the app is shutting down.
 225                    let Some(client) = weak.upgrade() else {
 226                        return Ok(());
 227                    };
 228                    match status {
 229                        Status::Connected { .. } => {
 230                            if let Some(user_id) = client.user_id() {
 231                                let fetch_user = if let Ok(fetch_user) =
 232                                    this.update(cx, |this, cx| this.get_user(user_id, cx).log_err())
 233                                {
 234                                    fetch_user
 235                                } else {
 236                                    break;
 237                                };
 238                                let fetch_private_user_info =
 239                                    client.request(proto::GetPrivateUserInfo {}).log_err();
 240                                let (user, info) =
 241                                    futures::join!(fetch_user, fetch_private_user_info);
 242
 243                                cx.update(|cx| {
 244                                    if let Some(info) = info {
 245                                        let staff =
 246                                            info.staff && !*feature_flags::ZED_DISABLE_STAFF;
 247                                        cx.update_flags(staff, info.flags);
 248                                        client.telemetry.set_authenticated_user_info(
 249                                            Some(info.metrics_id.clone()),
 250                                            staff,
 251                                        );
 252
 253                                        this.update(cx, |this, cx| {
 254                                            let accepted_tos_at = {
 255                                                #[cfg(debug_assertions)]
 256                                                if std::env::var("ZED_IGNORE_ACCEPTED_TOS").is_ok()
 257                                                {
 258                                                    None
 259                                                } else {
 260                                                    info.accepted_tos_at
 261                                                }
 262
 263                                                #[cfg(not(debug_assertions))]
 264                                                info.accepted_tos_at
 265                                            };
 266
 267                                            this.set_current_user_accepted_tos_at(accepted_tos_at);
 268                                            cx.emit(Event::PrivateUserInfoUpdated);
 269                                        })
 270                                    } else {
 271                                        anyhow::Ok(())
 272                                    }
 273                                })??;
 274
 275                                current_user_tx.send(user).await.ok();
 276
 277                                this.update(cx, |_, cx| cx.notify())?;
 278                            }
 279                        }
 280                        Status::SignedOut => {
 281                            current_user_tx.send(None).await.ok();
 282                            this.update(cx, |this, cx| {
 283                                this.accepted_tos_at = None;
 284                                cx.emit(Event::PrivateUserInfoUpdated);
 285                                cx.notify();
 286                                this.clear_contacts()
 287                            })?
 288                            .await;
 289                        }
 290                        Status::ConnectionLost => {
 291                            this.update(cx, |this, cx| {
 292                                cx.notify();
 293                                this.clear_contacts()
 294                            })?
 295                            .await;
 296                        }
 297                        _ => {}
 298                    }
 299                }
 300                Ok(())
 301            }),
 302            pending_contact_requests: Default::default(),
 303            weak_self: cx.weak_entity(),
 304        }
 305    }
 306
 307    #[cfg(feature = "test-support")]
 308    pub fn clear_cache(&mut self) {
 309        self.users.clear();
 310        self.by_github_login.clear();
 311    }
 312
 313    async fn handle_update_invite_info(
 314        this: Entity<Self>,
 315        message: TypedEnvelope<proto::UpdateInviteInfo>,
 316        mut cx: AsyncApp,
 317    ) -> Result<()> {
 318        this.update(&mut cx, |this, cx| {
 319            this.invite_info = Some(InviteInfo {
 320                url: Arc::from(message.payload.url),
 321                count: message.payload.count,
 322            });
 323            cx.notify();
 324        })?;
 325        Ok(())
 326    }
 327
 328    async fn handle_show_contacts(
 329        this: Entity<Self>,
 330        _: TypedEnvelope<proto::ShowContacts>,
 331        mut cx: AsyncApp,
 332    ) -> Result<()> {
 333        this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts))?;
 334        Ok(())
 335    }
 336
 337    pub fn invite_info(&self) -> Option<&InviteInfo> {
 338        self.invite_info.as_ref()
 339    }
 340
 341    async fn handle_update_contacts(
 342        this: Entity<Self>,
 343        message: TypedEnvelope<proto::UpdateContacts>,
 344        mut cx: AsyncApp,
 345    ) -> Result<()> {
 346        this.read_with(&mut cx, |this, _| {
 347            this.update_contacts_tx
 348                .unbounded_send(UpdateContacts::Update(message.payload))
 349                .unwrap();
 350        })?;
 351        Ok(())
 352    }
 353
 354    async fn handle_update_plan(
 355        this: Entity<Self>,
 356        message: TypedEnvelope<proto::UpdateUserPlan>,
 357        mut cx: AsyncApp,
 358    ) -> Result<()> {
 359        this.update(&mut cx, |this, cx| {
 360            this.current_plan = Some(message.payload.plan());
 361            this.subscription_period = maybe!({
 362                let period = message.payload.subscription_period?;
 363                let started_at = DateTime::from_timestamp(period.started_at as i64, 0)?;
 364                let ended_at = DateTime::from_timestamp(period.ended_at as i64, 0)?;
 365
 366                Some((started_at, ended_at))
 367            });
 368            this.trial_started_at = message
 369                .payload
 370                .trial_started_at
 371                .and_then(|trial_started_at| DateTime::from_timestamp(trial_started_at as i64, 0));
 372            this.is_usage_based_billing_enabled = message.payload.is_usage_based_billing_enabled;
 373            this.account_too_young = message.payload.account_too_young;
 374            this.has_overdue_invoices = message.payload.has_overdue_invoices;
 375
 376            if let Some(usage) = message.payload.usage {
 377                // limits are always present even though they are wrapped in Option
 378                this.model_request_usage = usage
 379                    .model_requests_usage_limit
 380                    .and_then(|limit| {
 381                        RequestUsage::from_proto(usage.model_requests_usage_amount, limit)
 382                    })
 383                    .map(ModelRequestUsage);
 384                this.edit_prediction_usage = usage
 385                    .edit_predictions_usage_limit
 386                    .and_then(|limit| {
 387                        RequestUsage::from_proto(usage.model_requests_usage_amount, limit)
 388                    })
 389                    .map(EditPredictionUsage);
 390            }
 391
 392            cx.emit(Event::PlanUpdated);
 393            cx.notify();
 394        })?;
 395        Ok(())
 396    }
 397
 398    pub fn update_model_request_usage(&mut self, usage: ModelRequestUsage, cx: &mut Context<Self>) {
 399        self.model_request_usage = Some(usage);
 400        cx.notify();
 401    }
 402
 403    pub fn update_edit_prediction_usage(
 404        &mut self,
 405        usage: EditPredictionUsage,
 406        cx: &mut Context<Self>,
 407    ) {
 408        self.edit_prediction_usage = Some(usage);
 409        cx.notify();
 410    }
 411
 412    fn update_contacts(&mut self, message: UpdateContacts, cx: &Context<Self>) -> Task<Result<()>> {
 413        match message {
 414            UpdateContacts::Wait(barrier) => {
 415                drop(barrier);
 416                Task::ready(Ok(()))
 417            }
 418            UpdateContacts::Clear(barrier) => {
 419                self.contacts.clear();
 420                self.incoming_contact_requests.clear();
 421                self.outgoing_contact_requests.clear();
 422                drop(barrier);
 423                Task::ready(Ok(()))
 424            }
 425            UpdateContacts::Update(message) => {
 426                let mut user_ids = HashSet::default();
 427                for contact in &message.contacts {
 428                    user_ids.insert(contact.user_id);
 429                }
 430                user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
 431                user_ids.extend(message.outgoing_requests.iter());
 432
 433                let load_users = self.get_users(user_ids.into_iter().collect(), cx);
 434                cx.spawn(async move |this, cx| {
 435                    load_users.await?;
 436
 437                    // Users are fetched in parallel above and cached in call to get_users
 438                    // No need to parallelize here
 439                    let mut updated_contacts = Vec::new();
 440                    let this = this.upgrade().context("can't upgrade user store handle")?;
 441                    for contact in message.contacts {
 442                        updated_contacts
 443                            .push(Arc::new(Contact::from_proto(contact, &this, cx).await?));
 444                    }
 445
 446                    let mut incoming_requests = Vec::new();
 447                    for request in message.incoming_requests {
 448                        incoming_requests.push({
 449                            this.update(cx, |this, cx| this.get_user(request.requester_id, cx))?
 450                                .await?
 451                        });
 452                    }
 453
 454                    let mut outgoing_requests = Vec::new();
 455                    for requested_user_id in message.outgoing_requests {
 456                        outgoing_requests.push(
 457                            this.update(cx, |this, cx| this.get_user(requested_user_id, cx))?
 458                                .await?,
 459                        );
 460                    }
 461
 462                    let removed_contacts =
 463                        HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
 464                    let removed_incoming_requests =
 465                        HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
 466                    let removed_outgoing_requests =
 467                        HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());
 468
 469                    this.update(cx, |this, cx| {
 470                        // Remove contacts
 471                        this.contacts
 472                            .retain(|contact| !removed_contacts.contains(&contact.user.id));
 473                        // Update existing contacts and insert new ones
 474                        for updated_contact in updated_contacts {
 475                            match this.contacts.binary_search_by_key(
 476                                &&updated_contact.user.github_login,
 477                                |contact| &contact.user.github_login,
 478                            ) {
 479                                Ok(ix) => this.contacts[ix] = updated_contact,
 480                                Err(ix) => this.contacts.insert(ix, updated_contact),
 481                            }
 482                        }
 483
 484                        // Remove incoming contact requests
 485                        this.incoming_contact_requests.retain(|user| {
 486                            if removed_incoming_requests.contains(&user.id) {
 487                                cx.emit(Event::Contact {
 488                                    user: user.clone(),
 489                                    kind: ContactEventKind::Cancelled,
 490                                });
 491                                false
 492                            } else {
 493                                true
 494                            }
 495                        });
 496                        // Update existing incoming requests and insert new ones
 497                        for user in incoming_requests {
 498                            match this
 499                                .incoming_contact_requests
 500                                .binary_search_by_key(&&user.github_login, |contact| {
 501                                    &contact.github_login
 502                                }) {
 503                                Ok(ix) => this.incoming_contact_requests[ix] = user,
 504                                Err(ix) => this.incoming_contact_requests.insert(ix, user),
 505                            }
 506                        }
 507
 508                        // Remove outgoing contact requests
 509                        this.outgoing_contact_requests
 510                            .retain(|user| !removed_outgoing_requests.contains(&user.id));
 511                        // Update existing incoming requests and insert new ones
 512                        for request in outgoing_requests {
 513                            match this
 514                                .outgoing_contact_requests
 515                                .binary_search_by_key(&&request.github_login, |contact| {
 516                                    &contact.github_login
 517                                }) {
 518                                Ok(ix) => this.outgoing_contact_requests[ix] = request,
 519                                Err(ix) => this.outgoing_contact_requests.insert(ix, request),
 520                            }
 521                        }
 522
 523                        cx.notify();
 524                    })?;
 525
 526                    Ok(())
 527                })
 528            }
 529        }
 530    }
 531
 532    pub fn contacts(&self) -> &[Arc<Contact>] {
 533        &self.contacts
 534    }
 535
 536    pub fn has_contact(&self, user: &Arc<User>) -> bool {
 537        self.contacts
 538            .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
 539            .is_ok()
 540    }
 541
 542    pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
 543        &self.incoming_contact_requests
 544    }
 545
 546    pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
 547        &self.outgoing_contact_requests
 548    }
 549
 550    pub fn is_contact_request_pending(&self, user: &User) -> bool {
 551        self.pending_contact_requests.contains_key(&user.id)
 552    }
 553
 554    pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
 555        if self
 556            .contacts
 557            .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
 558            .is_ok()
 559        {
 560            ContactRequestStatus::RequestAccepted
 561        } else if self
 562            .outgoing_contact_requests
 563            .binary_search_by_key(&&user.github_login, |user| &user.github_login)
 564            .is_ok()
 565        {
 566            ContactRequestStatus::RequestSent
 567        } else if self
 568            .incoming_contact_requests
 569            .binary_search_by_key(&&user.github_login, |user| &user.github_login)
 570            .is_ok()
 571        {
 572            ContactRequestStatus::RequestReceived
 573        } else {
 574            ContactRequestStatus::None
 575        }
 576    }
 577
 578    pub fn request_contact(
 579        &mut self,
 580        responder_id: u64,
 581        cx: &mut Context<Self>,
 582    ) -> Task<Result<()>> {
 583        self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
 584    }
 585
 586    pub fn remove_contact(&mut self, user_id: u64, cx: &mut Context<Self>) -> Task<Result<()>> {
 587        self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
 588    }
 589
 590    pub fn has_incoming_contact_request(&self, user_id: u64) -> bool {
 591        self.incoming_contact_requests
 592            .iter()
 593            .any(|user| user.id == user_id)
 594    }
 595
 596    pub fn respond_to_contact_request(
 597        &mut self,
 598        requester_id: u64,
 599        accept: bool,
 600        cx: &mut Context<Self>,
 601    ) -> Task<Result<()>> {
 602        self.perform_contact_request(
 603            requester_id,
 604            proto::RespondToContactRequest {
 605                requester_id,
 606                response: if accept {
 607                    proto::ContactRequestResponse::Accept
 608                } else {
 609                    proto::ContactRequestResponse::Decline
 610                } as i32,
 611            },
 612            cx,
 613        )
 614    }
 615
 616    pub fn dismiss_contact_request(
 617        &self,
 618        requester_id: u64,
 619        cx: &Context<Self>,
 620    ) -> Task<Result<()>> {
 621        let client = self.client.upgrade();
 622        cx.spawn(async move |_, _| {
 623            client
 624                .context("can't upgrade client reference")?
 625                .request(proto::RespondToContactRequest {
 626                    requester_id,
 627                    response: proto::ContactRequestResponse::Dismiss as i32,
 628                })
 629                .await?;
 630            Ok(())
 631        })
 632    }
 633
 634    fn perform_contact_request<T: RequestMessage>(
 635        &mut self,
 636        user_id: u64,
 637        request: T,
 638        cx: &mut Context<Self>,
 639    ) -> Task<Result<()>> {
 640        let client = self.client.upgrade();
 641        *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
 642        cx.notify();
 643
 644        cx.spawn(async move |this, cx| {
 645            let response = client
 646                .context("can't upgrade client reference")?
 647                .request(request)
 648                .await;
 649            this.update(cx, |this, cx| {
 650                if let Entry::Occupied(mut request_count) =
 651                    this.pending_contact_requests.entry(user_id)
 652                {
 653                    *request_count.get_mut() -= 1;
 654                    if *request_count.get() == 0 {
 655                        request_count.remove();
 656                    }
 657                }
 658                cx.notify();
 659            })?;
 660            response?;
 661            Ok(())
 662        })
 663    }
 664
 665    pub fn clear_contacts(&self) -> impl Future<Output = ()> + use<> {
 666        let (tx, mut rx) = postage::barrier::channel();
 667        self.update_contacts_tx
 668            .unbounded_send(UpdateContacts::Clear(tx))
 669            .unwrap();
 670        async move {
 671            rx.next().await;
 672        }
 673    }
 674
 675    pub fn contact_updates_done(&self) -> impl Future<Output = ()> {
 676        let (tx, mut rx) = postage::barrier::channel();
 677        self.update_contacts_tx
 678            .unbounded_send(UpdateContacts::Wait(tx))
 679            .unwrap();
 680        async move {
 681            rx.next().await;
 682        }
 683    }
 684
 685    pub fn get_users(
 686        &self,
 687        user_ids: Vec<u64>,
 688        cx: &Context<Self>,
 689    ) -> Task<Result<Vec<Arc<User>>>> {
 690        let mut user_ids_to_fetch = user_ids.clone();
 691        user_ids_to_fetch.retain(|id| !self.users.contains_key(id));
 692
 693        cx.spawn(async move |this, cx| {
 694            if !user_ids_to_fetch.is_empty() {
 695                this.update(cx, |this, cx| {
 696                    this.load_users(
 697                        proto::GetUsers {
 698                            user_ids: user_ids_to_fetch,
 699                        },
 700                        cx,
 701                    )
 702                })?
 703                .await?;
 704            }
 705
 706            this.read_with(cx, |this, _| {
 707                user_ids
 708                    .iter()
 709                    .map(|user_id| {
 710                        this.users
 711                            .get(user_id)
 712                            .cloned()
 713                            .with_context(|| format!("user {user_id} not found"))
 714                    })
 715                    .collect()
 716            })?
 717        })
 718    }
 719
 720    pub fn fuzzy_search_users(
 721        &self,
 722        query: String,
 723        cx: &Context<Self>,
 724    ) -> Task<Result<Vec<Arc<User>>>> {
 725        self.load_users(proto::FuzzySearchUsers { query }, cx)
 726    }
 727
 728    pub fn get_cached_user(&self, user_id: u64) -> Option<Arc<User>> {
 729        self.users.get(&user_id).cloned()
 730    }
 731
 732    pub fn get_user_optimistic(&self, user_id: u64, cx: &Context<Self>) -> Option<Arc<User>> {
 733        if let Some(user) = self.users.get(&user_id).cloned() {
 734            return Some(user);
 735        }
 736
 737        self.get_user(user_id, cx).detach_and_log_err(cx);
 738        None
 739    }
 740
 741    pub fn get_user(&self, user_id: u64, cx: &Context<Self>) -> Task<Result<Arc<User>>> {
 742        if let Some(user) = self.users.get(&user_id).cloned() {
 743            return Task::ready(Ok(user));
 744        }
 745
 746        let load_users = self.get_users(vec![user_id], cx);
 747        cx.spawn(async move |this, cx| {
 748            load_users.await?;
 749            this.read_with(cx, |this, _| {
 750                this.users
 751                    .get(&user_id)
 752                    .cloned()
 753                    .context("server responded with no users")
 754            })?
 755        })
 756    }
 757
 758    pub fn cached_user_by_github_login(&self, github_login: &str) -> Option<Arc<User>> {
 759        self.by_github_login
 760            .get(github_login)
 761            .and_then(|id| self.users.get(id).cloned())
 762    }
 763
 764    pub fn current_user(&self) -> Option<Arc<User>> {
 765        self.current_user.borrow().clone()
 766    }
 767
 768    pub fn current_plan(&self) -> Option<proto::Plan> {
 769        #[cfg(debug_assertions)]
 770        if let Ok(plan) = std::env::var("ZED_SIMULATE_PLAN").as_ref() {
 771            return match plan.as_str() {
 772                "free" => Some(proto::Plan::Free),
 773                "trial" => Some(proto::Plan::ZedProTrial),
 774                "pro" => Some(proto::Plan::ZedPro),
 775                _ => {
 776                    panic!("ZED_SIMULATE_PLAN must be one of 'free', 'trial', or 'pro'");
 777                }
 778            };
 779        }
 780
 781        self.current_plan
 782    }
 783
 784    pub fn subscription_period(&self) -> Option<(DateTime<Utc>, DateTime<Utc>)> {
 785        self.subscription_period
 786    }
 787
 788    pub fn trial_started_at(&self) -> Option<DateTime<Utc>> {
 789        self.trial_started_at
 790    }
 791
 792    pub fn usage_based_billing_enabled(&self) -> Option<bool> {
 793        self.is_usage_based_billing_enabled
 794    }
 795
 796    pub fn model_request_usage(&self) -> Option<ModelRequestUsage> {
 797        self.model_request_usage
 798    }
 799
 800    pub fn edit_prediction_usage(&self) -> Option<EditPredictionUsage> {
 801        self.edit_prediction_usage
 802    }
 803
 804    pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
 805        self.current_user.clone()
 806    }
 807
 808    /// Returns whether the user's account is too new to use the service.
 809    pub fn account_too_young(&self) -> bool {
 810        self.account_too_young.unwrap_or(false)
 811    }
 812
 813    /// Returns whether the current user has overdue invoices and usage should be blocked.
 814    pub fn has_overdue_invoices(&self) -> bool {
 815        self.has_overdue_invoices.unwrap_or(false)
 816    }
 817
 818    pub fn current_user_has_accepted_terms(&self) -> Option<bool> {
 819        self.accepted_tos_at
 820            .map(|accepted_tos_at| accepted_tos_at.is_some())
 821    }
 822
 823    pub fn accept_terms_of_service(&self, cx: &Context<Self>) -> Task<Result<()>> {
 824        if self.current_user().is_none() {
 825            return Task::ready(Err(anyhow!("no current user")));
 826        };
 827
 828        let client = self.client.clone();
 829        cx.spawn(async move |this, cx| -> anyhow::Result<()> {
 830            let client = client.upgrade().context("client not found")?;
 831            let response = client
 832                .request(proto::AcceptTermsOfService {})
 833                .await
 834                .context("error accepting tos")?;
 835            this.update(cx, |this, cx| {
 836                this.set_current_user_accepted_tos_at(Some(response.accepted_tos_at));
 837                cx.emit(Event::PrivateUserInfoUpdated);
 838            })?;
 839            Ok(())
 840        })
 841    }
 842
 843    fn set_current_user_accepted_tos_at(&mut self, accepted_tos_at: Option<u64>) {
 844        self.accepted_tos_at = Some(
 845            accepted_tos_at.and_then(|timestamp| DateTime::from_timestamp(timestamp as i64, 0)),
 846        );
 847    }
 848
 849    fn load_users(
 850        &self,
 851        request: impl RequestMessage<Response = UsersResponse>,
 852        cx: &Context<Self>,
 853    ) -> Task<Result<Vec<Arc<User>>>> {
 854        let client = self.client.clone();
 855        cx.spawn(async move |this, cx| {
 856            if let Some(rpc) = client.upgrade() {
 857                let response = rpc.request(request).await.context("error loading users")?;
 858                let users = response.users;
 859
 860                this.update(cx, |this, _| this.insert(users))
 861            } else {
 862                Ok(Vec::new())
 863            }
 864        })
 865    }
 866
 867    pub fn insert(&mut self, users: Vec<proto::User>) -> Vec<Arc<User>> {
 868        let mut ret = Vec::with_capacity(users.len());
 869        for user in users {
 870            let user = User::new(user);
 871            if let Some(old) = self.users.insert(user.id, user.clone()) {
 872                if old.github_login != user.github_login {
 873                    self.by_github_login.remove(&old.github_login);
 874                }
 875            }
 876            self.by_github_login
 877                .insert(user.github_login.clone(), user.id);
 878            ret.push(user)
 879        }
 880        ret
 881    }
 882
 883    pub fn set_participant_indices(
 884        &mut self,
 885        participant_indices: HashMap<u64, ParticipantIndex>,
 886        cx: &mut Context<Self>,
 887    ) {
 888        if participant_indices != self.participant_indices {
 889            self.participant_indices = participant_indices;
 890            cx.emit(Event::ParticipantIndicesChanged);
 891        }
 892    }
 893
 894    pub fn participant_indices(&self) -> &HashMap<u64, ParticipantIndex> {
 895        &self.participant_indices
 896    }
 897
 898    pub fn participant_names(
 899        &self,
 900        user_ids: impl Iterator<Item = u64>,
 901        cx: &App,
 902    ) -> HashMap<u64, SharedString> {
 903        let mut ret = HashMap::default();
 904        let mut missing_user_ids = Vec::new();
 905        for id in user_ids {
 906            if let Some(github_login) = self.get_cached_user(id).map(|u| u.github_login.clone()) {
 907                ret.insert(id, github_login.into());
 908            } else {
 909                missing_user_ids.push(id)
 910            }
 911        }
 912        if !missing_user_ids.is_empty() {
 913            let this = self.weak_self.clone();
 914            cx.spawn(async move |cx| {
 915                this.update(cx, |this, cx| this.get_users(missing_user_ids, cx))?
 916                    .await
 917            })
 918            .detach_and_log_err(cx);
 919        }
 920        ret
 921    }
 922}
 923
 924impl User {
 925    fn new(message: proto::User) -> Arc<Self> {
 926        Arc::new(User {
 927            id: message.id,
 928            github_login: message.github_login,
 929            avatar_uri: message.avatar_url.into(),
 930            name: message.name,
 931        })
 932    }
 933}
 934
 935impl Contact {
 936    async fn from_proto(
 937        contact: proto::Contact,
 938        user_store: &Entity<UserStore>,
 939        cx: &mut AsyncApp,
 940    ) -> Result<Self> {
 941        let user = user_store
 942            .update(cx, |user_store, cx| {
 943                user_store.get_user(contact.user_id, cx)
 944            })?
 945            .await?;
 946        Ok(Self {
 947            user,
 948            online: contact.online,
 949            busy: contact.busy,
 950        })
 951    }
 952}
 953
 954impl Collaborator {
 955    pub fn from_proto(message: proto::Collaborator) -> Result<Self> {
 956        Ok(Self {
 957            peer_id: message.peer_id.context("invalid peer id")?,
 958            replica_id: message.replica_id as ReplicaId,
 959            user_id: message.user_id as UserId,
 960            is_host: message.is_host,
 961            committer_name: message.committer_name,
 962            committer_email: message.committer_email,
 963        })
 964    }
 965}
 966
 967impl RequestUsage {
 968    pub fn over_limit(&self) -> bool {
 969        match self.limit {
 970            UsageLimit::Limited(limit) => self.amount >= limit,
 971            UsageLimit::Unlimited => false,
 972        }
 973    }
 974
 975    pub fn from_proto(amount: u32, limit: proto::UsageLimit) -> Option<Self> {
 976        let limit = match limit.variant? {
 977            proto::usage_limit::Variant::Limited(limited) => {
 978                UsageLimit::Limited(limited.limit as i32)
 979            }
 980            proto::usage_limit::Variant::Unlimited(_) => UsageLimit::Unlimited,
 981        };
 982        Some(RequestUsage {
 983            limit,
 984            amount: amount as i32,
 985        })
 986    }
 987
 988    fn from_headers(
 989        limit_name: &str,
 990        amount_name: &str,
 991        headers: &HeaderMap<HeaderValue>,
 992    ) -> Result<Self> {
 993        let limit = headers
 994            .get(limit_name)
 995            .with_context(|| format!("missing {limit_name:?} header"))?;
 996        let limit = UsageLimit::from_str(limit.to_str()?)?;
 997
 998        let amount = headers
 999            .get(amount_name)
1000            .with_context(|| format!("missing {amount_name:?} header"))?;
1001        let amount = amount.to_str()?.parse::<i32>()?;
1002
1003        Ok(Self { limit, amount })
1004    }
1005}
1006
1007impl ModelRequestUsage {
1008    pub fn from_headers(headers: &HeaderMap<HeaderValue>) -> Result<Self> {
1009        Ok(Self(RequestUsage::from_headers(
1010            MODEL_REQUESTS_USAGE_LIMIT_HEADER_NAME,
1011            MODEL_REQUESTS_USAGE_AMOUNT_HEADER_NAME,
1012            headers,
1013        )?))
1014    }
1015}
1016
1017impl EditPredictionUsage {
1018    pub fn from_headers(headers: &HeaderMap<HeaderValue>) -> Result<Self> {
1019        Ok(Self(RequestUsage::from_headers(
1020            EDIT_PREDICTIONS_USAGE_LIMIT_HEADER_NAME,
1021            EDIT_PREDICTIONS_USAGE_AMOUNT_HEADER_NAME,
1022            headers,
1023        )?))
1024    }
1025}