user.rs

   1use super::{Client, Status, TypedEnvelope, proto};
   2use anyhow::{Context as _, Result};
   3use chrono::{DateTime, Utc};
   4use cloud_api_client::websocket_protocol::MessageToClient;
   5use cloud_api_client::{GetAuthenticatedUserResponse, PlanInfo};
   6use cloud_llm_client::{
   7    EDIT_PREDICTIONS_USAGE_AMOUNT_HEADER_NAME, EDIT_PREDICTIONS_USAGE_LIMIT_HEADER_NAME,
   8    MODEL_REQUESTS_USAGE_AMOUNT_HEADER_NAME, MODEL_REQUESTS_USAGE_LIMIT_HEADER_NAME, Plan,
   9    UsageLimit,
  10};
  11use collections::{HashMap, HashSet, hash_map::Entry};
  12use derive_more::Deref;
  13use feature_flags::FeatureFlagAppExt;
  14use futures::{Future, StreamExt, channel::mpsc};
  15use gpui::{
  16    App, AsyncApp, Context, Entity, EventEmitter, SharedString, SharedUri, Task, WeakEntity,
  17};
  18use http_client::http::{HeaderMap, HeaderValue};
  19use postage::{sink::Sink, watch};
  20use rpc::proto::{RequestMessage, UsersResponse};
  21use std::{
  22    str::FromStr as _,
  23    sync::{Arc, Weak},
  24};
  25use text::ReplicaId;
  26use util::{ResultExt, TryFutureExt as _};
  27
/// Numeric identifier of a user; this is the type of [`User::id`].
pub type UserId = u64;
  29
/// Newtype wrapper around the raw `u64` identifier of a channel.
#[derive(
    Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, serde::Serialize, serde::Deserialize,
)]
pub struct ChannelId(pub u64);
  34
  35impl std::fmt::Display for ChannelId {
  36    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
  37        self.0.fmt(f)
  38    }
  39}
  40
  41#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)]
  42pub struct ProjectId(pub u64);
  43
  44impl ProjectId {
  45    pub const fn to_proto(self) -> u64 {
  46        self.0
  47    }
  48}
  49
/// Newtype wrapper around a `u32` participant index.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ParticipantIndex(pub u32);
  52
/// A user as cached by [`UserStore`].
///
/// Equality and ordering are implemented by hand further down in this file
/// rather than derived.
#[derive(Default, Debug)]
pub struct User {
    /// Numeric id of the user.
    pub id: UserId,
    /// GitHub login of the user; the contact lists in `UserStore` are sorted by it.
    pub github_login: SharedString,
    /// URI of the user's avatar.
    pub avatar_uri: SharedUri,
    /// Optional name of the user; `None` when no name is available.
    pub name: Option<String>,
}
  60
/// Describes one collaborator, identified by peer, replica, and user id.
// NOTE(review): the field descriptions below are inferred from the field names
// only; nothing in this part of the file constructs or reads a `Collaborator`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Collaborator {
    /// Peer id of the collaborator's connection (presumably) — confirm.
    pub peer_id: proto::PeerId,
    /// Replica id assigned to the collaborator.
    pub replica_id: ReplicaId,
    /// Id of the user behind this collaborator.
    pub user_id: UserId,
    /// Whether this collaborator is the host (presumably of the shared project).
    pub is_host: bool,
    /// Optional committer name (presumably for git commits) — confirm.
    pub committer_name: Option<String>,
    /// Optional committer email (presumably for git commits) — confirm.
    pub committer_email: Option<String>,
}
  70
  71impl PartialOrd for User {
  72    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
  73        Some(self.cmp(other))
  74    }
  75}
  76
  77impl Ord for User {
  78    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
  79        self.github_login.cmp(&other.github_login)
  80    }
  81}
  82
  83impl PartialEq for User {
  84    fn eq(&self, other: &Self) -> bool {
  85        self.id == other.id && self.github_login == other.github_login
  86    }
  87}
  88
// Marker impl: the hand-written `PartialEq` above compares only `id` and
// `github_login`, both of which support full equality.
impl Eq for User {}
  90
/// An entry in the contact list kept by [`UserStore`].
#[derive(Debug, PartialEq)]
pub struct Contact {
    /// The user this contact refers to.
    pub user: Arc<User>,
    /// Online flag for the contact (presumably as reported by the server).
    pub online: bool,
    /// Busy flag for the contact (presumably as reported by the server).
    pub busy: bool,
}
  97
/// Relationship between the current user and another user, as computed by
/// `UserStore::contact_request_status`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ContactRequestStatus {
    /// The user is in none of the store's contact or request lists.
    None,
    /// The user is in the store's outgoing contact requests.
    RequestSent,
    /// The user is in the store's incoming contact requests.
    RequestReceived,
    /// The user is already in the store's contacts.
    RequestAccepted,
}
 105
/// Caches users, the contact list, pending contact requests, and plan/usage
/// information for the signed-in user.
pub struct UserStore {
    // Cache of known users keyed by user id.
    users: HashMap<u64, Arc<User>>,
    // Reverse index from GitHub login to user id for the cached users.
    by_github_login: HashMap<SharedString, u64>,
    // Participant index per user id.
    participant_indices: HashMap<u64, ParticipantIndex>,
    // Queue feeding the `_maintain_contacts` task, which applies updates one at a time.
    update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
    // Model-request usage, refreshed from the authenticated-user response.
    model_request_usage: Option<ModelRequestUsage>,
    // Edit-prediction usage, refreshed from the authenticated-user response.
    edit_prediction_usage: Option<EditPredictionUsage>,
    // Plan details from the most recent authenticated-user response.
    plan_info: Option<PlanInfo>,
    // Receiving side of the watch channel that publishes the current user.
    current_user: watch::Receiver<Option<Arc<User>>>,
    // Contacts, kept sorted by GitHub login (they are binary-searched by login).
    contacts: Vec<Arc<Contact>>,
    // Incoming contact requests, kept sorted by the requester's GitHub login.
    incoming_contact_requests: Vec<Arc<User>>,
    // Outgoing contact requests, kept sorted by the requested user's GitHub login.
    outgoing_contact_requests: Vec<Arc<User>>,
    // Number of in-flight contact requests per user id; entries are removed at zero.
    pending_contact_requests: HashMap<u64, usize>,
    // Latest invite info received via `UpdateInviteInfo`, if any.
    invite_info: Option<InviteInfo>,
    // Weak handle to the client so the store does not keep it alive.
    client: Weak<Client>,
    // Background task draining `update_contacts_tx`.
    _maintain_contacts: Task<()>,
    // Background task reacting to client status changes.
    _maintain_current_user: Task<Result<()>>,
    // Weak handle to this store's own entity.
    weak_self: WeakEntity<Self>,
}
 125
/// Invite information taken from an `UpdateInviteInfo` message.
#[derive(Clone)]
pub struct InviteInfo {
    /// The `count` value from the message (presumably the number of invites) — confirm.
    pub count: u32,
    /// The `url` value from the message (presumably the invite link) — confirm.
    pub url: Arc<str>,
}
 131
/// Events emitted by [`UserStore`].
pub enum Event {
    /// Something happened to a contact request involving `user`; `kind` says what.
    Contact {
        user: Arc<User>,
        kind: ContactEventKind,
    },
    /// Emitted when a `ShowContacts` message is received.
    ShowContacts,
    /// Presumably emitted when the participant indices change — the emitting code
    /// is outside this part of the file.
    ParticipantIndicesChanged,
    /// Emitted after the authenticated-user information is refreshed and on sign-out.
    PrivateUserInfoUpdated,
    /// Presumably emitted when the plan changes — the emitting code is outside
    /// this part of the file.
    PlanUpdated,
}
 142
/// What happened to a contact request, carried by [`Event::Contact`].
#[derive(Clone, Copy)]
pub enum ContactEventKind {
    /// Presumably: a contact request was made — confirm against the emitter.
    Requested,
    /// Presumably: a contact request was accepted — confirm against the emitter.
    Accepted,
    /// Emitted when an incoming contact request is removed by a contacts update.
    Cancelled,
}
 149
// Lets `UserStore` emit `Event` values via `cx.emit`.
impl EventEmitter<Event> for UserStore {}
 151
/// Work items processed in order by the contacts-maintenance task.
enum UpdateContacts {
    /// Apply a contacts update received from the server.
    Update(proto::UpdateContacts),
    /// Do nothing except drop the barrier sender, which lets a waiter know that
    /// everything queued before this item has been processed.
    Wait(postage::barrier::Sender),
    /// Clear contacts and both request lists, then drop the barrier sender.
    Clear(postage::barrier::Sender),
}
 157
/// Usage of model requests; derefs to the wrapped [`RequestUsage`].
#[derive(Debug, Clone, Copy, Deref)]
pub struct ModelRequestUsage(pub RequestUsage);
 160
/// Usage of edit predictions; derefs to the wrapped [`RequestUsage`].
#[derive(Debug, Clone, Copy, Deref)]
pub struct EditPredictionUsage(pub RequestUsage);
 163
/// A usage limit paired with the amount used so far.
#[derive(Debug, Clone, Copy)]
pub struct RequestUsage {
    /// The usage limit.
    pub limit: UsageLimit,
    /// The amount used so far.
    pub amount: i32,
}
 169
 170impl UserStore {
    /// Creates the store, registers its message handlers on `client`, and spawns
    /// the two background tasks that maintain the contact lists and the current
    /// user.
    ///
    /// The store itself keeps only a weak reference to `client`.
    pub fn new(client: Arc<Client>, cx: &Context<Self>) -> Self {
        let (mut current_user_tx, current_user_rx) = watch::channel();
        let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
        let rpc_subscriptions = vec![
            client.add_message_handler(cx.weak_entity(), Self::handle_update_contacts),
            client.add_message_handler(cx.weak_entity(), Self::handle_update_invite_info),
            client.add_message_handler(cx.weak_entity(), Self::handle_show_contacts),
        ];

        client.add_message_to_client_handler({
            let this = cx.weak_entity();
            move |message, cx| Self::handle_message_to_client(this.clone(), message, cx)
        });

        Self {
            users: Default::default(),
            by_github_login: Default::default(),
            current_user: current_user_rx,
            plan_info: None,
            model_request_usage: None,
            edit_prediction_usage: None,
            contacts: Default::default(),
            incoming_contact_requests: Default::default(),
            participant_indices: Default::default(),
            outgoing_contact_requests: Default::default(),
            invite_info: None,
            client: Arc::downgrade(&client),
            update_contacts_tx,
            // Applies queued contact updates strictly one after another.
            _maintain_contacts: cx.spawn(async move |this, cx| {
                // Moved into the task so the subscriptions live as long as it does
                // (presumably dropping them would unregister the handlers).
                let _subscriptions = rpc_subscriptions;
                while let Some(message) = update_contacts_rx.next().await {
                    if let Ok(task) = this.update(cx, |this, cx| this.update_contacts(message, cx))
                    {
                        task.log_err().await;
                    } else {
                        // The store could not be updated any more; stop processing.
                        break;
                    }
                }
            }),
            // Reacts to every client status change.
            _maintain_current_user: cx.spawn(async move |this, cx| {
                let mut status = client.status();
                // Keep only a weak handle so this task does not keep the client alive.
                let weak = Arc::downgrade(&client);
                drop(client);
                while let Some(status) = status.next().await {
                    // If the client is dropped, the app is shutting down.
                    let Some(client) = weak.upgrade() else {
                        return Ok(());
                    };
                    match status {
                        Status::Authenticated
                        | Status::Reauthenticated
                        | Status::Connected { .. } => {
                            if let Some(user_id) = client.user_id() {
                                // A failed fetch is logged and treated as "no user".
                                let response = client
                                    .cloud_client()
                                    .get_authenticated_user()
                                    .await
                                    .log_err();

                                let current_user_and_response = if let Some(response) = response {
                                    let user = Arc::new(User {
                                        id: user_id,
                                        github_login: response.user.github_login.clone().into(),
                                        avatar_uri: response.user.avatar_url.clone().into(),
                                        name: response.user.name.clone(),
                                    });

                                    Some((user, response))
                                } else {
                                    None
                                };
                                // Publish the new current user (or `None`) to watchers
                                // before touching the store.
                                current_user_tx
                                    .send(
                                        current_user_and_response
                                            .as_ref()
                                            .map(|(user, _)| user.clone()),
                                    )
                                    .await
                                    .ok();

                                // Cache the user and apply plan/usage/feature-flag data.
                                cx.update(|cx| {
                                    if let Some((user, response)) = current_user_and_response {
                                        this.update(cx, |this, cx| {
                                            this.by_github_login
                                                .insert(user.github_login.clone(), user_id);
                                            this.users.insert(user_id, user);
                                            this.update_authenticated_user(response, cx)
                                        })
                                    } else {
                                        anyhow::Ok(())
                                    }
                                })??;

                                this.update(cx, |_, cx| cx.notify())?;
                            }
                        }
                        Status::SignedOut => {
                            // Clear the current user, then wait for contacts to be cleared.
                            current_user_tx.send(None).await.ok();
                            this.update(cx, |this, cx| {
                                cx.emit(Event::PrivateUserInfoUpdated);
                                cx.notify();
                                this.clear_contacts()
                            })?
                            .await;
                        }
                        Status::ConnectionLost => {
                            // The current user is kept; only the contact lists are cleared.
                            this.update(cx, |this, cx| {
                                cx.notify();
                                this.clear_contacts()
                            })?
                            .await;
                        }
                        _ => {}
                    }
                }
                Ok(())
            }),
            pending_contact_requests: Default::default(),
            weak_self: cx.weak_entity(),
        }
    }
 292
 293    #[cfg(feature = "test-support")]
 294    pub fn clear_cache(&mut self) {
 295        self.users.clear();
 296        self.by_github_login.clear();
 297    }
 298
 299    async fn handle_update_invite_info(
 300        this: Entity<Self>,
 301        message: TypedEnvelope<proto::UpdateInviteInfo>,
 302        mut cx: AsyncApp,
 303    ) -> Result<()> {
 304        this.update(&mut cx, |this, cx| {
 305            this.invite_info = Some(InviteInfo {
 306                url: Arc::from(message.payload.url),
 307                count: message.payload.count,
 308            });
 309            cx.notify();
 310        })?;
 311        Ok(())
 312    }
 313
 314    async fn handle_show_contacts(
 315        this: Entity<Self>,
 316        _: TypedEnvelope<proto::ShowContacts>,
 317        mut cx: AsyncApp,
 318    ) -> Result<()> {
 319        this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts))?;
 320        Ok(())
 321    }
 322
 323    pub const fn invite_info(&self) -> Option<&InviteInfo> {
 324        self.invite_info.as_ref()
 325    }
 326
 327    async fn handle_update_contacts(
 328        this: Entity<Self>,
 329        message: TypedEnvelope<proto::UpdateContacts>,
 330        cx: AsyncApp,
 331    ) -> Result<()> {
 332        this.read_with(&cx, |this, _| {
 333            this.update_contacts_tx
 334                .unbounded_send(UpdateContacts::Update(message.payload))
 335                .unwrap();
 336        })?;
 337        Ok(())
 338    }
 339
    /// Applies one queued [`UpdateContacts`] item and returns a task that
    /// completes once it has been fully applied.
    ///
    /// `Wait` and `Clear` complete immediately (dropping their barrier sender);
    /// `Update` first loads every referenced user, then merges the changes into
    /// the contact and request lists, which stay sorted by GitHub login.
    fn update_contacts(&mut self, message: UpdateContacts, cx: &Context<Self>) -> Task<Result<()>> {
        match message {
            UpdateContacts::Wait(barrier) => {
                // Dropping the sender is what signals the waiting side.
                drop(barrier);
                Task::ready(Ok(()))
            }
            UpdateContacts::Clear(barrier) => {
                self.contacts.clear();
                self.incoming_contact_requests.clear();
                self.outgoing_contact_requests.clear();
                drop(barrier);
                Task::ready(Ok(()))
            }
            UpdateContacts::Update(message) => {
                // Collect every user id mentioned by the update so they can be
                // fetched in a single batch.
                let mut user_ids = HashSet::default();
                for contact in &message.contacts {
                    user_ids.insert(contact.user_id);
                }
                user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
                user_ids.extend(message.outgoing_requests.iter());

                let load_users = self.get_users(user_ids.into_iter().collect(), cx);
                cx.spawn(async move |this, cx| {
                    load_users.await?;

                    // Users are fetched in parallel above and cached in call to get_users
                    // No need to parallelize here
                    let mut updated_contacts = Vec::new();
                    let this = this.upgrade().context("can't upgrade user store handle")?;
                    for contact in message.contacts {
                        updated_contacts
                            .push(Arc::new(Contact::from_proto(contact, &this, cx).await?));
                    }

                    let mut incoming_requests = Vec::new();
                    for request in message.incoming_requests {
                        incoming_requests.push({
                            this.update(cx, |this, cx| this.get_user(request.requester_id, cx))?
                                .await?
                        });
                    }

                    let mut outgoing_requests = Vec::new();
                    for requested_user_id in message.outgoing_requests {
                        outgoing_requests.push(
                            this.update(cx, |this, cx| this.get_user(requested_user_id, cx))?
                                .await?,
                        );
                    }

                    let removed_contacts =
                        HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
                    let removed_incoming_requests =
                        HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
                    let removed_outgoing_requests =
                        HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());

                    // All list mutations happen inside one `update`, followed by a
                    // single notification.
                    this.update(cx, |this, cx| {
                        // Remove contacts
                        this.contacts
                            .retain(|contact| !removed_contacts.contains(&contact.user.id));
                        // Update existing contacts and insert new ones
                        for updated_contact in updated_contacts {
                            match this.contacts.binary_search_by_key(
                                &&updated_contact.user.github_login,
                                |contact| &contact.user.github_login,
                            ) {
                                Ok(ix) => this.contacts[ix] = updated_contact,
                                Err(ix) => this.contacts.insert(ix, updated_contact),
                            }
                        }

                        // Remove incoming contact requests
                        this.incoming_contact_requests.retain(|user| {
                            if removed_incoming_requests.contains(&user.id) {
                                // Removal of an incoming request is reported as a
                                // cancellation.
                                cx.emit(Event::Contact {
                                    user: user.clone(),
                                    kind: ContactEventKind::Cancelled,
                                });
                                false
                            } else {
                                true
                            }
                        });
                        // Update existing incoming requests and insert new ones
                        for user in incoming_requests {
                            match this
                                .incoming_contact_requests
                                .binary_search_by_key(&&user.github_login, |contact| {
                                    &contact.github_login
                                }) {
                                Ok(ix) => this.incoming_contact_requests[ix] = user,
                                Err(ix) => this.incoming_contact_requests.insert(ix, user),
                            }
                        }

                        // Remove outgoing contact requests
                        this.outgoing_contact_requests
                            .retain(|user| !removed_outgoing_requests.contains(&user.id));
                        // Update existing outgoing requests and insert new ones
                        for request in outgoing_requests {
                            match this
                                .outgoing_contact_requests
                                .binary_search_by_key(&&request.github_login, |contact| {
                                    &contact.github_login
                                }) {
                                Ok(ix) => this.outgoing_contact_requests[ix] = request,
                                Err(ix) => this.outgoing_contact_requests.insert(ix, request),
                            }
                        }

                        cx.notify();
                    })?;

                    Ok(())
                })
            }
        }
    }
 459
 460    pub fn contacts(&self) -> &[Arc<Contact>] {
 461        &self.contacts
 462    }
 463
 464    pub fn has_contact(&self, user: &Arc<User>) -> bool {
 465        self.contacts
 466            .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
 467            .is_ok()
 468    }
 469
 470    pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
 471        &self.incoming_contact_requests
 472    }
 473
 474    pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
 475        &self.outgoing_contact_requests
 476    }
 477
 478    pub fn is_contact_request_pending(&self, user: &User) -> bool {
 479        self.pending_contact_requests.contains_key(&user.id)
 480    }
 481
 482    pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
 483        if self
 484            .contacts
 485            .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
 486            .is_ok()
 487        {
 488            ContactRequestStatus::RequestAccepted
 489        } else if self
 490            .outgoing_contact_requests
 491            .binary_search_by_key(&&user.github_login, |user| &user.github_login)
 492            .is_ok()
 493        {
 494            ContactRequestStatus::RequestSent
 495        } else if self
 496            .incoming_contact_requests
 497            .binary_search_by_key(&&user.github_login, |user| &user.github_login)
 498            .is_ok()
 499        {
 500            ContactRequestStatus::RequestReceived
 501        } else {
 502            ContactRequestStatus::None
 503        }
 504    }
 505
 506    pub fn request_contact(
 507        &mut self,
 508        responder_id: u64,
 509        cx: &mut Context<Self>,
 510    ) -> Task<Result<()>> {
 511        self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
 512    }
 513
 514    pub fn remove_contact(&mut self, user_id: u64, cx: &mut Context<Self>) -> Task<Result<()>> {
 515        self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
 516    }
 517
 518    pub fn has_incoming_contact_request(&self, user_id: u64) -> bool {
 519        self.incoming_contact_requests
 520            .iter()
 521            .any(|user| user.id == user_id)
 522    }
 523
 524    pub fn respond_to_contact_request(
 525        &mut self,
 526        requester_id: u64,
 527        accept: bool,
 528        cx: &mut Context<Self>,
 529    ) -> Task<Result<()>> {
 530        self.perform_contact_request(
 531            requester_id,
 532            proto::RespondToContactRequest {
 533                requester_id,
 534                response: if accept {
 535                    proto::ContactRequestResponse::Accept
 536                } else {
 537                    proto::ContactRequestResponse::Decline
 538                } as i32,
 539            },
 540            cx,
 541        )
 542    }
 543
 544    pub fn dismiss_contact_request(
 545        &self,
 546        requester_id: u64,
 547        cx: &Context<Self>,
 548    ) -> Task<Result<()>> {
 549        let client = self.client.upgrade();
 550        cx.spawn(async move |_, _| {
 551            client
 552                .context("can't upgrade client reference")?
 553                .request(proto::RespondToContactRequest {
 554                    requester_id,
 555                    response: proto::ContactRequestResponse::Dismiss as i32,
 556                })
 557                .await?;
 558            Ok(())
 559        })
 560    }
 561
 562    fn perform_contact_request<T: RequestMessage>(
 563        &mut self,
 564        user_id: u64,
 565        request: T,
 566        cx: &mut Context<Self>,
 567    ) -> Task<Result<()>> {
 568        let client = self.client.upgrade();
 569        *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
 570        cx.notify();
 571
 572        cx.spawn(async move |this, cx| {
 573            let response = client
 574                .context("can't upgrade client reference")?
 575                .request(request)
 576                .await;
 577            this.update(cx, |this, cx| {
 578                if let Entry::Occupied(mut request_count) =
 579                    this.pending_contact_requests.entry(user_id)
 580                {
 581                    *request_count.get_mut() -= 1;
 582                    if *request_count.get() == 0 {
 583                        request_count.remove();
 584                    }
 585                }
 586                cx.notify();
 587            })?;
 588            response?;
 589            Ok(())
 590        })
 591    }
 592
 593    pub fn clear_contacts(&self) -> impl Future<Output = ()> + use<> {
 594        let (tx, mut rx) = postage::barrier::channel();
 595        self.update_contacts_tx
 596            .unbounded_send(UpdateContacts::Clear(tx))
 597            .unwrap();
 598        async move {
 599            rx.next().await;
 600        }
 601    }
 602
 603    pub fn contact_updates_done(&self) -> impl Future<Output = ()> {
 604        let (tx, mut rx) = postage::barrier::channel();
 605        self.update_contacts_tx
 606            .unbounded_send(UpdateContacts::Wait(tx))
 607            .unwrap();
 608        async move {
 609            rx.next().await;
 610        }
 611    }
 612
 613    pub fn get_users(
 614        &self,
 615        user_ids: Vec<u64>,
 616        cx: &Context<Self>,
 617    ) -> Task<Result<Vec<Arc<User>>>> {
 618        let mut user_ids_to_fetch = user_ids.clone();
 619        user_ids_to_fetch.retain(|id| !self.users.contains_key(id));
 620
 621        cx.spawn(async move |this, cx| {
 622            if !user_ids_to_fetch.is_empty() {
 623                this.update(cx, |this, cx| {
 624                    this.load_users(
 625                        proto::GetUsers {
 626                            user_ids: user_ids_to_fetch,
 627                        },
 628                        cx,
 629                    )
 630                })?
 631                .await?;
 632            }
 633
 634            this.read_with(cx, |this, _| {
 635                user_ids
 636                    .iter()
 637                    .map(|user_id| {
 638                        this.users
 639                            .get(user_id)
 640                            .cloned()
 641                            .with_context(|| format!("user {user_id} not found"))
 642                    })
 643                    .collect()
 644            })?
 645        })
 646    }
 647
 648    pub fn fuzzy_search_users(
 649        &self,
 650        query: String,
 651        cx: &Context<Self>,
 652    ) -> Task<Result<Vec<Arc<User>>>> {
 653        self.load_users(proto::FuzzySearchUsers { query }, cx)
 654    }
 655
 656    pub fn get_cached_user(&self, user_id: u64) -> Option<Arc<User>> {
 657        self.users.get(&user_id).cloned()
 658    }
 659
 660    pub fn get_user_optimistic(&self, user_id: u64, cx: &Context<Self>) -> Option<Arc<User>> {
 661        if let Some(user) = self.users.get(&user_id).cloned() {
 662            return Some(user);
 663        }
 664
 665        self.get_user(user_id, cx).detach_and_log_err(cx);
 666        None
 667    }
 668
 669    pub fn get_user(&self, user_id: u64, cx: &Context<Self>) -> Task<Result<Arc<User>>> {
 670        if let Some(user) = self.users.get(&user_id).cloned() {
 671            return Task::ready(Ok(user));
 672        }
 673
 674        let load_users = self.get_users(vec![user_id], cx);
 675        cx.spawn(async move |this, cx| {
 676            load_users.await?;
 677            this.read_with(cx, |this, _| {
 678                this.users
 679                    .get(&user_id)
 680                    .cloned()
 681                    .context("server responded with no users")
 682            })?
 683        })
 684    }
 685
 686    pub fn cached_user_by_github_login(&self, github_login: &str) -> Option<Arc<User>> {
 687        self.by_github_login
 688            .get(github_login)
 689            .and_then(|id| self.users.get(id).cloned())
 690    }
 691
 692    pub fn current_user(&self) -> Option<Arc<User>> {
 693        self.current_user.borrow().clone()
 694    }
 695
 696    pub fn plan(&self) -> Option<Plan> {
 697        #[cfg(debug_assertions)]
 698        if let Ok(plan) = std::env::var("ZED_SIMULATE_PLAN").as_ref() {
 699            use cloud_llm_client::PlanV1;
 700
 701            return match plan.as_str() {
 702                "free" => Some(Plan::V1(PlanV1::ZedFree)),
 703                "trial" => Some(Plan::V1(PlanV1::ZedProTrial)),
 704                "pro" => Some(Plan::V1(PlanV1::ZedPro)),
 705                _ => {
 706                    panic!("ZED_SIMULATE_PLAN must be one of 'free', 'trial', or 'pro'");
 707                }
 708            };
 709        }
 710
 711        self.plan_info.as_ref().map(|info| info.plan())
 712    }
 713
 714    pub fn subscription_period(&self) -> Option<(DateTime<Utc>, DateTime<Utc>)> {
 715        self.plan_info
 716            .as_ref()
 717            .and_then(|plan| plan.subscription_period)
 718            .map(|subscription_period| {
 719                (
 720                    subscription_period.started_at.0,
 721                    subscription_period.ended_at.0,
 722                )
 723            })
 724    }
 725
 726    pub fn trial_started_at(&self) -> Option<DateTime<Utc>> {
 727        self.plan_info
 728            .as_ref()
 729            .and_then(|plan| plan.trial_started_at)
 730            .map(|trial_started_at| trial_started_at.0)
 731    }
 732
 733    /// Returns whether the user's account is too new to use the service.
 734    pub fn account_too_young(&self) -> bool {
 735        self.plan_info
 736            .as_ref()
 737            .map(|plan| plan.is_account_too_young)
 738            .unwrap_or_default()
 739    }
 740
 741    /// Returns whether the current user has overdue invoices and usage should be blocked.
 742    pub fn has_overdue_invoices(&self) -> bool {
 743        self.plan_info
 744            .as_ref()
 745            .map(|plan| plan.has_overdue_invoices)
 746            .unwrap_or_default()
 747    }
 748
 749    pub fn is_usage_based_billing_enabled(&self) -> bool {
 750        self.plan_info
 751            .as_ref()
 752            .map(|plan| plan.is_usage_based_billing_enabled)
 753            .unwrap_or_default()
 754    }
 755
 756    pub fn model_request_usage(&self) -> Option<ModelRequestUsage> {
 757        if self.plan().is_some_and(|plan| plan.is_v2()) {
 758            return None;
 759        }
 760
 761        self.model_request_usage
 762    }
 763
 764    pub fn update_model_request_usage(&mut self, usage: ModelRequestUsage, cx: &mut Context<Self>) {
 765        self.model_request_usage = Some(usage);
 766        cx.notify();
 767    }
 768
 769    pub const fn edit_prediction_usage(&self) -> Option<EditPredictionUsage> {
 770        self.edit_prediction_usage
 771    }
 772
 773    pub fn update_edit_prediction_usage(
 774        &mut self,
 775        usage: EditPredictionUsage,
 776        cx: &mut Context<Self>,
 777    ) {
 778        self.edit_prediction_usage = Some(usage);
 779        cx.notify();
 780    }
 781
 782    fn update_authenticated_user(
 783        &mut self,
 784        response: GetAuthenticatedUserResponse,
 785        cx: &mut Context<Self>,
 786    ) {
 787        let staff = response.user.is_staff && !*feature_flags::ZED_DISABLE_STAFF;
 788        cx.update_flags(staff, response.feature_flags);
 789        if let Some(client) = self.client.upgrade() {
 790            client
 791                .telemetry
 792                .set_authenticated_user_info(Some(response.user.metrics_id.clone()), staff);
 793        }
 794
 795        self.model_request_usage = Some(ModelRequestUsage(RequestUsage {
 796            limit: response.plan.usage.model_requests.limit,
 797            amount: response.plan.usage.model_requests.used as i32,
 798        }));
 799        self.edit_prediction_usage = Some(EditPredictionUsage(RequestUsage {
 800            limit: response.plan.usage.edit_predictions.limit,
 801            amount: response.plan.usage.edit_predictions.used as i32,
 802        }));
 803        self.plan_info = Some(response.plan);
 804        cx.emit(Event::PrivateUserInfoUpdated);
 805    }
 806
 807    fn handle_message_to_client(this: WeakEntity<Self>, message: &MessageToClient, cx: &App) {
 808        cx.spawn(async move |cx| {
 809            match message {
 810                MessageToClient::UserUpdated => {
 811                    let cloud_client = cx
 812                        .update(|cx| {
 813                            this.read_with(cx, |this, _cx| {
 814                                this.client.upgrade().map(|client| client.cloud_client())
 815                            })
 816                        })??
 817                        .ok_or(anyhow::anyhow!("Failed to get Cloud client"))?;
 818
 819                    let response = cloud_client.get_authenticated_user().await?;
 820                    cx.update(|cx| {
 821                        this.update(cx, |this, cx| {
 822                            this.update_authenticated_user(response, cx);
 823                        })
 824                    })??;
 825                }
 826            }
 827
 828            anyhow::Ok(())
 829        })
 830        .detach_and_log_err(cx);
 831    }
 832
 833    pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
 834        self.current_user.clone()
 835    }
 836
 837    fn load_users(
 838        &self,
 839        request: impl RequestMessage<Response = UsersResponse>,
 840        cx: &Context<Self>,
 841    ) -> Task<Result<Vec<Arc<User>>>> {
 842        let client = self.client.clone();
 843        cx.spawn(async move |this, cx| {
 844            if let Some(rpc) = client.upgrade() {
 845                let response = rpc.request(request).await.context("error loading users")?;
 846                let users = response.users;
 847
 848                this.update(cx, |this, _| this.insert(users))
 849            } else {
 850                Ok(Vec::new())
 851            }
 852        })
 853    }
 854
 855    pub fn insert(&mut self, users: Vec<proto::User>) -> Vec<Arc<User>> {
 856        let mut ret = Vec::with_capacity(users.len());
 857        for user in users {
 858            let user = User::new(user);
 859            if let Some(old) = self.users.insert(user.id, user.clone())
 860                && old.github_login != user.github_login
 861            {
 862                self.by_github_login.remove(&old.github_login);
 863            }
 864            self.by_github_login
 865                .insert(user.github_login.clone(), user.id);
 866            ret.push(user)
 867        }
 868        ret
 869    }
 870
 871    pub fn set_participant_indices(
 872        &mut self,
 873        participant_indices: HashMap<u64, ParticipantIndex>,
 874        cx: &mut Context<Self>,
 875    ) {
 876        if participant_indices != self.participant_indices {
 877            self.participant_indices = participant_indices;
 878            cx.emit(Event::ParticipantIndicesChanged);
 879        }
 880    }
 881
 882    pub const fn participant_indices(&self) -> &HashMap<u64, ParticipantIndex> {
 883        &self.participant_indices
 884    }
 885
 886    pub fn participant_names(
 887        &self,
 888        user_ids: impl Iterator<Item = u64>,
 889        cx: &App,
 890    ) -> HashMap<u64, SharedString> {
 891        let mut ret = HashMap::default();
 892        let mut missing_user_ids = Vec::new();
 893        for id in user_ids {
 894            if let Some(github_login) = self.get_cached_user(id).map(|u| u.github_login.clone()) {
 895                ret.insert(id, github_login);
 896            } else {
 897                missing_user_ids.push(id)
 898            }
 899        }
 900        if !missing_user_ids.is_empty() {
 901            let this = self.weak_self.clone();
 902            cx.spawn(async move |cx| {
 903                this.update(cx, |this, cx| this.get_users(missing_user_ids, cx))?
 904                    .await
 905            })
 906            .detach_and_log_err(cx);
 907        }
 908        ret
 909    }
 910}
 911
 912impl User {
 913    fn new(message: proto::User) -> Arc<Self> {
 914        Arc::new(User {
 915            id: message.id,
 916            github_login: message.github_login.into(),
 917            avatar_uri: message.avatar_url.into(),
 918            name: message.name,
 919        })
 920    }
 921}
 922
 923impl Contact {
 924    async fn from_proto(
 925        contact: proto::Contact,
 926        user_store: &Entity<UserStore>,
 927        cx: &mut AsyncApp,
 928    ) -> Result<Self> {
 929        let user = user_store
 930            .update(cx, |user_store, cx| {
 931                user_store.get_user(contact.user_id, cx)
 932            })?
 933            .await?;
 934        Ok(Self {
 935            user,
 936            online: contact.online,
 937            busy: contact.busy,
 938        })
 939    }
 940}
 941
 942impl Collaborator {
 943    pub fn from_proto(message: proto::Collaborator) -> Result<Self> {
 944        Ok(Self {
 945            peer_id: message.peer_id.context("invalid peer id")?,
 946            replica_id: message.replica_id as ReplicaId,
 947            user_id: message.user_id as UserId,
 948            is_host: message.is_host,
 949            committer_name: message.committer_name,
 950            committer_email: message.committer_email,
 951        })
 952    }
 953}
 954
 955impl RequestUsage {
 956    pub const fn over_limit(&self) -> bool {
 957        match self.limit {
 958            UsageLimit::Limited(limit) => self.amount >= limit,
 959            UsageLimit::Unlimited => false,
 960        }
 961    }
 962
 963    fn from_headers(
 964        limit_name: &str,
 965        amount_name: &str,
 966        headers: &HeaderMap<HeaderValue>,
 967    ) -> Result<Self> {
 968        let limit = headers
 969            .get(limit_name)
 970            .with_context(|| format!("missing {limit_name:?} header"))?;
 971        let limit = UsageLimit::from_str(limit.to_str()?)?;
 972
 973        let amount = headers
 974            .get(amount_name)
 975            .with_context(|| format!("missing {amount_name:?} header"))?;
 976        let amount = amount.to_str()?.parse::<i32>()?;
 977
 978        Ok(Self { limit, amount })
 979    }
 980}
 981
 982impl ModelRequestUsage {
 983    pub fn from_headers(headers: &HeaderMap<HeaderValue>) -> Result<Self> {
 984        Ok(Self(RequestUsage::from_headers(
 985            MODEL_REQUESTS_USAGE_LIMIT_HEADER_NAME,
 986            MODEL_REQUESTS_USAGE_AMOUNT_HEADER_NAME,
 987            headers,
 988        )?))
 989    }
 990}
 991
 992impl EditPredictionUsage {
 993    pub fn from_headers(headers: &HeaderMap<HeaderValue>) -> Result<Self> {
 994        Ok(Self(RequestUsage::from_headers(
 995            EDIT_PREDICTIONS_USAGE_LIMIT_HEADER_NAME,
 996            EDIT_PREDICTIONS_USAGE_AMOUNT_HEADER_NAME,
 997            headers,
 998        )?))
 999    }
1000}