user.rs

   1use super::{Client, Status, TypedEnvelope, proto};
   2use anyhow::{Context as _, Result};
   3use chrono::{DateTime, Utc};
   4use cloud_api_client::websocket_protocol::MessageToClient;
   5use cloud_api_client::{GetAuthenticatedUserResponse, PlanInfo};
   6use cloud_llm_client::{
   7    EDIT_PREDICTIONS_USAGE_AMOUNT_HEADER_NAME, EDIT_PREDICTIONS_USAGE_LIMIT_HEADER_NAME,
   8    MODEL_REQUESTS_USAGE_AMOUNT_HEADER_NAME, MODEL_REQUESTS_USAGE_LIMIT_HEADER_NAME, Plan,
   9    UsageLimit,
  10};
  11use collections::{HashMap, HashSet, hash_map::Entry};
  12use derive_more::Deref;
  13use feature_flags::FeatureFlagAppExt;
  14use futures::{Future, StreamExt, channel::mpsc};
  15use gpui::{
  16    App, AsyncApp, Context, Entity, EventEmitter, SharedString, SharedUri, Task, WeakEntity,
  17};
  18use http_client::http::{HeaderMap, HeaderValue};
  19use postage::{sink::Sink, watch};
  20use rpc::proto::{RequestMessage, UsersResponse};
  21use std::{
  22    str::FromStr as _,
  23    sync::{Arc, Weak},
  24};
  25use text::ReplicaId;
  26use util::{ResultExt, TryFutureExt as _};
  27
  28pub type UserId = u64;
  29
  30#[derive(
  31    Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, serde::Serialize, serde::Deserialize,
  32)]
  33pub struct ChannelId(pub u64);
  34
  35impl std::fmt::Display for ChannelId {
  36    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
  37        self.0.fmt(f)
  38    }
  39}
  40
  41#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)]
  42pub struct ProjectId(pub u64);
  43
  44impl ProjectId {
  45    pub fn to_proto(self) -> u64 {
  46        self.0
  47    }
  48}
  49
  50#[derive(Debug, Clone, Copy, PartialEq, Eq)]
  51pub struct ParticipantIndex(pub u32);
  52
  53#[derive(Default, Debug)]
  54pub struct User {
  55    pub id: UserId,
  56    pub github_login: SharedString,
  57    pub avatar_uri: SharedUri,
  58    pub name: Option<String>,
  59}
  60
  61#[derive(Clone, Debug, PartialEq, Eq)]
  62pub struct Collaborator {
  63    pub peer_id: proto::PeerId,
  64    pub replica_id: ReplicaId,
  65    pub user_id: UserId,
  66    pub is_host: bool,
  67    pub committer_name: Option<String>,
  68    pub committer_email: Option<String>,
  69}
  70
  71impl PartialOrd for User {
  72    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
  73        Some(self.cmp(other))
  74    }
  75}
  76
  77impl Ord for User {
  78    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
  79        self.github_login.cmp(&other.github_login)
  80    }
  81}
  82
  83impl PartialEq for User {
  84    fn eq(&self, other: &Self) -> bool {
  85        self.id == other.id && self.github_login == other.github_login
  86    }
  87}
  88
  89impl Eq for User {}
  90
  91#[derive(Debug, PartialEq)]
  92pub struct Contact {
  93    pub user: Arc<User>,
  94    pub online: bool,
  95    pub busy: bool,
  96}
  97
  98#[derive(Debug, Clone, Copy, PartialEq, Eq)]
  99pub enum ContactRequestStatus {
 100    None,
 101    RequestSent,
 102    RequestReceived,
 103    RequestAccepted,
 104}
 105
 106pub struct UserStore {
 107    users: HashMap<u64, Arc<User>>,
 108    by_github_login: HashMap<SharedString, u64>,
 109    participant_indices: HashMap<u64, ParticipantIndex>,
 110    update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
 111    model_request_usage: Option<ModelRequestUsage>,
 112    edit_prediction_usage: Option<EditPredictionUsage>,
 113    plan_info: Option<PlanInfo>,
 114    current_user: watch::Receiver<Option<Arc<User>>>,
 115    contacts: Vec<Arc<Contact>>,
 116    incoming_contact_requests: Vec<Arc<User>>,
 117    outgoing_contact_requests: Vec<Arc<User>>,
 118    pending_contact_requests: HashMap<u64, usize>,
 119    invite_info: Option<InviteInfo>,
 120    client: Weak<Client>,
 121    _maintain_contacts: Task<()>,
 122    _maintain_current_user: Task<Result<()>>,
 123    weak_self: WeakEntity<Self>,
 124}
 125
 126#[derive(Clone)]
 127pub struct InviteInfo {
 128    pub count: u32,
 129    pub url: Arc<str>,
 130}
 131
 132pub enum Event {
 133    Contact {
 134        user: Arc<User>,
 135        kind: ContactEventKind,
 136    },
 137    ShowContacts,
 138    ParticipantIndicesChanged,
 139    PrivateUserInfoUpdated,
 140    PlanUpdated,
 141}
 142
 143#[derive(Clone, Copy)]
 144pub enum ContactEventKind {
 145    Requested,
 146    Accepted,
 147    Cancelled,
 148}
 149
 150impl EventEmitter<Event> for UserStore {}
 151
 152enum UpdateContacts {
 153    Update(proto::UpdateContacts),
 154    Wait(postage::barrier::Sender),
 155    Clear(postage::barrier::Sender),
 156}
 157
 158#[derive(Debug, Clone, Copy, Deref)]
 159pub struct ModelRequestUsage(pub RequestUsage);
 160
 161#[derive(Debug, Clone, Copy, Deref)]
 162pub struct EditPredictionUsage(pub RequestUsage);
 163
 164#[derive(Debug, Clone, Copy)]
 165pub struct RequestUsage {
 166    pub limit: UsageLimit,
 167    pub amount: i32,
 168}
 169
 170impl UserStore {
 171    pub fn new(client: Arc<Client>, cx: &Context<Self>) -> Self {
 172        let (mut current_user_tx, current_user_rx) = watch::channel();
 173        let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
 174        let rpc_subscriptions = vec![
 175            client.add_message_handler(cx.weak_entity(), Self::handle_update_contacts),
 176            client.add_message_handler(cx.weak_entity(), Self::handle_update_invite_info),
 177            client.add_message_handler(cx.weak_entity(), Self::handle_show_contacts),
 178        ];
 179
 180        client.add_message_to_client_handler({
 181            let this = cx.weak_entity();
 182            move |message, cx| Self::handle_message_to_client(this.clone(), message, cx)
 183        });
 184
 185        Self {
 186            users: Default::default(),
 187            by_github_login: Default::default(),
 188            current_user: current_user_rx,
 189            plan_info: None,
 190            model_request_usage: None,
 191            edit_prediction_usage: None,
 192            contacts: Default::default(),
 193            incoming_contact_requests: Default::default(),
 194            participant_indices: Default::default(),
 195            outgoing_contact_requests: Default::default(),
 196            invite_info: None,
 197            client: Arc::downgrade(&client),
 198            update_contacts_tx,
 199            _maintain_contacts: cx.spawn(async move |this, cx| {
 200                let _subscriptions = rpc_subscriptions;
 201                while let Some(message) = update_contacts_rx.next().await {
 202                    if let Ok(task) = this.update(cx, |this, cx| this.update_contacts(message, cx))
 203                    {
 204                        task.log_err().await;
 205                    } else {
 206                        break;
 207                    }
 208                }
 209            }),
 210            _maintain_current_user: cx.spawn(async move |this, cx| {
 211                let mut status = client.status();
 212                let weak = Arc::downgrade(&client);
 213                drop(client);
 214                while let Some(status) = status.next().await {
 215                    // if the client is dropped, the app is shutting down.
 216                    let Some(client) = weak.upgrade() else {
 217                        return Ok(());
 218                    };
 219                    match status {
 220                        Status::Authenticated
 221                        | Status::Reauthenticated
 222                        | Status::Connected { .. } => {
 223                            if let Some(user_id) = client.user_id() {
 224                                let response = client
 225                                    .cloud_client()
 226                                    .get_authenticated_user()
 227                                    .await
 228                                    .log_err();
 229
 230                                let current_user_and_response = if let Some(response) = response {
 231                                    let user = Arc::new(User {
 232                                        id: user_id,
 233                                        github_login: response.user.github_login.clone().into(),
 234                                        avatar_uri: response.user.avatar_url.clone().into(),
 235                                        name: response.user.name.clone(),
 236                                    });
 237
 238                                    Some((user, response))
 239                                } else {
 240                                    None
 241                                };
 242                                current_user_tx
 243                                    .send(
 244                                        current_user_and_response
 245                                            .as_ref()
 246                                            .map(|(user, _)| user.clone()),
 247                                    )
 248                                    .await
 249                                    .ok();
 250
 251                                cx.update(|cx| {
 252                                    if let Some((user, response)) = current_user_and_response {
 253                                        this.update(cx, |this, cx| {
 254                                            this.by_github_login
 255                                                .insert(user.github_login.clone(), user_id);
 256                                            this.users.insert(user_id, user);
 257                                            this.update_authenticated_user(response, cx)
 258                                        })
 259                                    } else {
 260                                        anyhow::Ok(())
 261                                    }
 262                                })??;
 263
 264                                this.update(cx, |_, cx| cx.notify())?;
 265                            }
 266                        }
 267                        Status::SignedOut => {
 268                            current_user_tx.send(None).await.ok();
 269                            this.update(cx, |this, cx| {
 270                                this.clear_plan_and_usage();
 271                                cx.emit(Event::PrivateUserInfoUpdated);
 272                                cx.notify();
 273                                this.clear_contacts()
 274                            })?
 275                            .await;
 276                        }
 277                        Status::ConnectionLost => {
 278                            this.update(cx, |this, cx| {
 279                                cx.notify();
 280                                this.clear_contacts()
 281                            })?
 282                            .await;
 283                        }
 284                        _ => {}
 285                    }
 286                }
 287                Ok(())
 288            }),
 289            pending_contact_requests: Default::default(),
 290            weak_self: cx.weak_entity(),
 291        }
 292    }
 293
 294    #[cfg(feature = "test-support")]
 295    pub fn clear_cache(&mut self) {
 296        self.users.clear();
 297        self.by_github_login.clear();
 298    }
 299
 300    async fn handle_update_invite_info(
 301        this: Entity<Self>,
 302        message: TypedEnvelope<proto::UpdateInviteInfo>,
 303        mut cx: AsyncApp,
 304    ) -> Result<()> {
 305        this.update(&mut cx, |this, cx| {
 306            this.invite_info = Some(InviteInfo {
 307                url: Arc::from(message.payload.url),
 308                count: message.payload.count,
 309            });
 310            cx.notify();
 311        })?;
 312        Ok(())
 313    }
 314
 315    async fn handle_show_contacts(
 316        this: Entity<Self>,
 317        _: TypedEnvelope<proto::ShowContacts>,
 318        mut cx: AsyncApp,
 319    ) -> Result<()> {
 320        this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts))?;
 321        Ok(())
 322    }
 323
 324    pub fn invite_info(&self) -> Option<&InviteInfo> {
 325        self.invite_info.as_ref()
 326    }
 327
 328    async fn handle_update_contacts(
 329        this: Entity<Self>,
 330        message: TypedEnvelope<proto::UpdateContacts>,
 331        cx: AsyncApp,
 332    ) -> Result<()> {
 333        this.read_with(&cx, |this, _| {
 334            this.update_contacts_tx
 335                .unbounded_send(UpdateContacts::Update(message.payload))
 336                .unwrap();
 337        })?;
 338        Ok(())
 339    }
 340
 341    fn update_contacts(&mut self, message: UpdateContacts, cx: &Context<Self>) -> Task<Result<()>> {
 342        match message {
 343            UpdateContacts::Wait(barrier) => {
 344                drop(barrier);
 345                Task::ready(Ok(()))
 346            }
 347            UpdateContacts::Clear(barrier) => {
 348                self.contacts.clear();
 349                self.incoming_contact_requests.clear();
 350                self.outgoing_contact_requests.clear();
 351                drop(barrier);
 352                Task::ready(Ok(()))
 353            }
 354            UpdateContacts::Update(message) => {
 355                let mut user_ids = HashSet::default();
 356                for contact in &message.contacts {
 357                    user_ids.insert(contact.user_id);
 358                }
 359                user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
 360                user_ids.extend(message.outgoing_requests.iter());
 361
 362                let load_users = self.get_users(user_ids.into_iter().collect(), cx);
 363                cx.spawn(async move |this, cx| {
 364                    load_users.await?;
 365
 366                    // Users are fetched in parallel above and cached in call to get_users
 367                    // No need to parallelize here
 368                    let mut updated_contacts = Vec::new();
 369                    let this = this.upgrade().context("can't upgrade user store handle")?;
 370                    for contact in message.contacts {
 371                        updated_contacts
 372                            .push(Arc::new(Contact::from_proto(contact, &this, cx).await?));
 373                    }
 374
 375                    let mut incoming_requests = Vec::new();
 376                    for request in message.incoming_requests {
 377                        incoming_requests.push({
 378                            this.update(cx, |this, cx| this.get_user(request.requester_id, cx))?
 379                                .await?
 380                        });
 381                    }
 382
 383                    let mut outgoing_requests = Vec::new();
 384                    for requested_user_id in message.outgoing_requests {
 385                        outgoing_requests.push(
 386                            this.update(cx, |this, cx| this.get_user(requested_user_id, cx))?
 387                                .await?,
 388                        );
 389                    }
 390
 391                    let removed_contacts =
 392                        HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
 393                    let removed_incoming_requests =
 394                        HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
 395                    let removed_outgoing_requests =
 396                        HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());
 397
 398                    this.update(cx, |this, cx| {
 399                        // Remove contacts
 400                        this.contacts
 401                            .retain(|contact| !removed_contacts.contains(&contact.user.id));
 402                        // Update existing contacts and insert new ones
 403                        for updated_contact in updated_contacts {
 404                            match this.contacts.binary_search_by_key(
 405                                &&updated_contact.user.github_login,
 406                                |contact| &contact.user.github_login,
 407                            ) {
 408                                Ok(ix) => this.contacts[ix] = updated_contact,
 409                                Err(ix) => this.contacts.insert(ix, updated_contact),
 410                            }
 411                        }
 412
 413                        // Remove incoming contact requests
 414                        this.incoming_contact_requests.retain(|user| {
 415                            if removed_incoming_requests.contains(&user.id) {
 416                                cx.emit(Event::Contact {
 417                                    user: user.clone(),
 418                                    kind: ContactEventKind::Cancelled,
 419                                });
 420                                false
 421                            } else {
 422                                true
 423                            }
 424                        });
 425                        // Update existing incoming requests and insert new ones
 426                        for user in incoming_requests {
 427                            match this
 428                                .incoming_contact_requests
 429                                .binary_search_by_key(&&user.github_login, |contact| {
 430                                    &contact.github_login
 431                                }) {
 432                                Ok(ix) => this.incoming_contact_requests[ix] = user,
 433                                Err(ix) => this.incoming_contact_requests.insert(ix, user),
 434                            }
 435                        }
 436
 437                        // Remove outgoing contact requests
 438                        this.outgoing_contact_requests
 439                            .retain(|user| !removed_outgoing_requests.contains(&user.id));
 440                        // Update existing incoming requests and insert new ones
 441                        for request in outgoing_requests {
 442                            match this
 443                                .outgoing_contact_requests
 444                                .binary_search_by_key(&&request.github_login, |contact| {
 445                                    &contact.github_login
 446                                }) {
 447                                Ok(ix) => this.outgoing_contact_requests[ix] = request,
 448                                Err(ix) => this.outgoing_contact_requests.insert(ix, request),
 449                            }
 450                        }
 451
 452                        cx.notify();
 453                    })?;
 454
 455                    Ok(())
 456                })
 457            }
 458        }
 459    }
 460
 461    pub fn contacts(&self) -> &[Arc<Contact>] {
 462        &self.contacts
 463    }
 464
 465    pub fn has_contact(&self, user: &Arc<User>) -> bool {
 466        self.contacts
 467            .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
 468            .is_ok()
 469    }
 470
 471    pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
 472        &self.incoming_contact_requests
 473    }
 474
 475    pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
 476        &self.outgoing_contact_requests
 477    }
 478
 479    pub fn is_contact_request_pending(&self, user: &User) -> bool {
 480        self.pending_contact_requests.contains_key(&user.id)
 481    }
 482
 483    pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
 484        if self
 485            .contacts
 486            .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
 487            .is_ok()
 488        {
 489            ContactRequestStatus::RequestAccepted
 490        } else if self
 491            .outgoing_contact_requests
 492            .binary_search_by_key(&&user.github_login, |user| &user.github_login)
 493            .is_ok()
 494        {
 495            ContactRequestStatus::RequestSent
 496        } else if self
 497            .incoming_contact_requests
 498            .binary_search_by_key(&&user.github_login, |user| &user.github_login)
 499            .is_ok()
 500        {
 501            ContactRequestStatus::RequestReceived
 502        } else {
 503            ContactRequestStatus::None
 504        }
 505    }
 506
 507    pub fn request_contact(
 508        &mut self,
 509        responder_id: u64,
 510        cx: &mut Context<Self>,
 511    ) -> Task<Result<()>> {
 512        self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
 513    }
 514
 515    pub fn remove_contact(&mut self, user_id: u64, cx: &mut Context<Self>) -> Task<Result<()>> {
 516        self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
 517    }
 518
 519    pub fn has_incoming_contact_request(&self, user_id: u64) -> bool {
 520        self.incoming_contact_requests
 521            .iter()
 522            .any(|user| user.id == user_id)
 523    }
 524
 525    pub fn respond_to_contact_request(
 526        &mut self,
 527        requester_id: u64,
 528        accept: bool,
 529        cx: &mut Context<Self>,
 530    ) -> Task<Result<()>> {
 531        self.perform_contact_request(
 532            requester_id,
 533            proto::RespondToContactRequest {
 534                requester_id,
 535                response: if accept {
 536                    proto::ContactRequestResponse::Accept
 537                } else {
 538                    proto::ContactRequestResponse::Decline
 539                } as i32,
 540            },
 541            cx,
 542        )
 543    }
 544
 545    pub fn dismiss_contact_request(
 546        &self,
 547        requester_id: u64,
 548        cx: &Context<Self>,
 549    ) -> Task<Result<()>> {
 550        let client = self.client.upgrade();
 551        cx.spawn(async move |_, _| {
 552            client
 553                .context("can't upgrade client reference")?
 554                .request(proto::RespondToContactRequest {
 555                    requester_id,
 556                    response: proto::ContactRequestResponse::Dismiss as i32,
 557                })
 558                .await?;
 559            Ok(())
 560        })
 561    }
 562
 563    fn perform_contact_request<T: RequestMessage>(
 564        &mut self,
 565        user_id: u64,
 566        request: T,
 567        cx: &mut Context<Self>,
 568    ) -> Task<Result<()>> {
 569        let client = self.client.upgrade();
 570        *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
 571        cx.notify();
 572
 573        cx.spawn(async move |this, cx| {
 574            let response = client
 575                .context("can't upgrade client reference")?
 576                .request(request)
 577                .await;
 578            this.update(cx, |this, cx| {
 579                if let Entry::Occupied(mut request_count) =
 580                    this.pending_contact_requests.entry(user_id)
 581                {
 582                    *request_count.get_mut() -= 1;
 583                    if *request_count.get() == 0 {
 584                        request_count.remove();
 585                    }
 586                }
 587                cx.notify();
 588            })?;
 589            response?;
 590            Ok(())
 591        })
 592    }
 593
 594    pub fn clear_contacts(&self) -> impl Future<Output = ()> + use<> {
 595        let (tx, mut rx) = postage::barrier::channel();
 596        self.update_contacts_tx
 597            .unbounded_send(UpdateContacts::Clear(tx))
 598            .unwrap();
 599        async move {
 600            rx.next().await;
 601        }
 602    }
 603
 604    pub fn contact_updates_done(&self) -> impl Future<Output = ()> {
 605        let (tx, mut rx) = postage::barrier::channel();
 606        self.update_contacts_tx
 607            .unbounded_send(UpdateContacts::Wait(tx))
 608            .unwrap();
 609        async move {
 610            rx.next().await;
 611        }
 612    }
 613
 614    pub fn get_users(
 615        &self,
 616        user_ids: Vec<u64>,
 617        cx: &Context<Self>,
 618    ) -> Task<Result<Vec<Arc<User>>>> {
 619        let mut user_ids_to_fetch = user_ids.clone();
 620        user_ids_to_fetch.retain(|id| !self.users.contains_key(id));
 621
 622        cx.spawn(async move |this, cx| {
 623            if !user_ids_to_fetch.is_empty() {
 624                this.update(cx, |this, cx| {
 625                    this.load_users(
 626                        proto::GetUsers {
 627                            user_ids: user_ids_to_fetch,
 628                        },
 629                        cx,
 630                    )
 631                })?
 632                .await?;
 633            }
 634
 635            this.read_with(cx, |this, _| {
 636                user_ids
 637                    .iter()
 638                    .map(|user_id| {
 639                        this.users
 640                            .get(user_id)
 641                            .cloned()
 642                            .with_context(|| format!("user {user_id} not found"))
 643                    })
 644                    .collect()
 645            })?
 646        })
 647    }
 648
 649    pub fn fuzzy_search_users(
 650        &self,
 651        query: String,
 652        cx: &Context<Self>,
 653    ) -> Task<Result<Vec<Arc<User>>>> {
 654        self.load_users(proto::FuzzySearchUsers { query }, cx)
 655    }
 656
 657    pub fn get_cached_user(&self, user_id: u64) -> Option<Arc<User>> {
 658        self.users.get(&user_id).cloned()
 659    }
 660
 661    pub fn get_user_optimistic(&self, user_id: u64, cx: &Context<Self>) -> Option<Arc<User>> {
 662        if let Some(user) = self.users.get(&user_id).cloned() {
 663            return Some(user);
 664        }
 665
 666        self.get_user(user_id, cx).detach_and_log_err(cx);
 667        None
 668    }
 669
 670    pub fn get_user(&self, user_id: u64, cx: &Context<Self>) -> Task<Result<Arc<User>>> {
 671        if let Some(user) = self.users.get(&user_id).cloned() {
 672            return Task::ready(Ok(user));
 673        }
 674
 675        let load_users = self.get_users(vec![user_id], cx);
 676        cx.spawn(async move |this, cx| {
 677            load_users.await?;
 678            this.read_with(cx, |this, _| {
 679                this.users
 680                    .get(&user_id)
 681                    .cloned()
 682                    .context("server responded with no users")
 683            })?
 684        })
 685    }
 686
 687    pub fn cached_user_by_github_login(&self, github_login: &str) -> Option<Arc<User>> {
 688        self.by_github_login
 689            .get(github_login)
 690            .and_then(|id| self.users.get(id).cloned())
 691    }
 692
 693    pub fn current_user(&self) -> Option<Arc<User>> {
 694        self.current_user.borrow().clone()
 695    }
 696
 697    pub fn plan(&self) -> Option<Plan> {
 698        #[cfg(debug_assertions)]
 699        if let Ok(plan) = std::env::var("ZED_SIMULATE_PLAN").as_ref() {
 700            use cloud_llm_client::PlanV1;
 701
 702            return match plan.as_str() {
 703                "free" => Some(Plan::V1(PlanV1::ZedFree)),
 704                "trial" => Some(Plan::V1(PlanV1::ZedProTrial)),
 705                "pro" => Some(Plan::V1(PlanV1::ZedPro)),
 706                _ => {
 707                    panic!("ZED_SIMULATE_PLAN must be one of 'free', 'trial', or 'pro'");
 708                }
 709            };
 710        }
 711
 712        self.plan_info.as_ref().map(|info| info.plan())
 713    }
 714
 715    pub fn subscription_period(&self) -> Option<(DateTime<Utc>, DateTime<Utc>)> {
 716        self.plan_info
 717            .as_ref()
 718            .and_then(|plan| plan.subscription_period)
 719            .map(|subscription_period| {
 720                (
 721                    subscription_period.started_at.0,
 722                    subscription_period.ended_at.0,
 723                )
 724            })
 725    }
 726
 727    pub fn trial_started_at(&self) -> Option<DateTime<Utc>> {
 728        self.plan_info
 729            .as_ref()
 730            .and_then(|plan| plan.trial_started_at)
 731            .map(|trial_started_at| trial_started_at.0)
 732    }
 733
 734    /// Returns whether the user's account is too new to use the service.
 735    pub fn account_too_young(&self) -> bool {
 736        self.plan_info
 737            .as_ref()
 738            .map(|plan| plan.is_account_too_young)
 739            .unwrap_or_default()
 740    }
 741
 742    /// Returns whether the current user has overdue invoices and usage should be blocked.
 743    pub fn has_overdue_invoices(&self) -> bool {
 744        self.plan_info
 745            .as_ref()
 746            .map(|plan| plan.has_overdue_invoices)
 747            .unwrap_or_default()
 748    }
 749
 750    pub fn is_usage_based_billing_enabled(&self) -> bool {
 751        self.plan_info
 752            .as_ref()
 753            .map(|plan| plan.is_usage_based_billing_enabled)
 754            .unwrap_or_default()
 755    }
 756
 757    pub fn model_request_usage(&self) -> Option<ModelRequestUsage> {
 758        if self.plan().is_some_and(|plan| plan.is_v2()) {
 759            return None;
 760        }
 761
 762        self.model_request_usage
 763    }
 764
 765    pub fn update_model_request_usage(&mut self, usage: ModelRequestUsage, cx: &mut Context<Self>) {
 766        self.model_request_usage = Some(usage);
 767        cx.notify();
 768    }
 769
 770    pub fn edit_prediction_usage(&self) -> Option<EditPredictionUsage> {
 771        self.edit_prediction_usage
 772    }
 773
 774    pub fn update_edit_prediction_usage(
 775        &mut self,
 776        usage: EditPredictionUsage,
 777        cx: &mut Context<Self>,
 778    ) {
 779        self.edit_prediction_usage = Some(usage);
 780        cx.notify();
 781    }
 782
 783    pub fn clear_plan_and_usage(&mut self) {
 784        self.plan_info = None;
 785        self.model_request_usage = None;
 786        self.edit_prediction_usage = None;
 787    }
 788
 789    fn update_authenticated_user(
 790        &mut self,
 791        response: GetAuthenticatedUserResponse,
 792        cx: &mut Context<Self>,
 793    ) {
 794        let staff = response.user.is_staff && !*feature_flags::ZED_DISABLE_STAFF;
 795        cx.update_flags(staff, response.feature_flags);
 796        if let Some(client) = self.client.upgrade() {
 797            client
 798                .telemetry
 799                .set_authenticated_user_info(Some(response.user.metrics_id.clone()), staff);
 800        }
 801
 802        self.model_request_usage = Some(ModelRequestUsage(RequestUsage {
 803            limit: response.plan.usage.model_requests.limit,
 804            amount: response.plan.usage.model_requests.used as i32,
 805        }));
 806        self.edit_prediction_usage = Some(EditPredictionUsage(RequestUsage {
 807            limit: response.plan.usage.edit_predictions.limit,
 808            amount: response.plan.usage.edit_predictions.used as i32,
 809        }));
 810        self.plan_info = Some(response.plan);
 811        cx.emit(Event::PrivateUserInfoUpdated);
 812    }
 813
 814    fn handle_message_to_client(this: WeakEntity<Self>, message: &MessageToClient, cx: &App) {
 815        cx.spawn(async move |cx| {
 816            match message {
 817                MessageToClient::UserUpdated => {
 818                    let cloud_client = cx
 819                        .update(|cx| {
 820                            this.read_with(cx, |this, _cx| {
 821                                this.client.upgrade().map(|client| client.cloud_client())
 822                            })
 823                        })??
 824                        .ok_or(anyhow::anyhow!("Failed to get Cloud client"))?;
 825
 826                    let response = cloud_client.get_authenticated_user().await?;
 827                    cx.update(|cx| {
 828                        this.update(cx, |this, cx| {
 829                            this.update_authenticated_user(response, cx);
 830                        })
 831                    })??;
 832                }
 833            }
 834
 835            anyhow::Ok(())
 836        })
 837        .detach_and_log_err(cx);
 838    }
 839
 840    pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
 841        self.current_user.clone()
 842    }
 843
 844    fn load_users(
 845        &self,
 846        request: impl RequestMessage<Response = UsersResponse>,
 847        cx: &Context<Self>,
 848    ) -> Task<Result<Vec<Arc<User>>>> {
 849        let client = self.client.clone();
 850        cx.spawn(async move |this, cx| {
 851            if let Some(rpc) = client.upgrade() {
 852                let response = rpc.request(request).await.context("error loading users")?;
 853                let users = response.users;
 854
 855                this.update(cx, |this, _| this.insert(users))
 856            } else {
 857                Ok(Vec::new())
 858            }
 859        })
 860    }
 861
 862    pub fn insert(&mut self, users: Vec<proto::User>) -> Vec<Arc<User>> {
 863        let mut ret = Vec::with_capacity(users.len());
 864        for user in users {
 865            let user = User::new(user);
 866            if let Some(old) = self.users.insert(user.id, user.clone())
 867                && old.github_login != user.github_login
 868            {
 869                self.by_github_login.remove(&old.github_login);
 870            }
 871            self.by_github_login
 872                .insert(user.github_login.clone(), user.id);
 873            ret.push(user)
 874        }
 875        ret
 876    }
 877
 878    pub fn set_participant_indices(
 879        &mut self,
 880        participant_indices: HashMap<u64, ParticipantIndex>,
 881        cx: &mut Context<Self>,
 882    ) {
 883        if participant_indices != self.participant_indices {
 884            self.participant_indices = participant_indices;
 885            cx.emit(Event::ParticipantIndicesChanged);
 886        }
 887    }
 888
 889    pub fn participant_indices(&self) -> &HashMap<u64, ParticipantIndex> {
 890        &self.participant_indices
 891    }
 892
 893    pub fn participant_names(
 894        &self,
 895        user_ids: impl Iterator<Item = u64>,
 896        cx: &App,
 897    ) -> HashMap<u64, SharedString> {
 898        let mut ret = HashMap::default();
 899        let mut missing_user_ids = Vec::new();
 900        for id in user_ids {
 901            if let Some(github_login) = self.get_cached_user(id).map(|u| u.github_login.clone()) {
 902                ret.insert(id, github_login);
 903            } else {
 904                missing_user_ids.push(id)
 905            }
 906        }
 907        if !missing_user_ids.is_empty() {
 908            let this = self.weak_self.clone();
 909            cx.spawn(async move |cx| {
 910                this.update(cx, |this, cx| this.get_users(missing_user_ids, cx))?
 911                    .await
 912            })
 913            .detach_and_log_err(cx);
 914        }
 915        ret
 916    }
 917}
 918
 919impl User {
 920    fn new(message: proto::User) -> Arc<Self> {
 921        Arc::new(User {
 922            id: message.id,
 923            github_login: message.github_login.into(),
 924            avatar_uri: message.avatar_url.into(),
 925            name: message.name,
 926        })
 927    }
 928}
 929
 930impl Contact {
 931    async fn from_proto(
 932        contact: proto::Contact,
 933        user_store: &Entity<UserStore>,
 934        cx: &mut AsyncApp,
 935    ) -> Result<Self> {
 936        let user = user_store
 937            .update(cx, |user_store, cx| {
 938                user_store.get_user(contact.user_id, cx)
 939            })?
 940            .await?;
 941        Ok(Self {
 942            user,
 943            online: contact.online,
 944            busy: contact.busy,
 945        })
 946    }
 947}
 948
 949impl Collaborator {
 950    pub fn from_proto(message: proto::Collaborator) -> Result<Self> {
 951        Ok(Self {
 952            peer_id: message.peer_id.context("invalid peer id")?,
 953            replica_id: ReplicaId::new(message.replica_id as u16),
 954            user_id: message.user_id as UserId,
 955            is_host: message.is_host,
 956            committer_name: message.committer_name,
 957            committer_email: message.committer_email,
 958        })
 959    }
 960}
 961
 962impl RequestUsage {
 963    pub fn over_limit(&self) -> bool {
 964        match self.limit {
 965            UsageLimit::Limited(limit) => self.amount >= limit,
 966            UsageLimit::Unlimited => false,
 967        }
 968    }
 969
 970    fn from_headers(
 971        limit_name: &str,
 972        amount_name: &str,
 973        headers: &HeaderMap<HeaderValue>,
 974    ) -> Result<Self> {
 975        let limit = headers
 976            .get(limit_name)
 977            .with_context(|| format!("missing {limit_name:?} header"))?;
 978        let limit = UsageLimit::from_str(limit.to_str()?)?;
 979
 980        let amount = headers
 981            .get(amount_name)
 982            .with_context(|| format!("missing {amount_name:?} header"))?;
 983        let amount = amount.to_str()?.parse::<i32>()?;
 984
 985        Ok(Self { limit, amount })
 986    }
 987}
 988
 989impl ModelRequestUsage {
 990    pub fn from_headers(headers: &HeaderMap<HeaderValue>) -> Result<Self> {
 991        Ok(Self(RequestUsage::from_headers(
 992            MODEL_REQUESTS_USAGE_LIMIT_HEADER_NAME,
 993            MODEL_REQUESTS_USAGE_AMOUNT_HEADER_NAME,
 994            headers,
 995        )?))
 996    }
 997}
 998
 999impl EditPredictionUsage {
1000    pub fn from_headers(headers: &HeaderMap<HeaderValue>) -> Result<Self> {
1001        Ok(Self(RequestUsage::from_headers(
1002            EDIT_PREDICTIONS_USAGE_LIMIT_HEADER_NAME,
1003            EDIT_PREDICTIONS_USAGE_AMOUNT_HEADER_NAME,
1004            headers,
1005        )?))
1006    }
1007}