user.rs

   1use super::{Client, Status, TypedEnvelope, proto};
   2use anyhow::{Context as _, Result, anyhow};
   3use chrono::{DateTime, Utc};
   4use cloud_api_client::{GetAuthenticatedUserResponse, PlanInfo};
   5use cloud_llm_client::{
   6    EDIT_PREDICTIONS_USAGE_AMOUNT_HEADER_NAME, EDIT_PREDICTIONS_USAGE_LIMIT_HEADER_NAME,
   7    MODEL_REQUESTS_USAGE_AMOUNT_HEADER_NAME, MODEL_REQUESTS_USAGE_LIMIT_HEADER_NAME, UsageLimit,
   8};
   9use collections::{HashMap, HashSet, hash_map::Entry};
  10use derive_more::Deref;
  11use feature_flags::FeatureFlagAppExt;
  12use futures::{Future, StreamExt, channel::mpsc};
  13use gpui::{
  14    App, AsyncApp, Context, Entity, EventEmitter, SharedString, SharedUri, Task, WeakEntity,
  15};
  16use http_client::http::{HeaderMap, HeaderValue};
  17use postage::{sink::Sink, watch};
  18use rpc::proto::{RequestMessage, UsersResponse};
  19use std::{
  20    str::FromStr as _,
  21    sync::{Arc, Weak},
  22};
  23use text::ReplicaId;
  24use util::{ResultExt, TryFutureExt as _};
  25
  26pub type UserId = u64; // Numeric user identifier; this is the type of `User::id`.
  27
  28#[derive(
  29    Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, serde::Serialize, serde::Deserialize,
  30)]
    /// Newtype around the raw `u64` id of a channel; serializable via serde.
  31pub struct ChannelId(pub u64);
  32
  33impl std::fmt::Display for ChannelId {
        // Delegates formatting to the wrapped `u64`, passing the caller's formatter through.
  34    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
  35        self.0.fmt(f)
  36    }
  37}
  38
  39#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)]
    /// Newtype around the raw `u64` id of a project.
  40pub struct ProjectId(pub u64);
  41
  42impl ProjectId {
  43    pub fn to_proto(&self) -> u64 {
  44        self.0
  45    }
  46}
  47
  48#[derive(
  49    Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, serde::Serialize, serde::Deserialize,
  50)]
    /// Newtype around the raw `u64` id of a dev-server project; serializable via serde.
  51pub struct DevServerProjectId(pub u64);
  52
  53#[derive(Debug, Clone, Copy, PartialEq, Eq)]
    /// Newtype around a `u32` participant index; `UserStore` keeps one per user id in
    /// `participant_indices`.
  54pub struct ParticipantIndex(pub u32);
  55
  56#[derive(Default, Debug)]
    /// A user record as cached by `UserStore`.
    ///
    /// Equality and ordering are implemented by hand further down rather than derived.
  57pub struct User {
        /// Numeric id of the user; the key in `UserStore::users`.
  58    pub id: UserId,
        /// GitHub login; the contact lists in `UserStore` are binary-searched on this field.
  59    pub github_login: SharedString,
        /// URI of the user's avatar image.
  60    pub avatar_uri: SharedUri,
        /// Optional name for the user, when the server provides one.
  61    pub name: Option<String>,
  62}
  63
  64#[derive(Clone, Debug, PartialEq, Eq)]
    /// Describes one collaborator. Nothing in the visible part of this file constructs or
    /// reads it, so the field notes below are limited to what the types show.
  65pub struct Collaborator {
        /// Peer id of the collaborator (a `proto::PeerId`).
  66    pub peer_id: proto::PeerId,
        /// Replica id assigned to the collaborator (a `text::ReplicaId`).
  67    pub replica_id: ReplicaId,
        /// Id of the user behind this collaborator.
  68    pub user_id: UserId,
        /// NOTE(review): presumably true when this collaborator hosts the project — confirm.
  69    pub is_host: bool,
        /// Optional committer name for this collaborator.
  70    pub committer_name: Option<String>,
        /// Optional committer email for this collaborator.
  71    pub committer_email: Option<String>,
  72}
  73
  74impl PartialOrd for User {
  75    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
  76        Some(self.cmp(other))
  77    }
  78}
  79
  80impl Ord for User {
  81    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
  82        self.github_login.cmp(&other.github_login)
  83    }
  84}
  85
  86impl PartialEq for User {
  87    fn eq(&self, other: &Self) -> bool {
  88        self.id == other.id && self.github_login == other.github_login
  89    }
  90}
  91
  92impl Eq for User {} // Marker impl: the hand-written `PartialEq` above is a full equivalence.
  93
  94#[derive(Debug, PartialEq)]
    /// An entry in `UserStore::contacts`: a user together with presence flags.
  95pub struct Contact {
        /// The user this contact refers to.
  96    pub user: Arc<User>,
        /// Whether the contact is reported as online.
  97    pub online: bool,
        /// Whether the contact is reported as busy.
  98    pub busy: bool,
  99}
 100
 101#[derive(Debug, Clone, Copy, PartialEq, Eq)]
    /// Relationship between the current user and another user, as computed by
    /// `UserStore::contact_request_status`.
 102pub enum ContactRequestStatus {
        /// The user is in none of the contact or request lists.
 103    None,
        /// The user is in the outgoing contact-request list.
 104    RequestSent,
        /// The user is in the incoming contact-request list.
 105    RequestReceived,
        /// The user is already in the contact list.
 106    RequestAccepted,
 107}
 108
 109pub struct UserStore {
        /// Cache of known users, keyed by user id.
 110    users: HashMap<u64, Arc<User>>,
        /// Index from GitHub login to user id, used together with `users`.
 111    by_github_login: HashMap<SharedString, u64>,
        /// Participant index per user id (not written in the visible part of this file).
 112    participant_indices: HashMap<u64, ParticipantIndex>,
        /// Sender for the queue of contact messages consumed by `_maintain_contacts`.
 113    update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
        /// Last known model-request usage, if any has been recorded.
 114    model_request_usage: Option<ModelRequestUsage>,
        /// Last known edit-prediction usage, if any has been recorded.
 115    edit_prediction_usage: Option<EditPredictionUsage>,
        /// Plan information from the most recent authenticated-user response.
 116    plan_info: Option<PlanInfo>,
        /// Watch channel receiver holding the current user (`None` when there is none).
 117    current_user: watch::Receiver<Option<Arc<User>>>,
        /// Outer `None`: no terms-of-service state recorded (initial value, reset on sign-out).
        /// Inner option: the acceptance timestamp, if the terms were accepted.
 118    accepted_tos_at: Option<Option<cloud_api_client::Timestamp>>,
        /// Contacts, kept sorted by GitHub login so they can be binary-searched.
 119    contacts: Vec<Arc<Contact>>,
        /// Users who sent us a contact request, kept sorted by GitHub login.
 120    incoming_contact_requests: Vec<Arc<User>>,
        /// Users we sent a contact request to, kept sorted by GitHub login.
 121    outgoing_contact_requests: Vec<Arc<User>>,
        /// Number of in-flight contact requests per user id; entries are removed at zero.
 122    pending_contact_requests: HashMap<u64, usize>,
        /// Most recent invite info received from the server, if any.
 123    invite_info: Option<InviteInfo>,
        /// Weak reference to the client, so the store does not keep it alive.
 124    client: Weak<Client>,
        /// Task that applies queued `UpdateContacts` messages; also owns the RPC subscriptions.
 125    _maintain_contacts: Task<()>,
        /// Task that follows client status changes and maintains the current user.
 126    _maintain_current_user: Task<Result<()>>,
        /// Weak handle to this entity.
 127    weak_self: WeakEntity<Self>,
 128}
 129
 130#[derive(Clone)]
    /// Invite details copied from a `proto::UpdateInviteInfo` message.
 131pub struct InviteInfo {
        /// The `count` value from the message.
 132    pub count: u32,
        /// The `url` value from the message.
 133    pub url: Arc<str>,
 134}
 135
 136pub enum Event {
        /// A contact-related change concerning `user`; `kind` says what happened.
 137    Contact {
 138        user: Arc<User>,
 139        kind: ContactEventKind,
 140    },
        /// Emitted when a `proto::ShowContacts` message is received.
 141    ShowContacts,
        /// Not emitted in the visible part of this file.
 142    ParticipantIndicesChanged,
        /// Emitted after the authenticated user's private info (ToS state, plan, usage) changes
        /// and on sign-out.
 143    PrivateUserInfoUpdated,
        /// Not emitted in the visible part of this file.
 144    PlanUpdated,
 145}
 146
 147#[derive(Clone, Copy)]
    /// The kind of change carried by `Event::Contact`.
 148pub enum ContactEventKind {
        /// Not emitted in the visible part of this file.
 149    Requested,
        /// Not emitted in the visible part of this file.
 150    Accepted,
        /// Emitted when an incoming contact request is removed by a contacts update.
 151    Cancelled,
 152}
 153
 154impl EventEmitter<Event> for UserStore {} // Lets `UserStore` emit `Event` values via `cx.emit`.
 155
 156enum UpdateContacts {
        /// Apply a contacts update received from the server.
 157    Update(proto::UpdateContacts),
        /// Do nothing except drop the barrier sender, signalling that earlier messages were handled.
 158    Wait(postage::barrier::Sender),
        /// Clear the contact and request lists, then drop the barrier sender.
 159    Clear(postage::barrier::Sender),
 160}
 161
 162#[derive(Debug, Clone, Copy, Deref)]
    /// Usage counters for model requests; derefs to the wrapped `RequestUsage`.
 163pub struct ModelRequestUsage(pub RequestUsage);
 164
 165#[derive(Debug, Clone, Copy, Deref)]
    /// Usage counters for edit predictions; derefs to the wrapped `RequestUsage`.
 166pub struct EditPredictionUsage(pub RequestUsage);
 167
 168#[derive(Debug, Clone, Copy)]
    /// A usage limit paired with the amount consumed so far.
 169pub struct RequestUsage {
        /// The applicable limit.
 170    pub limit: UsageLimit,
        /// Amount used; filled from the server's `used` value via an `as i32` cast.
 171    pub amount: i32,
 172}
 173
 174impl UserStore {
 175    pub fn new(client: Arc<Client>, cx: &Context<Self>) -> Self {
            // Builds the store: registers RPC message handlers on `client` and spawns two
            // background tasks, one applying queued contact messages and one following the
            // client's status stream to maintain the current user.
 176        let (mut current_user_tx, current_user_rx) = watch::channel();
 177        let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
            // Handlers are registered against a weak handle to this entity. The returned
            // subscriptions are moved into `_maintain_contacts` below, which keeps them for
            // as long as that task exists.
 178        let rpc_subscriptions = vec![
 179            client.add_message_handler(cx.weak_entity(), Self::handle_update_plan),
 180            client.add_message_handler(cx.weak_entity(), Self::handle_update_contacts),
 181            client.add_message_handler(cx.weak_entity(), Self::handle_update_invite_info),
 182            client.add_message_handler(cx.weak_entity(), Self::handle_show_contacts),
 183        ];
 184        Self {
 185            users: Default::default(),
 186            by_github_login: Default::default(),
 187            current_user: current_user_rx,
 188            plan_info: None,
 189            model_request_usage: None,
 190            edit_prediction_usage: None,
 191            accepted_tos_at: None,
 192            contacts: Default::default(),
 193            incoming_contact_requests: Default::default(),
 194            participant_indices: Default::default(),
 195            outgoing_contact_requests: Default::default(),
 196            invite_info: None,
 197            client: Arc::downgrade(&client),
 198            update_contacts_tx,
 199            _maintain_contacts: cx.spawn(async move |this, cx| {
 200                let _subscriptions = rpc_subscriptions;
                    // Apply queued contact messages one at a time, awaiting each before taking
                    // the next; stop once the store entity can no longer be updated.
 201                while let Some(message) = update_contacts_rx.next().await {
 202                    if let Ok(task) = this.update(cx, |this, cx| this.update_contacts(message, cx))
 203                    {
 204                        task.log_err().await;
 205                    } else {
 206                        break;
 207                    }
 208                }
 209            }),
 210            _maintain_current_user: cx.spawn(async move |this, cx| {
 211                let mut status = client.status();
                    // Hold only a weak reference inside the task so it does not keep the client alive.
 212                let weak = Arc::downgrade(&client);
 213                drop(client);
 214                while let Some(status) = status.next().await {
 215                    // if the client is dropped, the app is shutting down.
 216                    let Some(client) = weak.upgrade() else {
 217                        return Ok(());
 218                    };
 219                    match status {
                            // Once authenticated or connected, fetch the authenticated user from
                            // the cloud API, cache it, and apply the rest of the response.
 220                        Status::Authenticated | Status::Connected { .. } => {
 221                            if let Some(user_id) = client.user_id() {
 222                                let response = client.cloud_client().get_authenticated_user().await;
 223                                let mut current_user = None;
 224                                cx.update(|cx| {
                                        // A failed fetch is logged and leaves `current_user` as `None`.
 225                                    if let Some(response) = response.log_err() {
 226                                        let user = Arc::new(User {
 227                                            id: user_id,
 228                                            github_login: response.user.github_login.clone().into(),
 229                                            avatar_uri: response.user.avatar_url.clone().into(),
 230                                            name: response.user.name.clone(),
 231                                        });
 232                                        current_user = Some(user.clone());
 233                                        this.update(cx, |this, cx| {
 234                                            this.by_github_login
 235                                                .insert(user.github_login.clone(), user_id);
 236                                            this.users.insert(user_id, user);
 237                                            this.update_authenticated_user(response, cx)
 238                                        })
 239                                    } else {
 240                                        anyhow::Ok(())
 241                                    }
 242                                })??;
                                    // Publish the (possibly `None`) current user to watchers.
 243                                current_user_tx.send(current_user).await.ok();
 244
 245                                this.update(cx, |_, cx| cx.notify())?;
 246                            }
 247                        }
                            // Signing out clears the current user, the terms-of-service state,
                            // and the contact lists.
 248                        Status::SignedOut => {
 249                            current_user_tx.send(None).await.ok();
 250                            this.update(cx, |this, cx| {
 251                                this.accepted_tos_at = None;
 252                                cx.emit(Event::PrivateUserInfoUpdated);
 253                                cx.notify();
 254                                this.clear_contacts()
 255                            })?
 256                            .await;
 257                        }
                            // Losing the connection only clears the contact lists; the current
                            // user is left untouched.
 258                        Status::ConnectionLost => {
 259                            this.update(cx, |this, cx| {
 260                                cx.notify();
 261                                this.clear_contacts()
 262                            })?
 263                            .await;
 264                        }
 265                        _ => {}
 266                    }
 267                }
 268                Ok(())
 269            }),
 270            pending_contact_requests: Default::default(),
 271            weak_self: cx.weak_entity(),
 272        }
 273    }
 274
 275    #[cfg(feature = "test-support")]
 276    pub fn clear_cache(&mut self) {
 277        self.users.clear();
 278        self.by_github_login.clear();
 279    }
 280
 281    async fn handle_update_invite_info(
 282        this: Entity<Self>,
 283        message: TypedEnvelope<proto::UpdateInviteInfo>,
 284        mut cx: AsyncApp,
 285    ) -> Result<()> {
 286        this.update(&mut cx, |this, cx| {
 287            this.invite_info = Some(InviteInfo {
 288                url: Arc::from(message.payload.url),
 289                count: message.payload.count,
 290            });
 291            cx.notify();
 292        })?;
 293        Ok(())
 294    }
 295
 296    async fn handle_show_contacts(
 297        this: Entity<Self>,
 298        _: TypedEnvelope<proto::ShowContacts>,
 299        mut cx: AsyncApp,
 300    ) -> Result<()> {
 301        this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts))?;
 302        Ok(())
 303    }
 304
 305    pub fn invite_info(&self) -> Option<&InviteInfo> {
 306        self.invite_info.as_ref()
 307    }
 308
 309    async fn handle_update_contacts(
 310        this: Entity<Self>,
 311        message: TypedEnvelope<proto::UpdateContacts>,
 312        mut cx: AsyncApp,
 313    ) -> Result<()> {
 314        this.read_with(&mut cx, |this, _| {
 315            this.update_contacts_tx
 316                .unbounded_send(UpdateContacts::Update(message.payload))
 317                .unwrap();
 318        })?;
 319        Ok(())
 320    }
 321
 322    async fn handle_update_plan(
 323        this: Entity<Self>,
 324        _message: TypedEnvelope<proto::UpdateUserPlan>,
 325        mut cx: AsyncApp,
 326    ) -> Result<()> {
 327        let client = this
 328            .read_with(&cx, |this, _| this.client.upgrade())?
 329            .context("client was dropped")?;
 330
 331        let response = client
 332            .cloud_client()
 333            .get_authenticated_user()
 334            .await
 335            .context("failed to fetch authenticated user")?;
 336
 337        this.update(&mut cx, |this, cx| {
 338            this.update_authenticated_user(response, cx);
 339        })
 340    }
 341
 342    fn update_contacts(&mut self, message: UpdateContacts, cx: &Context<Self>) -> Task<Result<()>> {
            // Applies one queued contact message. `Wait` and `Clear` complete immediately;
            // `Update` first loads every referenced user and then merges the changes into the
            // sorted contact and request lists.
 343        match message {
 344            UpdateContacts::Wait(barrier) => {
                    // Dropping the sender is what completes the future returned by
                    // `contact_updates_done`.
 345                drop(barrier);
 346                Task::ready(Ok(()))
 347            }
 348            UpdateContacts::Clear(barrier) => {
 349                self.contacts.clear();
 350                self.incoming_contact_requests.clear();
 351                self.outgoing_contact_requests.clear();
                    // Dropping the sender completes the future returned by `clear_contacts`.
 352                drop(barrier);
 353                Task::ready(Ok(()))
 354            }
 355            UpdateContacts::Update(message) => {
                    // Collect every user id the message mentions so they can all be requested
                    // through a single `get_users` call.
 356                let mut user_ids = HashSet::default();
 357                for contact in &message.contacts {
 358                    user_ids.insert(contact.user_id);
 359                }
 360                user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
 361                user_ids.extend(message.outgoing_requests.iter());
 362
 363                let load_users = self.get_users(user_ids.into_iter().collect(), cx);
 364                cx.spawn(async move |this, cx| {
 365                    load_users.await?;
 366
 367                    // Users are fetched in parallel above and cached in call to get_users
 368                    // No need to parallelize here
 369                    let mut updated_contacts = Vec::new();
 370                    let this = this.upgrade().context("can't upgrade user store handle")?;
 371                    for contact in message.contacts {
 372                        updated_contacts
 373                            .push(Arc::new(Contact::from_proto(contact, &this, cx).await?));
 374                    }
 375
 376                    let mut incoming_requests = Vec::new();
 377                    for request in message.incoming_requests {
 378                        incoming_requests.push({
 379                            this.update(cx, |this, cx| this.get_user(request.requester_id, cx))?
 380                                .await?
 381                        });
 382                    }
 383
 384                    let mut outgoing_requests = Vec::new();
 385                    for requested_user_id in message.outgoing_requests {
 386                        outgoing_requests.push(
 387                            this.update(cx, |this, cx| this.get_user(requested_user_id, cx))?
 388                                .await?,
 389                        );
 390                    }
 391
 392                    let removed_contacts =
 393                        HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
 394                    let removed_incoming_requests =
 395                        HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
 396                    let removed_outgoing_requests =
 397                        HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());
 398
                        // All list mutations happen inside one `update` call; each list stays
                        // sorted by GitHub login because new entries go in at the index the
                        // binary search reports.
 399                    this.update(cx, |this, cx| {
 400                        // Remove contacts
 401                        this.contacts
 402                            .retain(|contact| !removed_contacts.contains(&contact.user.id));
 403                        // Update existing contacts and insert new ones
 404                        for updated_contact in updated_contacts {
 405                            match this.contacts.binary_search_by_key(
 406                                &&updated_contact.user.github_login,
 407                                |contact| &contact.user.github_login,
 408                            ) {
 409                                Ok(ix) => this.contacts[ix] = updated_contact,
 410                                Err(ix) => this.contacts.insert(ix, updated_contact),
 411                            }
 412                        }
 413
 414                        // Remove incoming contact requests
                            // Each removed incoming request also emits a `Cancelled` contact event.
 415                        this.incoming_contact_requests.retain(|user| {
 416                            if removed_incoming_requests.contains(&user.id) {
 417                                cx.emit(Event::Contact {
 418                                    user: user.clone(),
 419                                    kind: ContactEventKind::Cancelled,
 420                                });
 421                                false
 422                            } else {
 423                                true
 424                            }
 425                        });
 426                        // Update existing incoming requests and insert new ones
 427                        for user in incoming_requests {
 428                            match this
 429                                .incoming_contact_requests
 430                                .binary_search_by_key(&&user.github_login, |contact| {
 431                                    &contact.github_login
 432                                }) {
 433                                Ok(ix) => this.incoming_contact_requests[ix] = user,
 434                                Err(ix) => this.incoming_contact_requests.insert(ix, user),
 435                            }
 436                        }
 437
 438                        // Remove outgoing contact requests
 439                        this.outgoing_contact_requests
 440                            .retain(|user| !removed_outgoing_requests.contains(&user.id));
 441                        // Update existing outgoing requests and insert new ones
 442                        for request in outgoing_requests {
 443                            match this
 444                                .outgoing_contact_requests
 445                                .binary_search_by_key(&&request.github_login, |contact| {
 446                                    &contact.github_login
 447                                }) {
 448                                Ok(ix) => this.outgoing_contact_requests[ix] = request,
 449                                Err(ix) => this.outgoing_contact_requests.insert(ix, request),
 450                            }
 451                        }
 452
 453                        cx.notify();
 454                    })?;
 455
 456                    Ok(())
 457                })
 458            }
 459        }
 460    }
 461
 462    pub fn contacts(&self) -> &[Arc<Contact>] {
 463        &self.contacts
 464    }
 465
 466    pub fn has_contact(&self, user: &Arc<User>) -> bool {
 467        self.contacts
 468            .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
 469            .is_ok()
 470    }
 471
 472    pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
 473        &self.incoming_contact_requests
 474    }
 475
 476    pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
 477        &self.outgoing_contact_requests
 478    }
 479
 480    pub fn is_contact_request_pending(&self, user: &User) -> bool {
 481        self.pending_contact_requests.contains_key(&user.id)
 482    }
 483
 484    pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
 485        if self
 486            .contacts
 487            .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
 488            .is_ok()
 489        {
 490            ContactRequestStatus::RequestAccepted
 491        } else if self
 492            .outgoing_contact_requests
 493            .binary_search_by_key(&&user.github_login, |user| &user.github_login)
 494            .is_ok()
 495        {
 496            ContactRequestStatus::RequestSent
 497        } else if self
 498            .incoming_contact_requests
 499            .binary_search_by_key(&&user.github_login, |user| &user.github_login)
 500            .is_ok()
 501        {
 502            ContactRequestStatus::RequestReceived
 503        } else {
 504            ContactRequestStatus::None
 505        }
 506    }
 507
 508    pub fn request_contact(
 509        &mut self,
 510        responder_id: u64,
 511        cx: &mut Context<Self>,
 512    ) -> Task<Result<()>> {
 513        self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
 514    }
 515
 516    pub fn remove_contact(&mut self, user_id: u64, cx: &mut Context<Self>) -> Task<Result<()>> {
 517        self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
 518    }
 519
 520    pub fn has_incoming_contact_request(&self, user_id: u64) -> bool {
 521        self.incoming_contact_requests
 522            .iter()
 523            .any(|user| user.id == user_id)
 524    }
 525
 526    pub fn respond_to_contact_request(
 527        &mut self,
 528        requester_id: u64,
 529        accept: bool,
 530        cx: &mut Context<Self>,
 531    ) -> Task<Result<()>> {
 532        self.perform_contact_request(
 533            requester_id,
 534            proto::RespondToContactRequest {
 535                requester_id,
 536                response: if accept {
 537                    proto::ContactRequestResponse::Accept
 538                } else {
 539                    proto::ContactRequestResponse::Decline
 540                } as i32,
 541            },
 542            cx,
 543        )
 544    }
 545
 546    pub fn dismiss_contact_request(
 547        &self,
 548        requester_id: u64,
 549        cx: &Context<Self>,
 550    ) -> Task<Result<()>> {
 551        let client = self.client.upgrade();
 552        cx.spawn(async move |_, _| {
 553            client
 554                .context("can't upgrade client reference")?
 555                .request(proto::RespondToContactRequest {
 556                    requester_id,
 557                    response: proto::ContactRequestResponse::Dismiss as i32,
 558                })
 559                .await?;
 560            Ok(())
 561        })
 562    }
 563
 564    fn perform_contact_request<T: RequestMessage>(
 565        &mut self,
 566        user_id: u64,
 567        request: T,
 568        cx: &mut Context<Self>,
 569    ) -> Task<Result<()>> {
 570        let client = self.client.upgrade();
 571        *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
 572        cx.notify();
 573
 574        cx.spawn(async move |this, cx| {
 575            let response = client
 576                .context("can't upgrade client reference")?
 577                .request(request)
 578                .await;
 579            this.update(cx, |this, cx| {
 580                if let Entry::Occupied(mut request_count) =
 581                    this.pending_contact_requests.entry(user_id)
 582                {
 583                    *request_count.get_mut() -= 1;
 584                    if *request_count.get() == 0 {
 585                        request_count.remove();
 586                    }
 587                }
 588                cx.notify();
 589            })?;
 590            response?;
 591            Ok(())
 592        })
 593    }
 594
 595    pub fn clear_contacts(&self) -> impl Future<Output = ()> + use<> {
 596        let (tx, mut rx) = postage::barrier::channel();
 597        self.update_contacts_tx
 598            .unbounded_send(UpdateContacts::Clear(tx))
 599            .unwrap();
 600        async move {
 601            rx.next().await;
 602        }
 603    }
 604
 605    pub fn contact_updates_done(&self) -> impl Future<Output = ()> {
 606        let (tx, mut rx) = postage::barrier::channel();
 607        self.update_contacts_tx
 608            .unbounded_send(UpdateContacts::Wait(tx))
 609            .unwrap();
 610        async move {
 611            rx.next().await;
 612        }
 613    }
 614
 615    pub fn get_users(
 616        &self,
 617        user_ids: Vec<u64>,
 618        cx: &Context<Self>,
 619    ) -> Task<Result<Vec<Arc<User>>>> {
 620        let mut user_ids_to_fetch = user_ids.clone();
 621        user_ids_to_fetch.retain(|id| !self.users.contains_key(id));
 622
 623        cx.spawn(async move |this, cx| {
 624            if !user_ids_to_fetch.is_empty() {
 625                this.update(cx, |this, cx| {
 626                    this.load_users(
 627                        proto::GetUsers {
 628                            user_ids: user_ids_to_fetch,
 629                        },
 630                        cx,
 631                    )
 632                })?
 633                .await?;
 634            }
 635
 636            this.read_with(cx, |this, _| {
 637                user_ids
 638                    .iter()
 639                    .map(|user_id| {
 640                        this.users
 641                            .get(user_id)
 642                            .cloned()
 643                            .with_context(|| format!("user {user_id} not found"))
 644                    })
 645                    .collect()
 646            })?
 647        })
 648    }
 649
 650    pub fn fuzzy_search_users(
 651        &self,
 652        query: String,
 653        cx: &Context<Self>,
 654    ) -> Task<Result<Vec<Arc<User>>>> {
 655        self.load_users(proto::FuzzySearchUsers { query }, cx)
 656    }
 657
 658    pub fn get_cached_user(&self, user_id: u64) -> Option<Arc<User>> {
 659        self.users.get(&user_id).cloned()
 660    }
 661
 662    pub fn get_user_optimistic(&self, user_id: u64, cx: &Context<Self>) -> Option<Arc<User>> {
 663        if let Some(user) = self.users.get(&user_id).cloned() {
 664            return Some(user);
 665        }
 666
 667        self.get_user(user_id, cx).detach_and_log_err(cx);
 668        None
 669    }
 670
 671    pub fn get_user(&self, user_id: u64, cx: &Context<Self>) -> Task<Result<Arc<User>>> {
 672        if let Some(user) = self.users.get(&user_id).cloned() {
 673            return Task::ready(Ok(user));
 674        }
 675
 676        let load_users = self.get_users(vec![user_id], cx);
 677        cx.spawn(async move |this, cx| {
 678            load_users.await?;
 679            this.read_with(cx, |this, _| {
 680                this.users
 681                    .get(&user_id)
 682                    .cloned()
 683                    .context("server responded with no users")
 684            })?
 685        })
 686    }
 687
 688    pub fn cached_user_by_github_login(&self, github_login: &str) -> Option<Arc<User>> {
 689        self.by_github_login
 690            .get(github_login)
 691            .and_then(|id| self.users.get(id).cloned())
 692    }
 693
 694    pub fn current_user(&self) -> Option<Arc<User>> {
 695        self.current_user.borrow().clone()
 696    }
 697
 698    pub fn plan(&self) -> Option<cloud_llm_client::Plan> {
 699        #[cfg(debug_assertions)]
 700        if let Ok(plan) = std::env::var("ZED_SIMULATE_PLAN").as_ref() {
 701            return match plan.as_str() {
 702                "free" => Some(cloud_llm_client::Plan::ZedFree),
 703                "trial" => Some(cloud_llm_client::Plan::ZedProTrial),
 704                "pro" => Some(cloud_llm_client::Plan::ZedPro),
 705                _ => {
 706                    panic!("ZED_SIMULATE_PLAN must be one of 'free', 'trial', or 'pro'");
 707                }
 708            };
 709        }
 710
 711        self.plan_info.as_ref().map(|info| info.plan)
 712    }
 713
 714    pub fn subscription_period(&self) -> Option<(DateTime<Utc>, DateTime<Utc>)> {
 715        self.plan_info
 716            .as_ref()
 717            .and_then(|plan| plan.subscription_period)
 718            .map(|subscription_period| {
 719                (
 720                    subscription_period.started_at.0,
 721                    subscription_period.ended_at.0,
 722                )
 723            })
 724    }
 725
 726    pub fn trial_started_at(&self) -> Option<DateTime<Utc>> {
 727        self.plan_info
 728            .as_ref()
 729            .and_then(|plan| plan.trial_started_at)
 730            .map(|trial_started_at| trial_started_at.0)
 731    }
 732
 733    /// Returns whether the user's account is too new to use the service.
 734    pub fn account_too_young(&self) -> bool {
 735        self.plan_info
 736            .as_ref()
 737            .map(|plan| plan.is_account_too_young)
 738            .unwrap_or_default()
 739    }
 740
 741    /// Returns whether the current user has overdue invoices and usage should be blocked.
 742    pub fn has_overdue_invoices(&self) -> bool {
 743        self.plan_info
 744            .as_ref()
 745            .map(|plan| plan.has_overdue_invoices)
 746            .unwrap_or_default()
 747    }
 748
 749    pub fn is_usage_based_billing_enabled(&self) -> bool {
 750        self.plan_info
 751            .as_ref()
 752            .map(|plan| plan.is_usage_based_billing_enabled)
 753            .unwrap_or_default()
 754    }
 755
 756    pub fn model_request_usage(&self) -> Option<ModelRequestUsage> {
 757        self.model_request_usage
 758    }
 759
 760    pub fn update_model_request_usage(&mut self, usage: ModelRequestUsage, cx: &mut Context<Self>) {
 761        self.model_request_usage = Some(usage);
 762        cx.notify();
 763    }
 764
 765    pub fn edit_prediction_usage(&self) -> Option<EditPredictionUsage> {
 766        self.edit_prediction_usage
 767    }
 768
 769    pub fn update_edit_prediction_usage(
 770        &mut self,
 771        usage: EditPredictionUsage,
 772        cx: &mut Context<Self>,
 773    ) {
 774        self.edit_prediction_usage = Some(usage);
 775        cx.notify();
 776    }
 777
 778    fn update_authenticated_user(
 779        &mut self,
 780        response: GetAuthenticatedUserResponse,
 781        cx: &mut Context<Self>,
 782    ) {
 783        let staff = response.user.is_staff && !*feature_flags::ZED_DISABLE_STAFF;
 784        cx.update_flags(staff, response.feature_flags);
 785        if let Some(client) = self.client.upgrade() {
 786            client
 787                .telemetry
 788                .set_authenticated_user_info(Some(response.user.metrics_id.clone()), staff);
 789        }
 790
 791        let accepted_tos_at = {
 792            #[cfg(debug_assertions)]
 793            if std::env::var("ZED_IGNORE_ACCEPTED_TOS").is_ok() {
 794                None
 795            } else {
 796                response.user.accepted_tos_at
 797            }
 798
 799            #[cfg(not(debug_assertions))]
 800            response.user.accepted_tos_at
 801        };
 802
 803        self.accepted_tos_at = Some(accepted_tos_at);
 804        self.model_request_usage = Some(ModelRequestUsage(RequestUsage {
 805            limit: response.plan.usage.model_requests.limit,
 806            amount: response.plan.usage.model_requests.used as i32,
 807        }));
 808        self.edit_prediction_usage = Some(EditPredictionUsage(RequestUsage {
 809            limit: response.plan.usage.edit_predictions.limit,
 810            amount: response.plan.usage.edit_predictions.used as i32,
 811        }));
 812        self.plan_info = Some(response.plan);
 813        cx.emit(Event::PrivateUserInfoUpdated);
 814    }
 815
 816    pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
 817        self.current_user.clone()
 818    }
 819
 820    pub fn has_accepted_terms_of_service(&self) -> bool {
 821        self.accepted_tos_at
 822            .map_or(false, |accepted_tos_at| accepted_tos_at.is_some())
 823    }
 824
 825    pub fn accept_terms_of_service(&self, cx: &Context<Self>) -> Task<Result<()>> {
 826        if self.current_user().is_none() {
 827            return Task::ready(Err(anyhow!("no current user")));
 828        };
 829
 830        let client = self.client.clone();
 831        cx.spawn(async move |this, cx| -> anyhow::Result<()> {
 832            let client = client.upgrade().context("client not found")?;
 833            let response = client
 834                .cloud_client()
 835                .accept_terms_of_service()
 836                .await
 837                .context("error accepting tos")?;
 838            this.update(cx, |this, cx| {
 839                this.accepted_tos_at = Some(response.user.accepted_tos_at);
 840                cx.emit(Event::PrivateUserInfoUpdated);
 841            })?;
 842            Ok(())
 843        })
 844    }
 845
 846    fn load_users(
 847        &self,
 848        request: impl RequestMessage<Response = UsersResponse>,
 849        cx: &Context<Self>,
 850    ) -> Task<Result<Vec<Arc<User>>>> {
 851        let client = self.client.clone();
 852        cx.spawn(async move |this, cx| {
 853            if let Some(rpc) = client.upgrade() {
 854                let response = rpc.request(request).await.context("error loading users")?;
 855                let users = response.users;
 856
 857                this.update(cx, |this, _| this.insert(users))
 858            } else {
 859                Ok(Vec::new())
 860            }
 861        })
 862    }
 863
 864    pub fn insert(&mut self, users: Vec<proto::User>) -> Vec<Arc<User>> {
 865        let mut ret = Vec::with_capacity(users.len());
 866        for user in users {
 867            let user = User::new(user);
 868            if let Some(old) = self.users.insert(user.id, user.clone()) {
 869                if old.github_login != user.github_login {
 870                    self.by_github_login.remove(&old.github_login);
 871                }
 872            }
 873            self.by_github_login
 874                .insert(user.github_login.clone(), user.id);
 875            ret.push(user)
 876        }
 877        ret
 878    }
 879
 880    pub fn set_participant_indices(
 881        &mut self,
 882        participant_indices: HashMap<u64, ParticipantIndex>,
 883        cx: &mut Context<Self>,
 884    ) {
 885        if participant_indices != self.participant_indices {
 886            self.participant_indices = participant_indices;
 887            cx.emit(Event::ParticipantIndicesChanged);
 888        }
 889    }
 890
    /// Returns the stored participant index map, as last set by
    /// `set_participant_indices`.
    pub fn participant_indices(&self) -> &HashMap<u64, ParticipantIndex> {
        &self.participant_indices
    }
 894
 895    pub fn participant_names(
 896        &self,
 897        user_ids: impl Iterator<Item = u64>,
 898        cx: &App,
 899    ) -> HashMap<u64, SharedString> {
 900        let mut ret = HashMap::default();
 901        let mut missing_user_ids = Vec::new();
 902        for id in user_ids {
 903            if let Some(github_login) = self.get_cached_user(id).map(|u| u.github_login.clone()) {
 904                ret.insert(id, github_login);
 905            } else {
 906                missing_user_ids.push(id)
 907            }
 908        }
 909        if !missing_user_ids.is_empty() {
 910            let this = self.weak_self.clone();
 911            cx.spawn(async move |cx| {
 912                this.update(cx, |this, cx| this.get_users(missing_user_ids, cx))?
 913                    .await
 914            })
 915            .detach_and_log_err(cx);
 916        }
 917        ret
 918    }
 919}
 920
 921impl User {
 922    fn new(message: proto::User) -> Arc<Self> {
 923        Arc::new(User {
 924            id: message.id,
 925            github_login: message.github_login.into(),
 926            avatar_uri: message.avatar_url.into(),
 927            name: message.name,
 928        })
 929    }
 930}
 931
 932impl Contact {
 933    async fn from_proto(
 934        contact: proto::Contact,
 935        user_store: &Entity<UserStore>,
 936        cx: &mut AsyncApp,
 937    ) -> Result<Self> {
 938        let user = user_store
 939            .update(cx, |user_store, cx| {
 940                user_store.get_user(contact.user_id, cx)
 941            })?
 942            .await?;
 943        Ok(Self {
 944            user,
 945            online: contact.online,
 946            busy: contact.busy,
 947        })
 948    }
 949}
 950
 951impl Collaborator {
 952    pub fn from_proto(message: proto::Collaborator) -> Result<Self> {
 953        Ok(Self {
 954            peer_id: message.peer_id.context("invalid peer id")?,
 955            replica_id: message.replica_id as ReplicaId,
 956            user_id: message.user_id as UserId,
 957            is_host: message.is_host,
 958            committer_name: message.committer_name,
 959            committer_email: message.committer_email,
 960        })
 961    }
 962}
 963
 964impl RequestUsage {
 965    pub fn over_limit(&self) -> bool {
 966        match self.limit {
 967            UsageLimit::Limited(limit) => self.amount >= limit,
 968            UsageLimit::Unlimited => false,
 969        }
 970    }
 971
 972    pub fn from_proto(amount: u32, limit: proto::UsageLimit) -> Option<Self> {
 973        let limit = match limit.variant? {
 974            proto::usage_limit::Variant::Limited(limited) => {
 975                UsageLimit::Limited(limited.limit as i32)
 976            }
 977            proto::usage_limit::Variant::Unlimited(_) => UsageLimit::Unlimited,
 978        };
 979        Some(RequestUsage {
 980            limit,
 981            amount: amount as i32,
 982        })
 983    }
 984
 985    fn from_headers(
 986        limit_name: &str,
 987        amount_name: &str,
 988        headers: &HeaderMap<HeaderValue>,
 989    ) -> Result<Self> {
 990        let limit = headers
 991            .get(limit_name)
 992            .with_context(|| format!("missing {limit_name:?} header"))?;
 993        let limit = UsageLimit::from_str(limit.to_str()?)?;
 994
 995        let amount = headers
 996            .get(amount_name)
 997            .with_context(|| format!("missing {amount_name:?} header"))?;
 998        let amount = amount.to_str()?.parse::<i32>()?;
 999
1000        Ok(Self { limit, amount })
1001    }
1002}
1003
1004impl ModelRequestUsage {
1005    pub fn from_headers(headers: &HeaderMap<HeaderValue>) -> Result<Self> {
1006        Ok(Self(RequestUsage::from_headers(
1007            MODEL_REQUESTS_USAGE_LIMIT_HEADER_NAME,
1008            MODEL_REQUESTS_USAGE_AMOUNT_HEADER_NAME,
1009            headers,
1010        )?))
1011    }
1012}
1013
1014impl EditPredictionUsage {
1015    pub fn from_headers(headers: &HeaderMap<HeaderValue>) -> Result<Self> {
1016        Ok(Self(RequestUsage::from_headers(
1017            EDIT_PREDICTIONS_USAGE_LIMIT_HEADER_NAME,
1018            EDIT_PREDICTIONS_USAGE_AMOUNT_HEADER_NAME,
1019            headers,
1020        )?))
1021    }
1022}