user.rs

   1use super::{Client, Status, TypedEnvelope, proto};
   2use anyhow::{Context as _, Result, anyhow};
   3use chrono::{DateTime, Utc};
   4use cloud_llm_client::{
   5    EDIT_PREDICTIONS_USAGE_AMOUNT_HEADER_NAME, EDIT_PREDICTIONS_USAGE_LIMIT_HEADER_NAME,
   6    MODEL_REQUESTS_USAGE_AMOUNT_HEADER_NAME, MODEL_REQUESTS_USAGE_LIMIT_HEADER_NAME, UsageLimit,
   7};
   8use collections::{HashMap, HashSet, hash_map::Entry};
   9use derive_more::Deref;
  10use feature_flags::FeatureFlagAppExt;
  11use futures::{Future, StreamExt, channel::mpsc};
  12use gpui::{
  13    App, AsyncApp, Context, Entity, EventEmitter, SharedString, SharedUri, Task, WeakEntity,
  14};
  15use http_client::http::{HeaderMap, HeaderValue};
  16use postage::{sink::Sink, watch};
  17use rpc::proto::{RequestMessage, UsersResponse};
  18use std::{
  19    str::FromStr as _,
  20    sync::{Arc, Weak},
  21};
  22use text::ReplicaId;
  23use util::{TryFutureExt as _, maybe};
  24
/// Numeric identifier of a user.
pub type UserId = u64;
  26
/// Newtype wrapping the `u64` identifier of a channel.
#[derive(
    Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, serde::Serialize, serde::Deserialize,
)]
pub struct ChannelId(pub u64);
  31
  32impl std::fmt::Display for ChannelId {
  33    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
  34        self.0.fmt(f)
  35    }
  36}
  37
  38#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)]
  39pub struct ProjectId(pub u64);
  40
  41impl ProjectId {
  42    pub fn to_proto(&self) -> u64 {
  43        self.0
  44    }
  45}
  46
/// Newtype wrapping the `u64` identifier of a dev-server project.
#[derive(
    Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, serde::Serialize, serde::Deserialize,
)]
pub struct DevServerProjectId(pub u64);
  51
/// Newtype wrapping a participant's `u32` index.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ParticipantIndex(pub u32);
  54
/// A user record, as cached by `UserStore`.
#[derive(Default, Debug)]
pub struct User {
    /// Numeric identifier of the user.
    pub id: UserId,
    /// The user's GitHub login; contact lists in `UserStore` are kept sorted by this value.
    pub github_login: SharedString,
    /// URI of the user's avatar.
    pub avatar_uri: SharedUri,
    /// Optional name of the user (presumably a display name — confirm against the server schema).
    pub name: Option<String>,
}
  62
/// Describes one collaborator: their peer, replica and user ids plus optional committer identity.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Collaborator {
    /// RPC peer id of the collaborator.
    pub peer_id: proto::PeerId,
    /// Replica id assigned to the collaborator.
    pub replica_id: ReplicaId,
    /// Id of the user behind this collaborator.
    pub user_id: UserId,
    /// Whether this collaborator is the host.
    pub is_host: bool,
    /// Committer name, when available.
    pub committer_name: Option<String>,
    /// Committer email, when available.
    pub committer_email: Option<String>,
}
  72
  73impl PartialOrd for User {
  74    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
  75        Some(self.cmp(other))
  76    }
  77}
  78
  79impl Ord for User {
  80    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
  81        self.github_login.cmp(&other.github_login)
  82    }
  83}
  84
  85impl PartialEq for User {
  86    fn eq(&self, other: &Self) -> bool {
  87        self.id == other.id && self.github_login == other.github_login
  88    }
  89}
  90
// Marker impl: declares that `User`'s `PartialEq` is a full equivalence relation.
impl Eq for User {}
  92
/// A contact of the current user together with presence flags.
#[derive(Debug, PartialEq)]
pub struct Contact {
    /// The user this contact refers to.
    pub user: Arc<User>,
    /// Whether the contact is online.
    pub online: bool,
    /// Whether the contact is busy.
    pub busy: bool,
}
  99
/// Relationship between the current user and another user, as reported by
/// `UserStore::contact_request_status`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ContactRequestStatus {
    /// The user is in none of the contact or request lists.
    None,
    /// The user is in the list of outgoing contact requests.
    RequestSent,
    /// The user is in the list of incoming contact requests.
    RequestReceived,
    /// The user is already a contact.
    RequestAccepted,
}
 107
/// Caches users and tracks the current user's contacts, plan and account state.
pub struct UserStore {
    // Cache of known users, keyed by user id.
    users: HashMap<u64, Arc<User>>,
    // Index from GitHub login to user id.
    by_github_login: HashMap<SharedString, u64>,
    // Participant index per user id.
    participant_indices: HashMap<u64, ParticipantIndex>,
    // Sender feeding the contacts-maintenance task; messages are applied in order.
    update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
    // Plan and billing details, populated by `handle_update_plan`.
    current_plan: Option<proto::Plan>,
    subscription_period: Option<(DateTime<Utc>, DateTime<Utc>)>,
    trial_started_at: Option<DateTime<Utc>>,
    model_request_usage: Option<ModelRequestUsage>,
    is_usage_based_billing_enabled: Option<bool>,
    account_too_young: Option<bool>,
    has_overdue_invoices: Option<bool>,
    // Watch channel holding the signed-in user, or `None` when signed out.
    current_user: watch::Receiver<Option<Arc<User>>>,
    // Outer `None`: not known yet (also reset on sign-out).
    // Inner `None`: known, but no acceptance timestamp is available.
    accepted_tos_at: Option<Option<DateTime<Utc>>>,
    // The three lists below are kept sorted by GitHub login so they can be binary-searched.
    contacts: Vec<Arc<Contact>>,
    incoming_contact_requests: Vec<Arc<User>>,
    outgoing_contact_requests: Vec<Arc<User>>,
    // Number of in-flight contact requests per user id; entries are removed when they reach zero.
    pending_contact_requests: HashMap<u64, usize>,
    invite_info: Option<InviteInfo>,
    // Weak so the store does not keep the client alive.
    client: Weak<Client>,
    // Background tasks owned by the store; stored so they live as long as the store.
    _maintain_contacts: Task<()>,
    _maintain_current_user: Task<Result<()>>,
    weak_self: WeakEntity<Self>,
}
 132
/// Invite details received via an `UpdateInviteInfo` message.
#[derive(Clone)]
pub struct InviteInfo {
    /// Count reported by the server alongside the invite URL.
    pub count: u32,
    /// The invite URL.
    pub url: Arc<str>,
}
 138
/// Events emitted by `UserStore`.
pub enum Event {
    /// Something happened to a contact or contact request involving `user`.
    Contact {
        user: Arc<User>,
        kind: ContactEventKind,
    },
    /// Emitted when a `ShowContacts` message is received.
    ShowContacts,
    /// The participant indices changed.
    ParticipantIndicesChanged,
    /// Emitted after the terms-of-service state is updated or cleared.
    PrivateUserInfoUpdated,
    /// Emitted after an `UpdateUserPlan` message has been applied.
    PlanUpdated,
}
 149
/// The kind of change carried by `Event::Contact`.
#[derive(Clone, Copy)]
pub enum ContactEventKind {
    Requested,
    Accepted,
    /// Emitted when an incoming contact request is removed.
    Cancelled,
}
 156
// Allows `UserStore` to emit `Event` values through its context.
impl EventEmitter<Event> for UserStore {}
 158
/// Messages processed sequentially by the contacts-maintenance task.
enum UpdateContacts {
    /// Apply a contacts update received from the server.
    Update(proto::UpdateContacts),
    /// Do nothing except drop the barrier, signalling that earlier messages were processed.
    Wait(postage::barrier::Sender),
    /// Clear contacts and both request lists, then drop the barrier.
    Clear(postage::barrier::Sender),
}
 164
/// `RequestUsage` for model requests; dereferences to the inner `RequestUsage`.
#[derive(Debug, Clone, Copy, Deref)]
pub struct ModelRequestUsage(pub RequestUsage);
 167
/// `RequestUsage` for edit predictions; dereferences to the inner `RequestUsage`.
#[derive(Debug, Clone, Copy, Deref)]
pub struct EditPredictionUsage(pub RequestUsage);
 170
/// A usage amount paired with the limit it counts against.
#[derive(Debug, Clone, Copy)]
pub struct RequestUsage {
    /// The applicable usage limit.
    pub limit: UsageLimit,
    /// The amount used so far.
    pub amount: i32,
}
 176
 177impl UserStore {
    /// Creates the store, registers its RPC message handlers on `client`, and spawns the
    /// two background tasks that maintain the contact lists and the current user.
    ///
    /// The store itself keeps only a `Weak` reference to `client`.
    pub fn new(client: Arc<Client>, cx: &Context<Self>) -> Self {
        let (mut current_user_tx, current_user_rx) = watch::channel();
        let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
        let rpc_subscriptions = vec![
            client.add_message_handler(cx.weak_entity(), Self::handle_update_plan),
            client.add_message_handler(cx.weak_entity(), Self::handle_update_contacts),
            client.add_message_handler(cx.weak_entity(), Self::handle_update_invite_info),
            client.add_message_handler(cx.weak_entity(), Self::handle_show_contacts),
        ];
        Self {
            users: Default::default(),
            by_github_login: Default::default(),
            current_user: current_user_rx,
            current_plan: None,
            subscription_period: None,
            trial_started_at: None,
            model_request_usage: None,
            is_usage_based_billing_enabled: None,
            account_too_young: None,
            has_overdue_invoices: None,
            accepted_tos_at: None,
            contacts: Default::default(),
            incoming_contact_requests: Default::default(),
            participant_indices: Default::default(),
            outgoing_contact_requests: Default::default(),
            invite_info: None,
            client: Arc::downgrade(&client),
            update_contacts_tx,
            // Applies queued contact messages one at a time, in the order they were sent.
            _maintain_contacts: cx.spawn(async move |this, cx| {
                // Moved into the task so the subscriptions live as long as the task does.
                let _subscriptions = rpc_subscriptions;
                while let Some(message) = update_contacts_rx.next().await {
                    if let Ok(task) = this.update(cx, |this, cx| this.update_contacts(message, cx))
                    {
                        task.log_err().await;
                    } else {
                        // The store can no longer be updated; stop processing.
                        break;
                    }
                }
            }),
            // Reacts to client status changes to keep the current user and ToS state in sync.
            _maintain_current_user: cx.spawn(async move |this, cx| {
                let mut status = client.status();
                // Hold only a weak reference while waiting so this task does not keep the
                // client alive.
                let weak = Arc::downgrade(&client);
                drop(client);
                while let Some(status) = status.next().await {
                    // if the client is dropped, the app is shutting down.
                    let Some(client) = weak.upgrade() else {
                        return Ok(());
                    };
                    match status {
                        Status::Connected { .. } => {
                            if let Some(user_id) = client.user_id() {
                                let fetch_user = if let Ok(fetch_user) =
                                    this.update(cx, |this, cx| this.get_user(user_id, cx).log_err())
                                {
                                    fetch_user
                                } else {
                                    break;
                                };
                                let fetch_private_user_info =
                                    client.request(proto::GetPrivateUserInfo {}).log_err();
                                // Fetch the user record and the private info concurrently.
                                let (user, info) =
                                    futures::join!(fetch_user, fetch_private_user_info);

                                cx.update(|cx| {
                                    if let Some(info) = info {
                                        let staff =
                                            info.staff && !*feature_flags::ZED_DISABLE_STAFF;
                                        cx.update_flags(staff, info.flags);
                                        client.telemetry.set_authenticated_user_info(
                                            Some(info.metrics_id.clone()),
                                            staff,
                                        );

                                        this.update(cx, |this, cx| {
                                            // In debug builds, setting ZED_IGNORE_ACCEPTED_TOS
                                            // makes the store behave as if the terms had not
                                            // been accepted.
                                            let accepted_tos_at = {
                                                #[cfg(debug_assertions)]
                                                if std::env::var("ZED_IGNORE_ACCEPTED_TOS").is_ok()
                                                {
                                                    None
                                                } else {
                                                    info.accepted_tos_at
                                                }

                                                #[cfg(not(debug_assertions))]
                                                info.accepted_tos_at
                                            };

                                            this.set_current_user_accepted_tos_at(accepted_tos_at);
                                            cx.emit(Event::PrivateUserInfoUpdated);
                                        })
                                    } else {
                                        anyhow::Ok(())
                                    }
                                })??;

                                // `user` is `None` when fetching it failed (the error was logged).
                                current_user_tx.send(user).await.ok();

                                this.update(cx, |_, cx| cx.notify())?;
                            }
                        }
                        Status::SignedOut => {
                            current_user_tx.send(None).await.ok();
                            // Reset the ToS state and wait for the contact lists to be cleared.
                            this.update(cx, |this, cx| {
                                this.accepted_tos_at = None;
                                cx.emit(Event::PrivateUserInfoUpdated);
                                cx.notify();
                                this.clear_contacts()
                            })?
                            .await;
                        }
                        Status::ConnectionLost => {
                            // Keep the current user, but clear the contact lists.
                            this.update(cx, |this, cx| {
                                cx.notify();
                                this.clear_contacts()
                            })?
                            .await;
                        }
                        _ => {}
                    }
                }
                Ok(())
            }),
            pending_contact_requests: Default::default(),
            weak_self: cx.weak_entity(),
        }
    }
 304
 305    #[cfg(feature = "test-support")]
 306    pub fn clear_cache(&mut self) {
 307        self.users.clear();
 308        self.by_github_login.clear();
 309    }
 310
 311    async fn handle_update_invite_info(
 312        this: Entity<Self>,
 313        message: TypedEnvelope<proto::UpdateInviteInfo>,
 314        mut cx: AsyncApp,
 315    ) -> Result<()> {
 316        this.update(&mut cx, |this, cx| {
 317            this.invite_info = Some(InviteInfo {
 318                url: Arc::from(message.payload.url),
 319                count: message.payload.count,
 320            });
 321            cx.notify();
 322        })?;
 323        Ok(())
 324    }
 325
 326    async fn handle_show_contacts(
 327        this: Entity<Self>,
 328        _: TypedEnvelope<proto::ShowContacts>,
 329        mut cx: AsyncApp,
 330    ) -> Result<()> {
 331        this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts))?;
 332        Ok(())
 333    }
 334
    /// Returns the most recently received invite info, if any.
    pub fn invite_info(&self) -> Option<&InviteInfo> {
        self.invite_info.as_ref()
    }
 338
 339    async fn handle_update_contacts(
 340        this: Entity<Self>,
 341        message: TypedEnvelope<proto::UpdateContacts>,
 342        mut cx: AsyncApp,
 343    ) -> Result<()> {
 344        this.read_with(&mut cx, |this, _| {
 345            this.update_contacts_tx
 346                .unbounded_send(UpdateContacts::Update(message.payload))
 347                .unwrap();
 348        })?;
 349        Ok(())
 350    }
 351
 352    async fn handle_update_plan(
 353        this: Entity<Self>,
 354        message: TypedEnvelope<proto::UpdateUserPlan>,
 355        mut cx: AsyncApp,
 356    ) -> Result<()> {
 357        this.update(&mut cx, |this, cx| {
 358            this.current_plan = Some(message.payload.plan());
 359            this.subscription_period = maybe!({
 360                let period = message.payload.subscription_period?;
 361                let started_at = DateTime::from_timestamp(period.started_at as i64, 0)?;
 362                let ended_at = DateTime::from_timestamp(period.ended_at as i64, 0)?;
 363
 364                Some((started_at, ended_at))
 365            });
 366            this.trial_started_at = message
 367                .payload
 368                .trial_started_at
 369                .and_then(|trial_started_at| DateTime::from_timestamp(trial_started_at as i64, 0));
 370            this.is_usage_based_billing_enabled = message.payload.is_usage_based_billing_enabled;
 371            this.account_too_young = message.payload.account_too_young;
 372            this.has_overdue_invoices = message.payload.has_overdue_invoices;
 373
 374            if let Some(usage) = message.payload.usage {
 375                // limits are always present even though they are wrapped in Option
 376                this.model_request_usage = usage
 377                    .model_requests_usage_limit
 378                    .and_then(|limit| {
 379                        RequestUsage::from_proto(usage.model_requests_usage_amount, limit)
 380                    })
 381                    .map(ModelRequestUsage);
 382            }
 383
 384            cx.emit(Event::PlanUpdated);
 385            cx.notify();
 386        })?;
 387        Ok(())
 388    }
 389
 390    pub fn update_model_request_usage(&mut self, usage: ModelRequestUsage, cx: &mut Context<Self>) {
 391        self.model_request_usage = Some(usage);
 392        cx.notify();
 393    }
 394
    /// Applies one queued `UpdateContacts` message.
    ///
    /// `Wait` and `Clear` complete immediately (dropping their barrier once done). `Update`
    /// first loads every referenced user, then merges the changes into the contact and
    /// request lists, keeping each list sorted by GitHub login.
    fn update_contacts(&mut self, message: UpdateContacts, cx: &Context<Self>) -> Task<Result<()>> {
        match message {
            UpdateContacts::Wait(barrier) => {
                // Dropping the sender releases whoever is waiting on the barrier.
                drop(barrier);
                Task::ready(Ok(()))
            }
            UpdateContacts::Clear(barrier) => {
                self.contacts.clear();
                self.incoming_contact_requests.clear();
                self.outgoing_contact_requests.clear();
                drop(barrier);
                Task::ready(Ok(()))
            }
            UpdateContacts::Update(message) => {
                // Collect every user id mentioned by the message so they can be loaded together.
                let mut user_ids = HashSet::default();
                for contact in &message.contacts {
                    user_ids.insert(contact.user_id);
                }
                user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
                user_ids.extend(message.outgoing_requests.iter());

                let load_users = self.get_users(user_ids.into_iter().collect(), cx);
                cx.spawn(async move |this, cx| {
                    load_users.await?;

                    // Users are fetched in parallel above and cached in call to get_users
                    // No need to parallelize here
                    let mut updated_contacts = Vec::new();
                    let this = this.upgrade().context("can't upgrade user store handle")?;
                    for contact in message.contacts {
                        updated_contacts
                            .push(Arc::new(Contact::from_proto(contact, &this, cx).await?));
                    }

                    let mut incoming_requests = Vec::new();
                    for request in message.incoming_requests {
                        incoming_requests.push({
                            this.update(cx, |this, cx| this.get_user(request.requester_id, cx))?
                                .await?
                        });
                    }

                    let mut outgoing_requests = Vec::new();
                    for requested_user_id in message.outgoing_requests {
                        outgoing_requests.push(
                            this.update(cx, |this, cx| this.get_user(requested_user_id, cx))?
                                .await?,
                        );
                    }

                    let removed_contacts =
                        HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
                    let removed_incoming_requests =
                        HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
                    let removed_outgoing_requests =
                        HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());

                    this.update(cx, |this, cx| {
                        // Remove contacts
                        this.contacts
                            .retain(|contact| !removed_contacts.contains(&contact.user.id));
                        // Update existing contacts and insert new ones
                        for updated_contact in updated_contacts {
                            // The list is sorted by GitHub login, so the search yields either
                            // the slot to overwrite or the position that keeps it sorted.
                            match this.contacts.binary_search_by_key(
                                &&updated_contact.user.github_login,
                                |contact| &contact.user.github_login,
                            ) {
                                Ok(ix) => this.contacts[ix] = updated_contact,
                                Err(ix) => this.contacts.insert(ix, updated_contact),
                            }
                        }

                        // Remove incoming contact requests
                        this.incoming_contact_requests.retain(|user| {
                            if removed_incoming_requests.contains(&user.id) {
                                cx.emit(Event::Contact {
                                    user: user.clone(),
                                    kind: ContactEventKind::Cancelled,
                                });
                                false
                            } else {
                                true
                            }
                        });
                        // Update existing incoming requests and insert new ones
                        for user in incoming_requests {
                            match this
                                .incoming_contact_requests
                                .binary_search_by_key(&&user.github_login, |contact| {
                                    &contact.github_login
                                }) {
                                Ok(ix) => this.incoming_contact_requests[ix] = user,
                                Err(ix) => this.incoming_contact_requests.insert(ix, user),
                            }
                        }

                        // Remove outgoing contact requests
                        this.outgoing_contact_requests
                            .retain(|user| !removed_outgoing_requests.contains(&user.id));
                        // Update existing outgoing requests and insert new ones
                        for request in outgoing_requests {
                            match this
                                .outgoing_contact_requests
                                .binary_search_by_key(&&request.github_login, |contact| {
                                    &contact.github_login
                                }) {
                                Ok(ix) => this.outgoing_contact_requests[ix] = request,
                                Err(ix) => this.outgoing_contact_requests.insert(ix, request),
                            }
                        }

                        cx.notify();
                    })?;

                    Ok(())
                })
            }
        }
    }
 514
    /// Returns the current contacts, sorted by GitHub login.
    pub fn contacts(&self) -> &[Arc<Contact>] {
        &self.contacts
    }
 518
 519    pub fn has_contact(&self, user: &Arc<User>) -> bool {
 520        self.contacts
 521            .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
 522            .is_ok()
 523    }
 524
    /// Returns the users with an incoming contact request, sorted by GitHub login.
    pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
        &self.incoming_contact_requests
    }
 528
    /// Returns the users with an outgoing contact request, sorted by GitHub login.
    pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
        &self.outgoing_contact_requests
    }
 532
 533    pub fn is_contact_request_pending(&self, user: &User) -> bool {
 534        self.pending_contact_requests.contains_key(&user.id)
 535    }
 536
 537    pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
 538        if self
 539            .contacts
 540            .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
 541            .is_ok()
 542        {
 543            ContactRequestStatus::RequestAccepted
 544        } else if self
 545            .outgoing_contact_requests
 546            .binary_search_by_key(&&user.github_login, |user| &user.github_login)
 547            .is_ok()
 548        {
 549            ContactRequestStatus::RequestSent
 550        } else if self
 551            .incoming_contact_requests
 552            .binary_search_by_key(&&user.github_login, |user| &user.github_login)
 553            .is_ok()
 554        {
 555            ContactRequestStatus::RequestReceived
 556        } else {
 557            ContactRequestStatus::None
 558        }
 559    }
 560
 561    pub fn request_contact(
 562        &mut self,
 563        responder_id: u64,
 564        cx: &mut Context<Self>,
 565    ) -> Task<Result<()>> {
 566        self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
 567    }
 568
 569    pub fn remove_contact(&mut self, user_id: u64, cx: &mut Context<Self>) -> Task<Result<()>> {
 570        self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
 571    }
 572
 573    pub fn has_incoming_contact_request(&self, user_id: u64) -> bool {
 574        self.incoming_contact_requests
 575            .iter()
 576            .any(|user| user.id == user_id)
 577    }
 578
 579    pub fn respond_to_contact_request(
 580        &mut self,
 581        requester_id: u64,
 582        accept: bool,
 583        cx: &mut Context<Self>,
 584    ) -> Task<Result<()>> {
 585        self.perform_contact_request(
 586            requester_id,
 587            proto::RespondToContactRequest {
 588                requester_id,
 589                response: if accept {
 590                    proto::ContactRequestResponse::Accept
 591                } else {
 592                    proto::ContactRequestResponse::Decline
 593                } as i32,
 594            },
 595            cx,
 596        )
 597    }
 598
 599    pub fn dismiss_contact_request(
 600        &self,
 601        requester_id: u64,
 602        cx: &Context<Self>,
 603    ) -> Task<Result<()>> {
 604        let client = self.client.upgrade();
 605        cx.spawn(async move |_, _| {
 606            client
 607                .context("can't upgrade client reference")?
 608                .request(proto::RespondToContactRequest {
 609                    requester_id,
 610                    response: proto::ContactRequestResponse::Dismiss as i32,
 611                })
 612                .await?;
 613            Ok(())
 614        })
 615    }
 616
 617    fn perform_contact_request<T: RequestMessage>(
 618        &mut self,
 619        user_id: u64,
 620        request: T,
 621        cx: &mut Context<Self>,
 622    ) -> Task<Result<()>> {
 623        let client = self.client.upgrade();
 624        *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
 625        cx.notify();
 626
 627        cx.spawn(async move |this, cx| {
 628            let response = client
 629                .context("can't upgrade client reference")?
 630                .request(request)
 631                .await;
 632            this.update(cx, |this, cx| {
 633                if let Entry::Occupied(mut request_count) =
 634                    this.pending_contact_requests.entry(user_id)
 635                {
 636                    *request_count.get_mut() -= 1;
 637                    if *request_count.get() == 0 {
 638                        request_count.remove();
 639                    }
 640                }
 641                cx.notify();
 642            })?;
 643            response?;
 644            Ok(())
 645        })
 646    }
 647
 648    pub fn clear_contacts(&self) -> impl Future<Output = ()> + use<> {
 649        let (tx, mut rx) = postage::barrier::channel();
 650        self.update_contacts_tx
 651            .unbounded_send(UpdateContacts::Clear(tx))
 652            .unwrap();
 653        async move {
 654            rx.next().await;
 655        }
 656    }
 657
 658    pub fn contact_updates_done(&self) -> impl Future<Output = ()> {
 659        let (tx, mut rx) = postage::barrier::channel();
 660        self.update_contacts_tx
 661            .unbounded_send(UpdateContacts::Wait(tx))
 662            .unwrap();
 663        async move {
 664            rx.next().await;
 665        }
 666    }
 667
 668    pub fn get_users(
 669        &self,
 670        user_ids: Vec<u64>,
 671        cx: &Context<Self>,
 672    ) -> Task<Result<Vec<Arc<User>>>> {
 673        let mut user_ids_to_fetch = user_ids.clone();
 674        user_ids_to_fetch.retain(|id| !self.users.contains_key(id));
 675
 676        cx.spawn(async move |this, cx| {
 677            if !user_ids_to_fetch.is_empty() {
 678                this.update(cx, |this, cx| {
 679                    this.load_users(
 680                        proto::GetUsers {
 681                            user_ids: user_ids_to_fetch,
 682                        },
 683                        cx,
 684                    )
 685                })?
 686                .await?;
 687            }
 688
 689            this.read_with(cx, |this, _| {
 690                user_ids
 691                    .iter()
 692                    .map(|user_id| {
 693                        this.users
 694                            .get(user_id)
 695                            .cloned()
 696                            .with_context(|| format!("user {user_id} not found"))
 697                    })
 698                    .collect()
 699            })?
 700        })
 701    }
 702
 703    pub fn fuzzy_search_users(
 704        &self,
 705        query: String,
 706        cx: &Context<Self>,
 707    ) -> Task<Result<Vec<Arc<User>>>> {
 708        self.load_users(proto::FuzzySearchUsers { query }, cx)
 709    }
 710
    /// Returns the user with `user_id` if it is already cached; never triggers a fetch.
    pub fn get_cached_user(&self, user_id: u64) -> Option<Arc<User>> {
        self.users.get(&user_id).cloned()
    }
 714
 715    pub fn get_user_optimistic(&self, user_id: u64, cx: &Context<Self>) -> Option<Arc<User>> {
 716        if let Some(user) = self.users.get(&user_id).cloned() {
 717            return Some(user);
 718        }
 719
 720        self.get_user(user_id, cx).detach_and_log_err(cx);
 721        None
 722    }
 723
 724    pub fn get_user(&self, user_id: u64, cx: &Context<Self>) -> Task<Result<Arc<User>>> {
 725        if let Some(user) = self.users.get(&user_id).cloned() {
 726            return Task::ready(Ok(user));
 727        }
 728
 729        let load_users = self.get_users(vec![user_id], cx);
 730        cx.spawn(async move |this, cx| {
 731            load_users.await?;
 732            this.read_with(cx, |this, _| {
 733                this.users
 734                    .get(&user_id)
 735                    .cloned()
 736                    .context("server responded with no users")
 737            })?
 738        })
 739    }
 740
 741    pub fn cached_user_by_github_login(&self, github_login: &str) -> Option<Arc<User>> {
 742        self.by_github_login
 743            .get(github_login)
 744            .and_then(|id| self.users.get(id).cloned())
 745    }
 746
 747    pub fn current_user(&self) -> Option<Arc<User>> {
 748        self.current_user.borrow().clone()
 749    }
 750
 751    pub fn current_plan(&self) -> Option<proto::Plan> {
 752        #[cfg(debug_assertions)]
 753        if let Ok(plan) = std::env::var("ZED_SIMULATE_PLAN").as_ref() {
 754            return match plan.as_str() {
 755                "free" => Some(proto::Plan::Free),
 756                "trial" => Some(proto::Plan::ZedProTrial),
 757                "pro" => Some(proto::Plan::ZedPro),
 758                _ => {
 759                    panic!("ZED_SIMULATE_PLAN must be one of 'free', 'trial', or 'pro'");
 760                }
 761            };
 762        }
 763
 764        self.current_plan
 765    }
 766
    /// Returns the subscription period as `(started_at, ended_at)`, if known.
    pub fn subscription_period(&self) -> Option<(DateTime<Utc>, DateTime<Utc>)> {
        self.subscription_period
    }
 770
    /// Returns when the trial started, if known.
    pub fn trial_started_at(&self) -> Option<DateTime<Utc>> {
        self.trial_started_at
    }
 774
    /// Returns whether usage-based billing is enabled, or `None` if no value is stored.
    pub fn usage_based_billing_enabled(&self) -> Option<bool> {
        self.is_usage_based_billing_enabled
    }
 778
    /// Returns the most recently stored model-request usage, if any.
    pub fn model_request_usage(&self) -> Option<ModelRequestUsage> {
        self.model_request_usage
    }
 782
    /// Returns a clone of the watch receiver that tracks the current user.
    pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
        self.current_user.clone()
    }
 786
 787    /// Returns whether the user's account is too new to use the service.
 788    pub fn account_too_young(&self) -> bool {
 789        self.account_too_young.unwrap_or(false)
 790    }
 791
 792    /// Returns whether the current user has overdue invoices and usage should be blocked.
 793    pub fn has_overdue_invoices(&self) -> bool {
 794        self.has_overdue_invoices.unwrap_or(false)
 795    }
 796
 797    pub fn current_user_has_accepted_terms(&self) -> Option<bool> {
 798        self.accepted_tos_at
 799            .map(|accepted_tos_at| accepted_tos_at.is_some())
 800    }
 801
 802    pub fn accept_terms_of_service(&self, cx: &Context<Self>) -> Task<Result<()>> {
 803        if self.current_user().is_none() {
 804            return Task::ready(Err(anyhow!("no current user")));
 805        };
 806
 807        let client = self.client.clone();
 808        cx.spawn(async move |this, cx| -> anyhow::Result<()> {
 809            let client = client.upgrade().context("client not found")?;
 810            let response = client
 811                .request(proto::AcceptTermsOfService {})
 812                .await
 813                .context("error accepting tos")?;
 814            this.update(cx, |this, cx| {
 815                this.set_current_user_accepted_tos_at(Some(response.accepted_tos_at));
 816                cx.emit(Event::PrivateUserInfoUpdated);
 817            })?;
 818            Ok(())
 819        })
 820    }
 821
 822    fn set_current_user_accepted_tos_at(&mut self, accepted_tos_at: Option<u64>) {
 823        self.accepted_tos_at = Some(
 824            accepted_tos_at.and_then(|timestamp| DateTime::from_timestamp(timestamp as i64, 0)),
 825        );
 826    }
 827
 828    fn load_users(
 829        &self,
 830        request: impl RequestMessage<Response = UsersResponse>,
 831        cx: &Context<Self>,
 832    ) -> Task<Result<Vec<Arc<User>>>> {
 833        let client = self.client.clone();
 834        cx.spawn(async move |this, cx| {
 835            if let Some(rpc) = client.upgrade() {
 836                let response = rpc.request(request).await.context("error loading users")?;
 837                let users = response.users;
 838
 839                this.update(cx, |this, _| this.insert(users))
 840            } else {
 841                Ok(Vec::new())
 842            }
 843        })
 844    }
 845
 846    pub fn insert(&mut self, users: Vec<proto::User>) -> Vec<Arc<User>> {
 847        let mut ret = Vec::with_capacity(users.len());
 848        for user in users {
 849            let user = User::new(user);
 850            if let Some(old) = self.users.insert(user.id, user.clone()) {
 851                if old.github_login != user.github_login {
 852                    self.by_github_login.remove(&old.github_login);
 853                }
 854            }
 855            self.by_github_login
 856                .insert(user.github_login.clone(), user.id);
 857            ret.push(user)
 858        }
 859        ret
 860    }
 861
 862    pub fn set_participant_indices(
 863        &mut self,
 864        participant_indices: HashMap<u64, ParticipantIndex>,
 865        cx: &mut Context<Self>,
 866    ) {
 867        if participant_indices != self.participant_indices {
 868            self.participant_indices = participant_indices;
 869            cx.emit(Event::ParticipantIndicesChanged);
 870        }
 871    }
 872
 873    pub fn participant_indices(&self) -> &HashMap<u64, ParticipantIndex> {
 874        &self.participant_indices
 875    }
 876
 877    pub fn participant_names(
 878        &self,
 879        user_ids: impl Iterator<Item = u64>,
 880        cx: &App,
 881    ) -> HashMap<u64, SharedString> {
 882        let mut ret = HashMap::default();
 883        let mut missing_user_ids = Vec::new();
 884        for id in user_ids {
 885            if let Some(github_login) = self.get_cached_user(id).map(|u| u.github_login.clone()) {
 886                ret.insert(id, github_login);
 887            } else {
 888                missing_user_ids.push(id)
 889            }
 890        }
 891        if !missing_user_ids.is_empty() {
 892            let this = self.weak_self.clone();
 893            cx.spawn(async move |cx| {
 894                this.update(cx, |this, cx| this.get_users(missing_user_ids, cx))?
 895                    .await
 896            })
 897            .detach_and_log_err(cx);
 898        }
 899        ret
 900    }
 901}
 902
 903impl User {
 904    fn new(message: proto::User) -> Arc<Self> {
 905        Arc::new(User {
 906            id: message.id,
 907            github_login: message.github_login.into(),
 908            avatar_uri: message.avatar_url.into(),
 909            name: message.name,
 910        })
 911    }
 912}
 913
 914impl Contact {
 915    async fn from_proto(
 916        contact: proto::Contact,
 917        user_store: &Entity<UserStore>,
 918        cx: &mut AsyncApp,
 919    ) -> Result<Self> {
 920        let user = user_store
 921            .update(cx, |user_store, cx| {
 922                user_store.get_user(contact.user_id, cx)
 923            })?
 924            .await?;
 925        Ok(Self {
 926            user,
 927            online: contact.online,
 928            busy: contact.busy,
 929        })
 930    }
 931}
 932
 933impl Collaborator {
 934    pub fn from_proto(message: proto::Collaborator) -> Result<Self> {
 935        Ok(Self {
 936            peer_id: message.peer_id.context("invalid peer id")?,
 937            replica_id: message.replica_id as ReplicaId,
 938            user_id: message.user_id as UserId,
 939            is_host: message.is_host,
 940            committer_name: message.committer_name,
 941            committer_email: message.committer_email,
 942        })
 943    }
 944}
 945
 946impl RequestUsage {
 947    pub fn over_limit(&self) -> bool {
 948        match self.limit {
 949            UsageLimit::Limited(limit) => self.amount >= limit,
 950            UsageLimit::Unlimited => false,
 951        }
 952    }
 953
 954    pub fn from_proto(amount: u32, limit: proto::UsageLimit) -> Option<Self> {
 955        let limit = match limit.variant? {
 956            proto::usage_limit::Variant::Limited(limited) => {
 957                UsageLimit::Limited(limited.limit as i32)
 958            }
 959            proto::usage_limit::Variant::Unlimited(_) => UsageLimit::Unlimited,
 960        };
 961        Some(RequestUsage {
 962            limit,
 963            amount: amount as i32,
 964        })
 965    }
 966
 967    fn from_headers(
 968        limit_name: &str,
 969        amount_name: &str,
 970        headers: &HeaderMap<HeaderValue>,
 971    ) -> Result<Self> {
 972        let limit = headers
 973            .get(limit_name)
 974            .with_context(|| format!("missing {limit_name:?} header"))?;
 975        let limit = UsageLimit::from_str(limit.to_str()?)?;
 976
 977        let amount = headers
 978            .get(amount_name)
 979            .with_context(|| format!("missing {amount_name:?} header"))?;
 980        let amount = amount.to_str()?.parse::<i32>()?;
 981
 982        Ok(Self { limit, amount })
 983    }
 984}
 985
 986impl ModelRequestUsage {
 987    pub fn from_headers(headers: &HeaderMap<HeaderValue>) -> Result<Self> {
 988        Ok(Self(RequestUsage::from_headers(
 989            MODEL_REQUESTS_USAGE_LIMIT_HEADER_NAME,
 990            MODEL_REQUESTS_USAGE_AMOUNT_HEADER_NAME,
 991            headers,
 992        )?))
 993    }
 994}
 995
 996impl EditPredictionUsage {
 997    pub fn from_headers(headers: &HeaderMap<HeaderValue>) -> Result<Self> {
 998        Ok(Self(RequestUsage::from_headers(
 999            EDIT_PREDICTIONS_USAGE_LIMIT_HEADER_NAME,
1000            EDIT_PREDICTIONS_USAGE_AMOUNT_HEADER_NAME,
1001            headers,
1002        )?))
1003    }
1004}