user.rs

   1use super::{Client, Status, TypedEnvelope, proto};
   2use anyhow::{Context as _, Result, anyhow};
   3use chrono::{DateTime, Utc};
   4use cloud_llm_client::{
   5    EDIT_PREDICTIONS_USAGE_AMOUNT_HEADER_NAME, EDIT_PREDICTIONS_USAGE_LIMIT_HEADER_NAME,
   6    MODEL_REQUESTS_USAGE_AMOUNT_HEADER_NAME, MODEL_REQUESTS_USAGE_LIMIT_HEADER_NAME, UsageLimit,
   7};
   8use collections::{HashMap, HashSet, hash_map::Entry};
   9use derive_more::Deref;
  10use feature_flags::FeatureFlagAppExt;
  11use futures::{Future, StreamExt, channel::mpsc};
  12use gpui::{
  13    App, AsyncApp, Context, Entity, EventEmitter, SharedString, SharedUri, Task, WeakEntity,
  14};
  15use http_client::http::{HeaderMap, HeaderValue};
  16use postage::{sink::Sink, watch};
  17use rpc::proto::{RequestMessage, UsersResponse};
  18use std::{
  19    str::FromStr as _,
  20    sync::{Arc, Weak},
  21};
  22use text::ReplicaId;
  23use util::{TryFutureExt as _, maybe};
  24
/// Identifier of a user, represented as a plain `u64`.
pub type UserId = u64;
  26
/// Newtype wrapper around the `u64` identifier of a channel.
#[derive(
    Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, serde::Serialize, serde::Deserialize,
)]
pub struct ChannelId(pub u64);
  31
/// Formats a [`ChannelId`] by delegating to the wrapped `u64`.
impl std::fmt::Display for ChannelId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Hand the caller's formatter straight to the inner number.
        self.0.fmt(f)
    }
}
  37
  38#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)]
  39pub struct ProjectId(pub u64);
  40
  41impl ProjectId {
  42    pub fn to_proto(&self) -> u64 {
  43        self.0
  44    }
  45}
  46
/// Newtype wrapper around the `u64` identifier of a dev-server project.
#[derive(
    Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, serde::Serialize, serde::Deserialize,
)]
pub struct DevServerProjectId(pub u64);
  51
/// Newtype wrapper around a `u32` participant index.
///
/// `UserStore` keeps one of these per user id in its `participant_indices` map.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ParticipantIndex(pub u32);
  54
/// A user record as cached by [`UserStore`].
///
/// `PartialEq`, `Eq`, `PartialOrd` and `Ord` are implemented by hand further down in this file
/// rather than derived.
#[derive(Default, Debug)]
pub struct User {
    /// Numeric identifier of the user.
    pub id: UserId,
    /// The user's GitHub login; `UserStore` keeps its contact lists sorted by this value.
    pub github_login: String,
    /// URI of the user's avatar image.
    pub avatar_uri: SharedUri,
    /// Optional name of the user (presumably a display name; confirm against the server schema).
    pub name: Option<String>,
}
  62
/// Describes one collaborator: the peer and replica they are connected as, the user behind that
/// peer, and optional committer details.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Collaborator {
    /// Protocol-level peer id of the collaborator's connection.
    pub peer_id: proto::PeerId,
    /// Replica id assigned to the collaborator.
    pub replica_id: ReplicaId,
    /// Id of the user behind this collaborator.
    pub user_id: UserId,
    /// Whether this collaborator is the host.
    pub is_host: bool,
    /// Committer name, if known (presumably used for git attribution; TODO confirm).
    pub committer_name: Option<String>,
    /// Committer email, if known (presumably used for git attribution; TODO confirm).
    pub committer_email: Option<String>,
}
  72
/// Partial ordering for [`User`]; always defined, and identical to the total order from `Ord`.
impl PartialOrd for User {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        // Delegate to `Ord::cmp` so the two orderings can never disagree.
        Some(self.cmp(other))
    }
}
  78
  79impl Ord for User {
  80    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
  81        self.github_login.cmp(&other.github_login)
  82    }
  83}
  84
  85impl PartialEq for User {
  86    fn eq(&self, other: &Self) -> bool {
  87        self.id == other.id && self.github_login == other.github_login
  88    }
  89}
  90
/// Marks the hand-written `PartialEq for User` as a full equivalence relation.
impl Eq for User {}
  92
/// A contact of the current user together with its presence flags.
#[derive(Debug, PartialEq)]
pub struct Contact {
    /// The user this contact refers to.
    pub user: Arc<User>,
    /// Whether the contact is reported as online.
    pub online: bool,
    /// Whether the contact is reported as busy.
    pub busy: bool,
}
  99
/// Relationship between the current user and another user, as computed by
/// `UserStore::contact_request_status`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ContactRequestStatus {
    /// The user is in none of the contact or request lists.
    None,
    /// The user is in the outgoing contact request list.
    RequestSent,
    /// The user is in the incoming contact request list.
    RequestReceived,
    /// The user is already in the contact list.
    RequestAccepted,
}
 107
/// Caches users, contacts, contact requests, and plan/usage information for the signed-in user,
/// and keeps them up to date from messages received through the [`Client`].
pub struct UserStore {
    /// Cache of known users keyed by user id.
    users: HashMap<u64, Arc<User>>,
    /// Index from GitHub login to user id, used to look up cached users by login.
    by_github_login: HashMap<String, u64>,
    /// Participant index recorded per user id.
    participant_indices: HashMap<u64, ParticipantIndex>,
    /// Sending half of the queue consumed by the `_maintain_contacts` task.
    update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
    /// Plan from the most recent `UpdateUserPlan` message, if any.
    current_plan: Option<proto::Plan>,
    /// Start and end of the current subscription period, if reported.
    subscription_period: Option<(DateTime<Utc>, DateTime<Utc>)>,
    /// When the trial started, if reported.
    trial_started_at: Option<DateTime<Utc>>,
    /// Latest known model-request usage.
    model_request_usage: Option<ModelRequestUsage>,
    /// Latest known edit-prediction usage.
    edit_prediction_usage: Option<EditPredictionUsage>,
    /// Usage-based-billing flag from the most recent plan update, if reported.
    is_usage_based_billing_enabled: Option<bool>,
    /// "Account too young" flag from the most recent plan update, if reported.
    account_too_young: Option<bool>,
    /// "Overdue invoices" flag from the most recent plan update, if reported.
    has_overdue_invoices: Option<bool>,
    /// Watch channel holding the currently signed-in user (`None` when signed out).
    current_user: watch::Receiver<Option<Arc<User>>>,
    /// Outer `None`: acceptance state not known yet (initial state, or after sign-out).
    /// Inner `None`: state is known and the terms have not been accepted.
    accepted_tos_at: Option<Option<DateTime<Utc>>>,
    /// Contacts, kept sorted by `github_login` so they can be binary-searched.
    contacts: Vec<Arc<Contact>>,
    /// Users who sent us a contact request, kept sorted by `github_login`.
    incoming_contact_requests: Vec<Arc<User>>,
    /// Users we sent a contact request to, kept sorted by `github_login`.
    outgoing_contact_requests: Vec<Arc<User>>,
    /// Number of contact-related requests currently in flight, per user id.
    pending_contact_requests: HashMap<u64, usize>,
    /// Invite information from the most recent `UpdateInviteInfo` message, if any.
    invite_info: Option<InviteInfo>,
    /// Weak handle to the client, so the store does not keep it alive.
    client: Weak<Client>,
    /// Background task that applies queued `UpdateContacts` messages one at a time.
    _maintain_contacts: Task<()>,
    /// Background task that reacts to client status changes.
    _maintain_current_user: Task<Result<()>>,
    /// Weak handle to this entity.
    weak_self: WeakEntity<Self>,
}
 133
/// Invite information received from the server via `UpdateInviteInfo`.
#[derive(Clone)]
pub struct InviteInfo {
    /// The `count` value from the message (presumably the number of invites; TODO confirm).
    pub count: u32,
    /// The invite URL from the message.
    pub url: Arc<str>,
}
 139
/// Events emitted by [`UserStore`].
pub enum Event {
    /// Something happened to a contact request involving `user`; `kind` says what.
    Contact {
        user: Arc<User>,
        kind: ContactEventKind,
    },
    /// Emitted when a `ShowContacts` message is received from the server.
    ShowContacts,
    /// Participant indices changed (emitted outside the code visible here; TODO confirm where).
    ParticipantIndicesChanged,
    /// The terms-of-service acceptance state of the current user was updated or reset.
    PrivateUserInfoUpdated,
}
 149
/// What happened to a contact request in an [`Event::Contact`].
#[derive(Clone, Copy)]
pub enum ContactEventKind {
    /// Presumably: a contact request was made (not emitted in the code visible here).
    Requested,
    /// Presumably: a contact request was accepted (not emitted in the code visible here).
    Accepted,
    /// Emitted for each incoming contact request that an update removes.
    Cancelled,
}
 156
/// Allows [`UserStore`] to emit [`Event`]s through `cx.emit`.
impl EventEmitter<Event> for UserStore {}
 158
/// Messages processed in order by `UserStore::update_contacts`.
enum UpdateContacts {
    /// Apply a contacts update received from the server.
    Update(proto::UpdateContacts),
    /// Drop the barrier once every message queued before this one has been processed.
    Wait(postage::barrier::Sender),
    /// Clear contacts and both request lists, then drop the barrier.
    Clear(postage::barrier::Sender),
}
 164
/// Usage of model requests; derefs to the wrapped [`RequestUsage`].
#[derive(Debug, Clone, Copy, Deref)]
pub struct ModelRequestUsage(pub RequestUsage);
 167
/// Usage of edit predictions; derefs to the wrapped [`RequestUsage`].
#[derive(Debug, Clone, Copy, Deref)]
pub struct EditPredictionUsage(pub RequestUsage);
 170
/// A usage counter paired with the limit it is measured against.
#[derive(Debug, Clone, Copy)]
pub struct RequestUsage {
    /// The limit that applies to this kind of usage.
    pub limit: UsageLimit,
    /// The amount used so far.
    pub amount: i32,
}
 176
 177impl UserStore {
    /// Creates a `UserStore` bound to `client`.
    ///
    /// Registers message handlers for plan, contact, invite-info and show-contacts messages, and
    /// spawns two background tasks:
    ///
    /// * `_maintain_contacts` applies queued [`UpdateContacts`] messages one at a time.
    /// * `_maintain_current_user` follows the client's connection status: it loads the current
    ///   user and their private info on connect, and resets state on sign-out or connection loss.
    pub fn new(client: Arc<Client>, cx: &Context<Self>) -> Self {
        let (mut current_user_tx, current_user_rx) = watch::channel();
        let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
        let rpc_subscriptions = vec![
            client.add_message_handler(cx.weak_entity(), Self::handle_update_plan),
            client.add_message_handler(cx.weak_entity(), Self::handle_update_contacts),
            client.add_message_handler(cx.weak_entity(), Self::handle_update_invite_info),
            client.add_message_handler(cx.weak_entity(), Self::handle_show_contacts),
        ];
        Self {
            users: Default::default(),
            by_github_login: Default::default(),
            current_user: current_user_rx,
            current_plan: None,
            subscription_period: None,
            trial_started_at: None,
            model_request_usage: None,
            edit_prediction_usage: None,
            is_usage_based_billing_enabled: None,
            account_too_young: None,
            has_overdue_invoices: None,
            accepted_tos_at: None,
            contacts: Default::default(),
            incoming_contact_requests: Default::default(),
            participant_indices: Default::default(),
            outgoing_contact_requests: Default::default(),
            invite_info: None,
            client: Arc::downgrade(&client),
            update_contacts_tx,
            _maintain_contacts: cx.spawn(async move |this, cx| {
                // Moving the subscriptions into the task ties their lifetime to the task's.
                let _subscriptions = rpc_subscriptions;
                // Each update is awaited before the next message is taken from the queue, so
                // contact updates are applied strictly in order.
                while let Some(message) = update_contacts_rx.next().await {
                    if let Ok(task) = this.update(cx, |this, cx| this.update_contacts(message, cx))
                    {
                        task.log_err().await;
                    } else {
                        // The store could not be updated any more; stop processing.
                        break;
                    }
                }
            }),
            _maintain_current_user: cx.spawn(async move |this, cx| {
                let mut status = client.status();
                // Keep only a weak handle so this task does not keep the client alive.
                let weak = Arc::downgrade(&client);
                drop(client);
                while let Some(status) = status.next().await {
                    // If the client has been dropped, the app is shutting down.
                    let Some(client) = weak.upgrade() else {
                        return Ok(());
                    };
                    match status {
                        Status::Connected { .. } => {
                            if let Some(user_id) = client.user_id() {
                                let fetch_user = if let Ok(fetch_user) =
                                    this.update(cx, |this, cx| this.get_user(user_id, cx).log_err())
                                {
                                    fetch_user
                                } else {
                                    break;
                                };
                                let fetch_private_user_info =
                                    client.request(proto::GetPrivateUserInfo {}).log_err();
                                // Load the public user record and the private info concurrently.
                                let (user, info) =
                                    futures::join!(fetch_user, fetch_private_user_info);

                                cx.update(|cx| {
                                    if let Some(info) = info {
                                        // The staff flag can be forced off via
                                        // `ZED_DISABLE_STAFF`.
                                        let staff =
                                            info.staff && !*feature_flags::ZED_DISABLE_STAFF;
                                        cx.update_flags(staff, info.flags);
                                        client.telemetry.set_authenticated_user_info(
                                            Some(info.metrics_id.clone()),
                                            staff,
                                        );

                                        this.update(cx, |this, cx| {
                                            let accepted_tos_at = {
                                                // Debug builds can pretend the terms were never
                                                // accepted by setting `ZED_IGNORE_ACCEPTED_TOS`.
                                                #[cfg(debug_assertions)]
                                                if std::env::var("ZED_IGNORE_ACCEPTED_TOS").is_ok()
                                                {
                                                    None
                                                } else {
                                                    info.accepted_tos_at
                                                }

                                                #[cfg(not(debug_assertions))]
                                                info.accepted_tos_at
                                            };

                                            this.set_current_user_accepted_tos_at(accepted_tos_at);
                                            cx.emit(Event::PrivateUserInfoUpdated);
                                        })
                                    } else {
                                        anyhow::Ok(())
                                    }
                                })??;

                                // Publish the user (or `None` if loading failed) to watchers;
                                // a failed send is ignored.
                                current_user_tx.send(user).await.ok();

                                this.update(cx, |_, cx| cx.notify())?;
                            }
                        }
                        Status::SignedOut => {
                            current_user_tx.send(None).await.ok();
                            // Reset the terms-of-service state to "unknown" and wait until the
                            // contact lists have been cleared.
                            this.update(cx, |this, cx| {
                                this.accepted_tos_at = None;
                                cx.emit(Event::PrivateUserInfoUpdated);
                                cx.notify();
                                this.clear_contacts()
                            })?
                            .await;
                        }
                        Status::ConnectionLost => {
                            // Keep the current user, but drop the contact lists.
                            this.update(cx, |this, cx| {
                                cx.notify();
                                this.clear_contacts()
                            })?
                            .await;
                        }
                        _ => {}
                    }
                }
                Ok(())
            }),
            pending_contact_requests: Default::default(),
            weak_self: cx.weak_entity(),
        }
    }
 305
 306    #[cfg(feature = "test-support")]
 307    pub fn clear_cache(&mut self) {
 308        self.users.clear();
 309        self.by_github_login.clear();
 310    }
 311
 312    async fn handle_update_invite_info(
 313        this: Entity<Self>,
 314        message: TypedEnvelope<proto::UpdateInviteInfo>,
 315        mut cx: AsyncApp,
 316    ) -> Result<()> {
 317        this.update(&mut cx, |this, cx| {
 318            this.invite_info = Some(InviteInfo {
 319                url: Arc::from(message.payload.url),
 320                count: message.payload.count,
 321            });
 322            cx.notify();
 323        })?;
 324        Ok(())
 325    }
 326
 327    async fn handle_show_contacts(
 328        this: Entity<Self>,
 329        _: TypedEnvelope<proto::ShowContacts>,
 330        mut cx: AsyncApp,
 331    ) -> Result<()> {
 332        this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts))?;
 333        Ok(())
 334    }
 335
    /// Returns the most recently received invite information, or `None` if none has arrived.
    pub fn invite_info(&self) -> Option<&InviteInfo> {
        self.invite_info.as_ref()
    }
 339
 340    async fn handle_update_contacts(
 341        this: Entity<Self>,
 342        message: TypedEnvelope<proto::UpdateContacts>,
 343        mut cx: AsyncApp,
 344    ) -> Result<()> {
 345        this.read_with(&mut cx, |this, _| {
 346            this.update_contacts_tx
 347                .unbounded_send(UpdateContacts::Update(message.payload))
 348                .unwrap();
 349        })?;
 350        Ok(())
 351    }
 352
 353    async fn handle_update_plan(
 354        this: Entity<Self>,
 355        message: TypedEnvelope<proto::UpdateUserPlan>,
 356        mut cx: AsyncApp,
 357    ) -> Result<()> {
 358        this.update(&mut cx, |this, cx| {
 359            this.current_plan = Some(message.payload.plan());
 360            this.subscription_period = maybe!({
 361                let period = message.payload.subscription_period?;
 362                let started_at = DateTime::from_timestamp(period.started_at as i64, 0)?;
 363                let ended_at = DateTime::from_timestamp(period.ended_at as i64, 0)?;
 364
 365                Some((started_at, ended_at))
 366            });
 367            this.trial_started_at = message
 368                .payload
 369                .trial_started_at
 370                .and_then(|trial_started_at| DateTime::from_timestamp(trial_started_at as i64, 0));
 371            this.is_usage_based_billing_enabled = message.payload.is_usage_based_billing_enabled;
 372            this.account_too_young = message.payload.account_too_young;
 373            this.has_overdue_invoices = message.payload.has_overdue_invoices;
 374
 375            if let Some(usage) = message.payload.usage {
 376                // limits are always present even though they are wrapped in Option
 377                this.model_request_usage = usage
 378                    .model_requests_usage_limit
 379                    .and_then(|limit| {
 380                        RequestUsage::from_proto(usage.model_requests_usage_amount, limit)
 381                    })
 382                    .map(ModelRequestUsage);
 383                this.edit_prediction_usage = usage
 384                    .edit_predictions_usage_limit
 385                    .and_then(|limit| {
 386                        RequestUsage::from_proto(usage.model_requests_usage_amount, limit)
 387                    })
 388                    .map(EditPredictionUsage);
 389            }
 390
 391            cx.notify();
 392        })?;
 393        Ok(())
 394    }
 395
    /// Replaces the cached model-request usage with `usage` and notifies observers.
    pub fn update_model_request_usage(&mut self, usage: ModelRequestUsage, cx: &mut Context<Self>) {
        self.model_request_usage = Some(usage);
        cx.notify();
    }
 400
    /// Replaces the cached edit-prediction usage with `usage` and notifies observers.
    pub fn update_edit_prediction_usage(
        &mut self,
        usage: EditPredictionUsage,
        cx: &mut Context<Self>,
    ) {
        self.edit_prediction_usage = Some(usage);
        cx.notify();
    }
 409
    /// Applies one queued [`UpdateContacts`] message.
    ///
    /// * `Wait` drops its barrier immediately.
    /// * `Clear` empties the contact list and both request lists, then drops its barrier.
    /// * `Update` loads every user mentioned in the message, then applies removals and
    ///   insertions to the contact list and both request lists. All three lists are kept sorted
    ///   by `github_login`, so updates use binary search to replace or insert in place.
    ///
    /// The returned task resolves once the message has been fully applied.
    fn update_contacts(&mut self, message: UpdateContacts, cx: &Context<Self>) -> Task<Result<()>> {
        match message {
            UpdateContacts::Wait(barrier) => {
                drop(barrier);
                Task::ready(Ok(()))
            }
            UpdateContacts::Clear(barrier) => {
                self.contacts.clear();
                self.incoming_contact_requests.clear();
                self.outgoing_contact_requests.clear();
                drop(barrier);
                Task::ready(Ok(()))
            }
            UpdateContacts::Update(message) => {
                // Collect every user id the message refers to so they can be loaded in one go.
                let mut user_ids = HashSet::default();
                for contact in &message.contacts {
                    user_ids.insert(contact.user_id);
                }
                user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
                user_ids.extend(message.outgoing_requests.iter());

                let load_users = self.get_users(user_ids.into_iter().collect(), cx);
                cx.spawn(async move |this, cx| {
                    load_users.await?;

                    // Users are fetched in parallel above and cached in call to get_users
                    // No need to parallelize here
                    let mut updated_contacts = Vec::new();
                    let this = this.upgrade().context("can't upgrade user store handle")?;
                    for contact in message.contacts {
                        updated_contacts
                            .push(Arc::new(Contact::from_proto(contact, &this, cx).await?));
                    }

                    let mut incoming_requests = Vec::new();
                    for request in message.incoming_requests {
                        incoming_requests.push({
                            this.update(cx, |this, cx| this.get_user(request.requester_id, cx))?
                                .await?
                        });
                    }

                    let mut outgoing_requests = Vec::new();
                    for requested_user_id in message.outgoing_requests {
                        outgoing_requests.push(
                            this.update(cx, |this, cx| this.get_user(requested_user_id, cx))?
                                .await?,
                        );
                    }

                    let removed_contacts =
                        HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
                    let removed_incoming_requests =
                        HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
                    let removed_outgoing_requests =
                        HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());

                    this.update(cx, |this, cx| {
                        // Remove contacts
                        this.contacts
                            .retain(|contact| !removed_contacts.contains(&contact.user.id));
                        // Update existing contacts and insert new ones
                        for updated_contact in updated_contacts {
                            match this.contacts.binary_search_by_key(
                                &&updated_contact.user.github_login,
                                |contact| &contact.user.github_login,
                            ) {
                                Ok(ix) => this.contacts[ix] = updated_contact,
                                Err(ix) => this.contacts.insert(ix, updated_contact),
                            }
                        }

                        // Remove incoming contact requests, emitting a `Cancelled` event for
                        // each one that is removed
                        this.incoming_contact_requests.retain(|user| {
                            if removed_incoming_requests.contains(&user.id) {
                                cx.emit(Event::Contact {
                                    user: user.clone(),
                                    kind: ContactEventKind::Cancelled,
                                });
                                false
                            } else {
                                true
                            }
                        });
                        // Update existing incoming requests and insert new ones
                        for user in incoming_requests {
                            match this
                                .incoming_contact_requests
                                .binary_search_by_key(&&user.github_login, |contact| {
                                    &contact.github_login
                                }) {
                                Ok(ix) => this.incoming_contact_requests[ix] = user,
                                Err(ix) => this.incoming_contact_requests.insert(ix, user),
                            }
                        }

                        // Remove outgoing contact requests
                        this.outgoing_contact_requests
                            .retain(|user| !removed_outgoing_requests.contains(&user.id));
                        // Update existing outgoing requests and insert new ones
                        for request in outgoing_requests {
                            match this
                                .outgoing_contact_requests
                                .binary_search_by_key(&&request.github_login, |contact| {
                                    &contact.github_login
                                }) {
                                Ok(ix) => this.outgoing_contact_requests[ix] = request,
                                Err(ix) => this.outgoing_contact_requests.insert(ix, request),
                            }
                        }

                        cx.notify();
                    })?;

                    Ok(())
                })
            }
        }
    }
 529
 530    pub fn contacts(&self) -> &[Arc<Contact>] {
 531        &self.contacts
 532    }
 533
 534    pub fn has_contact(&self, user: &Arc<User>) -> bool {
 535        self.contacts
 536            .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
 537            .is_ok()
 538    }
 539
 540    pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
 541        &self.incoming_contact_requests
 542    }
 543
 544    pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
 545        &self.outgoing_contact_requests
 546    }
 547
    /// Returns whether at least one contact-related request concerning `user` is still in
    /// flight.
    pub fn is_contact_request_pending(&self, user: &User) -> bool {
        self.pending_contact_requests.contains_key(&user.id)
    }
 551
 552    pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
 553        if self
 554            .contacts
 555            .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
 556            .is_ok()
 557        {
 558            ContactRequestStatus::RequestAccepted
 559        } else if self
 560            .outgoing_contact_requests
 561            .binary_search_by_key(&&user.github_login, |user| &user.github_login)
 562            .is_ok()
 563        {
 564            ContactRequestStatus::RequestSent
 565        } else if self
 566            .incoming_contact_requests
 567            .binary_search_by_key(&&user.github_login, |user| &user.github_login)
 568            .is_ok()
 569        {
 570            ContactRequestStatus::RequestReceived
 571        } else {
 572            ContactRequestStatus::None
 573        }
 574    }
 575
 576    pub fn request_contact(
 577        &mut self,
 578        responder_id: u64,
 579        cx: &mut Context<Self>,
 580    ) -> Task<Result<()>> {
 581        self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
 582    }
 583
 584    pub fn remove_contact(&mut self, user_id: u64, cx: &mut Context<Self>) -> Task<Result<()>> {
 585        self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
 586    }
 587
 588    pub fn has_incoming_contact_request(&self, user_id: u64) -> bool {
 589        self.incoming_contact_requests
 590            .iter()
 591            .any(|user| user.id == user_id)
 592    }
 593
 594    pub fn respond_to_contact_request(
 595        &mut self,
 596        requester_id: u64,
 597        accept: bool,
 598        cx: &mut Context<Self>,
 599    ) -> Task<Result<()>> {
 600        self.perform_contact_request(
 601            requester_id,
 602            proto::RespondToContactRequest {
 603                requester_id,
 604                response: if accept {
 605                    proto::ContactRequestResponse::Accept
 606                } else {
 607                    proto::ContactRequestResponse::Decline
 608                } as i32,
 609            },
 610            cx,
 611        )
 612    }
 613
 614    pub fn dismiss_contact_request(
 615        &self,
 616        requester_id: u64,
 617        cx: &Context<Self>,
 618    ) -> Task<Result<()>> {
 619        let client = self.client.upgrade();
 620        cx.spawn(async move |_, _| {
 621            client
 622                .context("can't upgrade client reference")?
 623                .request(proto::RespondToContactRequest {
 624                    requester_id,
 625                    response: proto::ContactRequestResponse::Dismiss as i32,
 626                })
 627                .await?;
 628            Ok(())
 629        })
 630    }
 631
 632    fn perform_contact_request<T: RequestMessage>(
 633        &mut self,
 634        user_id: u64,
 635        request: T,
 636        cx: &mut Context<Self>,
 637    ) -> Task<Result<()>> {
 638        let client = self.client.upgrade();
 639        *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
 640        cx.notify();
 641
 642        cx.spawn(async move |this, cx| {
 643            let response = client
 644                .context("can't upgrade client reference")?
 645                .request(request)
 646                .await;
 647            this.update(cx, |this, cx| {
 648                if let Entry::Occupied(mut request_count) =
 649                    this.pending_contact_requests.entry(user_id)
 650                {
 651                    *request_count.get_mut() -= 1;
 652                    if *request_count.get() == 0 {
 653                        request_count.remove();
 654                    }
 655                }
 656                cx.notify();
 657            })?;
 658            response?;
 659            Ok(())
 660        })
 661    }
 662
 663    pub fn clear_contacts(&self) -> impl Future<Output = ()> + use<> {
 664        let (tx, mut rx) = postage::barrier::channel();
 665        self.update_contacts_tx
 666            .unbounded_send(UpdateContacts::Clear(tx))
 667            .unwrap();
 668        async move {
 669            rx.next().await;
 670        }
 671    }
 672
 673    pub fn contact_updates_done(&self) -> impl Future<Output = ()> {
 674        let (tx, mut rx) = postage::barrier::channel();
 675        self.update_contacts_tx
 676            .unbounded_send(UpdateContacts::Wait(tx))
 677            .unwrap();
 678        async move {
 679            rx.next().await;
 680        }
 681    }
 682
 683    pub fn get_users(
 684        &self,
 685        user_ids: Vec<u64>,
 686        cx: &Context<Self>,
 687    ) -> Task<Result<Vec<Arc<User>>>> {
 688        let mut user_ids_to_fetch = user_ids.clone();
 689        user_ids_to_fetch.retain(|id| !self.users.contains_key(id));
 690
 691        cx.spawn(async move |this, cx| {
 692            if !user_ids_to_fetch.is_empty() {
 693                this.update(cx, |this, cx| {
 694                    this.load_users(
 695                        proto::GetUsers {
 696                            user_ids: user_ids_to_fetch,
 697                        },
 698                        cx,
 699                    )
 700                })?
 701                .await?;
 702            }
 703
 704            this.read_with(cx, |this, _| {
 705                user_ids
 706                    .iter()
 707                    .map(|user_id| {
 708                        this.users
 709                            .get(user_id)
 710                            .cloned()
 711                            .with_context(|| format!("user {user_id} not found"))
 712                    })
 713                    .collect()
 714            })?
 715        })
 716    }
 717
 718    pub fn fuzzy_search_users(
 719        &self,
 720        query: String,
 721        cx: &Context<Self>,
 722    ) -> Task<Result<Vec<Arc<User>>>> {
 723        self.load_users(proto::FuzzySearchUsers { query }, cx)
 724    }
 725
 726    pub fn get_cached_user(&self, user_id: u64) -> Option<Arc<User>> {
 727        self.users.get(&user_id).cloned()
 728    }
 729
 730    pub fn get_user_optimistic(&self, user_id: u64, cx: &Context<Self>) -> Option<Arc<User>> {
 731        if let Some(user) = self.users.get(&user_id).cloned() {
 732            return Some(user);
 733        }
 734
 735        self.get_user(user_id, cx).detach_and_log_err(cx);
 736        None
 737    }
 738
 739    pub fn get_user(&self, user_id: u64, cx: &Context<Self>) -> Task<Result<Arc<User>>> {
 740        if let Some(user) = self.users.get(&user_id).cloned() {
 741            return Task::ready(Ok(user));
 742        }
 743
 744        let load_users = self.get_users(vec![user_id], cx);
 745        cx.spawn(async move |this, cx| {
 746            load_users.await?;
 747            this.read_with(cx, |this, _| {
 748                this.users
 749                    .get(&user_id)
 750                    .cloned()
 751                    .context("server responded with no users")
 752            })?
 753        })
 754    }
 755
 756    pub fn cached_user_by_github_login(&self, github_login: &str) -> Option<Arc<User>> {
 757        self.by_github_login
 758            .get(github_login)
 759            .and_then(|id| self.users.get(id).cloned())
 760    }
 761
    /// Returns the current value of the current-user watch channel: the signed-in user, or
    /// `None` when nobody is signed in.
    pub fn current_user(&self) -> Option<Arc<User>> {
        self.current_user.borrow().clone()
    }
 765
 766    pub fn current_plan(&self) -> Option<proto::Plan> {
 767        #[cfg(debug_assertions)]
 768        if let Ok(plan) = std::env::var("ZED_SIMULATE_PLAN").as_ref() {
 769            return match plan.as_str() {
 770                "free" => Some(proto::Plan::Free),
 771                "trial" => Some(proto::Plan::ZedProTrial),
 772                "pro" => Some(proto::Plan::ZedPro),
 773                _ => {
 774                    panic!("ZED_SIMULATE_PLAN must be one of 'free', 'trial', or 'pro'");
 775                }
 776            };
 777        }
 778
 779        self.current_plan
 780    }
 781
    /// Returns the `(start, end)` of the current subscription period, if known.
    pub fn subscription_period(&self) -> Option<(DateTime<Utc>, DateTime<Utc>)> {
        self.subscription_period
    }
 785
    /// Returns when the trial started, if known.
    pub fn trial_started_at(&self) -> Option<DateTime<Utc>> {
        self.trial_started_at
    }
 789
    /// Returns the usage-based-billing flag from the most recent plan update, or `None` if it
    /// has not been reported.
    pub fn usage_based_billing_enabled(&self) -> Option<bool> {
        self.is_usage_based_billing_enabled
    }
 793
    /// Returns the latest known model-request usage, if any.
    pub fn model_request_usage(&self) -> Option<ModelRequestUsage> {
        self.model_request_usage
    }
 797
    /// Returns the latest known edit-prediction usage, if any.
    pub fn edit_prediction_usage(&self) -> Option<EditPredictionUsage> {
        self.edit_prediction_usage
    }
 801
    /// Returns a clone of the watch receiver for the current user, so callers can observe
    /// changes to it.
    pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
        self.current_user.clone()
    }
 805
 806    /// Returns whether the user's account is too new to use the service.
 807    pub fn account_too_young(&self) -> bool {
 808        self.account_too_young.unwrap_or(false)
 809    }
 810
 811    /// Returns whether the current user has overdue invoices and usage should be blocked.
 812    pub fn has_overdue_invoices(&self) -> bool {
 813        self.has_overdue_invoices.unwrap_or(false)
 814    }
 815
 816    pub fn current_user_has_accepted_terms(&self) -> Option<bool> {
 817        self.accepted_tos_at
 818            .map(|accepted_tos_at| accepted_tos_at.is_some())
 819    }
 820
 821    pub fn accept_terms_of_service(&self, cx: &Context<Self>) -> Task<Result<()>> {
 822        if self.current_user().is_none() {
 823            return Task::ready(Err(anyhow!("no current user")));
 824        };
 825
 826        let client = self.client.clone();
 827        cx.spawn(async move |this, cx| -> anyhow::Result<()> {
 828            let client = client.upgrade().context("client not found")?;
 829            let response = client
 830                .request(proto::AcceptTermsOfService {})
 831                .await
 832                .context("error accepting tos")?;
 833            this.update(cx, |this, cx| {
 834                this.set_current_user_accepted_tos_at(Some(response.accepted_tos_at));
 835                cx.emit(Event::PrivateUserInfoUpdated);
 836            })?;
 837            Ok(())
 838        })
 839    }
 840
 841    fn set_current_user_accepted_tos_at(&mut self, accepted_tos_at: Option<u64>) {
 842        self.accepted_tos_at = Some(
 843            accepted_tos_at.and_then(|timestamp| DateTime::from_timestamp(timestamp as i64, 0)),
 844        );
 845    }
 846
 847    fn load_users(
 848        &self,
 849        request: impl RequestMessage<Response = UsersResponse>,
 850        cx: &Context<Self>,
 851    ) -> Task<Result<Vec<Arc<User>>>> {
 852        let client = self.client.clone();
 853        cx.spawn(async move |this, cx| {
 854            if let Some(rpc) = client.upgrade() {
 855                let response = rpc.request(request).await.context("error loading users")?;
 856                let users = response.users;
 857
 858                this.update(cx, |this, _| this.insert(users))
 859            } else {
 860                Ok(Vec::new())
 861            }
 862        })
 863    }
 864
 865    pub fn insert(&mut self, users: Vec<proto::User>) -> Vec<Arc<User>> {
 866        let mut ret = Vec::with_capacity(users.len());
 867        for user in users {
 868            let user = User::new(user);
 869            if let Some(old) = self.users.insert(user.id, user.clone()) {
 870                if old.github_login != user.github_login {
 871                    self.by_github_login.remove(&old.github_login);
 872                }
 873            }
 874            self.by_github_login
 875                .insert(user.github_login.clone(), user.id);
 876            ret.push(user)
 877        }
 878        ret
 879    }
 880
 881    pub fn set_participant_indices(
 882        &mut self,
 883        participant_indices: HashMap<u64, ParticipantIndex>,
 884        cx: &mut Context<Self>,
 885    ) {
 886        if participant_indices != self.participant_indices {
 887            self.participant_indices = participant_indices;
 888            cx.emit(Event::ParticipantIndicesChanged);
 889        }
 890    }
 891
 892    pub fn participant_indices(&self) -> &HashMap<u64, ParticipantIndex> {
 893        &self.participant_indices
 894    }
 895
 896    pub fn participant_names(
 897        &self,
 898        user_ids: impl Iterator<Item = u64>,
 899        cx: &App,
 900    ) -> HashMap<u64, SharedString> {
 901        let mut ret = HashMap::default();
 902        let mut missing_user_ids = Vec::new();
 903        for id in user_ids {
 904            if let Some(github_login) = self.get_cached_user(id).map(|u| u.github_login.clone()) {
 905                ret.insert(id, github_login.into());
 906            } else {
 907                missing_user_ids.push(id)
 908            }
 909        }
 910        if !missing_user_ids.is_empty() {
 911            let this = self.weak_self.clone();
 912            cx.spawn(async move |cx| {
 913                this.update(cx, |this, cx| this.get_users(missing_user_ids, cx))?
 914                    .await
 915            })
 916            .detach_and_log_err(cx);
 917        }
 918        ret
 919    }
 920}
 921
 922impl User {
 923    fn new(message: proto::User) -> Arc<Self> {
 924        Arc::new(User {
 925            id: message.id,
 926            github_login: message.github_login,
 927            avatar_uri: message.avatar_url.into(),
 928            name: message.name,
 929        })
 930    }
 931}
 932
 933impl Contact {
 934    async fn from_proto(
 935        contact: proto::Contact,
 936        user_store: &Entity<UserStore>,
 937        cx: &mut AsyncApp,
 938    ) -> Result<Self> {
 939        let user = user_store
 940            .update(cx, |user_store, cx| {
 941                user_store.get_user(contact.user_id, cx)
 942            })?
 943            .await?;
 944        Ok(Self {
 945            user,
 946            online: contact.online,
 947            busy: contact.busy,
 948        })
 949    }
 950}
 951
 952impl Collaborator {
 953    pub fn from_proto(message: proto::Collaborator) -> Result<Self> {
 954        Ok(Self {
 955            peer_id: message.peer_id.context("invalid peer id")?,
 956            replica_id: message.replica_id as ReplicaId,
 957            user_id: message.user_id as UserId,
 958            is_host: message.is_host,
 959            committer_name: message.committer_name,
 960            committer_email: message.committer_email,
 961        })
 962    }
 963}
 964
 965impl RequestUsage {
 966    pub fn over_limit(&self) -> bool {
 967        match self.limit {
 968            UsageLimit::Limited(limit) => self.amount >= limit,
 969            UsageLimit::Unlimited => false,
 970        }
 971    }
 972
 973    pub fn from_proto(amount: u32, limit: proto::UsageLimit) -> Option<Self> {
 974        let limit = match limit.variant? {
 975            proto::usage_limit::Variant::Limited(limited) => {
 976                UsageLimit::Limited(limited.limit as i32)
 977            }
 978            proto::usage_limit::Variant::Unlimited(_) => UsageLimit::Unlimited,
 979        };
 980        Some(RequestUsage {
 981            limit,
 982            amount: amount as i32,
 983        })
 984    }
 985
 986    fn from_headers(
 987        limit_name: &str,
 988        amount_name: &str,
 989        headers: &HeaderMap<HeaderValue>,
 990    ) -> Result<Self> {
 991        let limit = headers
 992            .get(limit_name)
 993            .with_context(|| format!("missing {limit_name:?} header"))?;
 994        let limit = UsageLimit::from_str(limit.to_str()?)?;
 995
 996        let amount = headers
 997            .get(amount_name)
 998            .with_context(|| format!("missing {amount_name:?} header"))?;
 999        let amount = amount.to_str()?.parse::<i32>()?;
1000
1001        Ok(Self { limit, amount })
1002    }
1003}
1004
1005impl ModelRequestUsage {
1006    pub fn from_headers(headers: &HeaderMap<HeaderValue>) -> Result<Self> {
1007        Ok(Self(RequestUsage::from_headers(
1008            MODEL_REQUESTS_USAGE_LIMIT_HEADER_NAME,
1009            MODEL_REQUESTS_USAGE_AMOUNT_HEADER_NAME,
1010            headers,
1011        )?))
1012    }
1013}
1014
1015impl EditPredictionUsage {
1016    pub fn from_headers(headers: &HeaderMap<HeaderValue>) -> Result<Self> {
1017        Ok(Self(RequestUsage::from_headers(
1018            EDIT_PREDICTIONS_USAGE_LIMIT_HEADER_NAME,
1019            EDIT_PREDICTIONS_USAGE_AMOUNT_HEADER_NAME,
1020            headers,
1021        )?))
1022    }
1023}