user.rs

   1use super::{Client, Status, TypedEnvelope, proto};
   2use anyhow::{Context as _, Result, anyhow};
   3use chrono::{DateTime, Utc};
   4use cloud_api_client::websocket_protocol::MessageToClient;
   5use cloud_api_client::{GetAuthenticatedUserResponse, PlanInfo};
   6use cloud_llm_client::{
   7    EDIT_PREDICTIONS_USAGE_AMOUNT_HEADER_NAME, EDIT_PREDICTIONS_USAGE_LIMIT_HEADER_NAME,
   8    MODEL_REQUESTS_USAGE_AMOUNT_HEADER_NAME, MODEL_REQUESTS_USAGE_LIMIT_HEADER_NAME, UsageLimit,
   9};
  10use collections::{HashMap, HashSet, hash_map::Entry};
  11use derive_more::Deref;
  12use feature_flags::FeatureFlagAppExt;
  13use futures::{Future, StreamExt, channel::mpsc};
  14use gpui::{
  15    App, AsyncApp, Context, Entity, EventEmitter, SharedString, SharedUri, Task, WeakEntity,
  16};
  17use http_client::http::{HeaderMap, HeaderValue};
  18use postage::{sink::Sink, watch};
  19use rpc::proto::{RequestMessage, UsersResponse};
  20use std::{
  21    str::FromStr as _,
  22    sync::{Arc, Weak},
  23};
  24use text::ReplicaId;
  25use util::{ResultExt, TryFutureExt as _};
  26
/// Numeric identifier of a user; this is the type of [`User::id`].
pub type UserId = u64;
  28
/// Newtype wrapper around the raw `u64` identifier of a channel.
#[derive(
    Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, serde::Serialize, serde::Deserialize,
)]
pub struct ChannelId(pub u64);
  33
/// Displays a [`ChannelId`] by delegating to the wrapped `u64`.
impl std::fmt::Display for ChannelId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Forward the caller's formatter to the inner integer unchanged.
        self.0.fmt(f)
    }
}
  39
  40#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)]
  41pub struct ProjectId(pub u64);
  42
  43impl ProjectId {
  44    pub fn to_proto(&self) -> u64 {
  45        self.0
  46    }
  47}
  48
/// Newtype wrapper around the raw `u64` identifier of a dev-server project.
#[derive(
    Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, serde::Serialize, serde::Deserialize,
)]
pub struct DevServerProjectId(pub u64);
  53
/// Newtype wrapper around a `u32` participant index.
// NOTE(review): how indices are assigned is not visible in this part of the file.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ParticipantIndex(pub u32);
  56
/// Public profile information about a user.
///
/// Equality and ordering are provided by manual trait impls rather than derives.
#[derive(Default, Debug)]
pub struct User {
    /// Server-assigned identifier of the user.
    pub id: UserId,
    /// GitHub login of the user; contact lists are kept sorted by this value.
    pub github_login: SharedString,
    /// URI of the user's avatar image.
    pub avatar_uri: SharedUri,
    /// Display name, when one is available.
    pub name: Option<String>,
}
  64
/// A participant collaborating on a shared project.
// NOTE(review): field semantics below are inferred from names and types only — confirm against callers.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Collaborator {
    /// Peer (connection) id of the collaborator.
    pub peer_id: proto::PeerId,
    /// Replica id used by this collaborator.
    pub replica_id: ReplicaId,
    /// Id of the user behind this collaborator.
    pub user_id: UserId,
    /// Whether this collaborator is the host.
    pub is_host: bool,
    /// Committer name, if known.
    pub committer_name: Option<String>,
    /// Committer email, if known.
    pub committer_email: Option<String>,
}
  74
/// `PartialOrd` for [`User`] always defers to the total order defined by `Ord`.
impl PartialOrd for User {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        // Canonical form: the ordering is total, so this never returns `None`.
        Some(self.cmp(other))
    }
}
  80
  81impl Ord for User {
  82    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
  83        self.github_login.cmp(&other.github_login)
  84    }
  85}
  86
  87impl PartialEq for User {
  88    fn eq(&self, other: &Self) -> bool {
  89        self.id == other.id && self.github_login == other.github_login
  90    }
  91}
  92
// Marker impl: the manual `PartialEq` above is a full equivalence relation.
impl Eq for User {}
  94
/// An accepted contact of the current user, together with its presence flags.
#[derive(Debug, PartialEq)]
pub struct Contact {
    /// The user this contact refers to.
    pub user: Arc<User>,
    /// Whether the contact is reported as online.
    pub online: bool,
    /// Whether the contact is reported as busy.
    pub busy: bool,
}
 101
/// Relationship between the current user and another user, as computed by
/// `UserStore::contact_request_status`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ContactRequestStatus {
    /// The user is not a contact and has no request in either direction.
    None,
    /// The user is in the store's outgoing contact requests.
    RequestSent,
    /// The user is in the store's incoming contact requests.
    RequestReceived,
    /// The user is already in the store's contact list.
    RequestAccepted,
}
 109
/// Client-side store of users, contacts, contact requests, and the signed-in
/// user's plan and usage information.
pub struct UserStore {
    /// Cache of known users, keyed by user id.
    users: HashMap<u64, Arc<User>>,
    /// Index from GitHub login to user id for the cached users.
    by_github_login: HashMap<SharedString, u64>,
    /// Participant index per user id.
    // NOTE(review): the code that populates this map is outside the visible part of the file.
    participant_indices: HashMap<u64, ParticipantIndex>,
    /// Sender feeding the `_maintain_contacts` task, which applies updates one at a time.
    update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
    /// Latest known model-request usage, if any has been received.
    model_request_usage: Option<ModelRequestUsage>,
    /// Latest known edit-prediction usage, if any has been received.
    edit_prediction_usage: Option<EditPredictionUsage>,
    /// Plan details from the most recent authenticated-user response.
    plan_info: Option<PlanInfo>,
    /// Watch channel holding the currently signed-in user, or `None`.
    current_user: watch::Receiver<Option<Arc<User>>>,
    /// Outer `None`: no authenticated-user response applied (also reset on sign-out).
    /// Inner `None`: a response was applied and the terms of service are not accepted.
    accepted_tos_at: Option<Option<cloud_api_client::Timestamp>>,
    /// Accepted contacts, kept sorted by `github_login` so binary search can be used.
    contacts: Vec<Arc<Contact>>,
    /// Users who sent us a contact request, kept sorted by `github_login`.
    incoming_contact_requests: Vec<Arc<User>>,
    /// Users we sent a contact request to, kept sorted by `github_login`.
    outgoing_contact_requests: Vec<Arc<User>>,
    /// Number of in-flight contact requests per user id; entries are removed at zero.
    pending_contact_requests: HashMap<u64, usize>,
    /// Most recent invite information pushed by the server, if any.
    invite_info: Option<InviteInfo>,
    /// Weak handle to the client, so the store does not keep it alive.
    client: Weak<Client>,
    /// Background task that drains `update_contacts_tx`.
    _maintain_contacts: Task<()>,
    /// Background task that reacts to client status changes.
    _maintain_current_user: Task<Result<()>>,
    /// Weak handle to this store's own entity.
    weak_self: WeakEntity<Self>,
}
 130
/// Invite details received from the server in an `UpdateInviteInfo` message.
#[derive(Clone)]
pub struct InviteInfo {
    /// The `count` value carried by the message.
    // NOTE(review): presumably the number of invites — confirm against the server.
    pub count: u32,
    /// The invite URL carried by the message.
    pub url: Arc<str>,
}
 136
/// Events emitted by [`UserStore`].
pub enum Event {
    /// Something happened to a contact request involving `user`.
    Contact {
        user: Arc<User>,
        kind: ContactEventKind,
    },
    /// Emitted when a `ShowContacts` message arrives from the server.
    ShowContacts,
    // NOTE(review): not emitted anywhere in the visible part of the file.
    ParticipantIndicesChanged,
    /// Emitted after an authenticated-user response is applied and on sign-out.
    PrivateUserInfoUpdated,
    // NOTE(review): not emitted anywhere in the visible part of the file.
    PlanUpdated,
}
 147
/// What happened to a contact request, as carried by [`Event::Contact`].
#[derive(Clone, Copy)]
pub enum ContactEventKind {
    Requested,
    Accepted,
    /// Emitted when an incoming contact request is removed by a contacts update.
    Cancelled,
}
 154
// Allows `UserStore` to emit `Event`s through its context.
impl EventEmitter<Event> for UserStore {}
 156
/// Work items processed in order by the store's contact-maintenance task.
enum UpdateContacts {
    /// Apply a contacts update received from the server.
    Update(proto::UpdateContacts),
    /// Do nothing except drop the barrier, signalling that earlier items were processed.
    Wait(postage::barrier::Sender),
    /// Clear contacts and both request lists, then drop the barrier.
    Clear(postage::barrier::Sender),
}
 162
/// [`RequestUsage`] for model requests; derefs to the inner value.
#[derive(Debug, Clone, Copy, Deref)]
pub struct ModelRequestUsage(pub RequestUsage);
 165
/// [`RequestUsage`] for edit predictions; derefs to the inner value.
#[derive(Debug, Clone, Copy, Deref)]
pub struct EditPredictionUsage(pub RequestUsage);
 168
/// A usage counter paired with its limit.
#[derive(Debug, Clone, Copy)]
pub struct RequestUsage {
    /// The limit that applies to this kind of usage.
    pub limit: UsageLimit,
    /// The amount used so far.
    pub amount: i32,
}
 174
 175impl UserStore {
    /// Creates the store, registers its message handlers on `client`, and spawns the
    /// two background tasks that keep contacts and the current user up to date.
    ///
    /// Only a weak reference to `client` is retained by the store and its tasks.
    pub fn new(client: Arc<Client>, cx: &Context<Self>) -> Self {
        let (mut current_user_tx, current_user_rx) = watch::channel();
        let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
        let rpc_subscriptions = vec![
            client.add_message_handler(cx.weak_entity(), Self::handle_update_plan),
            client.add_message_handler(cx.weak_entity(), Self::handle_update_contacts),
            client.add_message_handler(cx.weak_entity(), Self::handle_update_invite_info),
            client.add_message_handler(cx.weak_entity(), Self::handle_show_contacts),
        ];

        client.add_message_to_client_handler({
            let this = cx.weak_entity();
            move |message, cx| Self::handle_message_to_client(this.clone(), message, cx)
        });

        Self {
            users: Default::default(),
            by_github_login: Default::default(),
            current_user: current_user_rx,
            plan_info: None,
            model_request_usage: None,
            edit_prediction_usage: None,
            accepted_tos_at: None,
            contacts: Default::default(),
            incoming_contact_requests: Default::default(),
            participant_indices: Default::default(),
            outgoing_contact_requests: Default::default(),
            invite_info: None,
            client: Arc::downgrade(&client),
            update_contacts_tx,
            // Applies queued contact updates strictly one after another.
            _maintain_contacts: cx.spawn(async move |this, cx| {
                // Hold the subscriptions for as long as this task runs.
                let _subscriptions = rpc_subscriptions;
                while let Some(message) = update_contacts_rx.next().await {
                    if let Ok(task) = this.update(cx, |this, cx| this.update_contacts(message, cx))
                    {
                        task.log_err().await;
                    } else {
                        // The store can no longer be updated; stop processing.
                        break;
                    }
                }
            }),
            // Reacts to client status changes: loads the authenticated user when
            // connected, and resets state on sign-out or connection loss.
            _maintain_current_user: cx.spawn(async move |this, cx| {
                let mut status = client.status();
                // Keep only a weak handle so this task does not keep the client alive.
                let weak = Arc::downgrade(&client);
                drop(client);
                while let Some(status) = status.next().await {
                    // if the client is dropped, the app is shutting down.
                    let Some(client) = weak.upgrade() else {
                        return Ok(());
                    };
                    match status {
                        Status::Authenticated | Status::Connected { .. } => {
                            if let Some(user_id) = client.user_id() {
                                let response = client.cloud_client().get_authenticated_user().await;
                                // Stays `None` when the fetch failed (the error is only logged).
                                let mut current_user = None;
                                cx.update(|cx| {
                                    if let Some(response) = response.log_err() {
                                        let user = Arc::new(User {
                                            id: user_id,
                                            github_login: response.user.github_login.clone().into(),
                                            avatar_uri: response.user.avatar_url.clone().into(),
                                            name: response.user.name.clone(),
                                        });
                                        current_user = Some(user.clone());
                                        this.update(cx, |this, cx| {
                                            // Cache the signed-in user like any other user.
                                            this.by_github_login
                                                .insert(user.github_login.clone(), user_id);
                                            this.users.insert(user_id, user);
                                            this.update_authenticated_user(response, cx)
                                        })
                                    } else {
                                        anyhow::Ok(())
                                    }
                                })??;
                                current_user_tx.send(current_user).await.ok();

                                this.update(cx, |_, cx| cx.notify())?;
                            }
                        }
                        Status::SignedOut => {
                            current_user_tx.send(None).await.ok();
                            this.update(cx, |this, cx| {
                                this.accepted_tos_at = None;
                                cx.emit(Event::PrivateUserInfoUpdated);
                                cx.notify();
                                this.clear_contacts()
                            })?
                            .await;
                        }
                        Status::ConnectionLost => {
                            this.update(cx, |this, cx| {
                                cx.notify();
                                this.clear_contacts()
                            })?
                            .await;
                        }
                        _ => {}
                    }
                }
                Ok(())
            }),
            pending_contact_requests: Default::default(),
            weak_self: cx.weak_entity(),
        }
    }
 281
 282    #[cfg(feature = "test-support")]
 283    pub fn clear_cache(&mut self) {
 284        self.users.clear();
 285        self.by_github_login.clear();
 286    }
 287
 288    async fn handle_update_invite_info(
 289        this: Entity<Self>,
 290        message: TypedEnvelope<proto::UpdateInviteInfo>,
 291        mut cx: AsyncApp,
 292    ) -> Result<()> {
 293        this.update(&mut cx, |this, cx| {
 294            this.invite_info = Some(InviteInfo {
 295                url: Arc::from(message.payload.url),
 296                count: message.payload.count,
 297            });
 298            cx.notify();
 299        })?;
 300        Ok(())
 301    }
 302
 303    async fn handle_show_contacts(
 304        this: Entity<Self>,
 305        _: TypedEnvelope<proto::ShowContacts>,
 306        mut cx: AsyncApp,
 307    ) -> Result<()> {
 308        this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts))?;
 309        Ok(())
 310    }
 311
    /// Returns the most recently received invite information, if any.
    pub fn invite_info(&self) -> Option<&InviteInfo> {
        self.invite_info.as_ref()
    }
 315
 316    async fn handle_update_contacts(
 317        this: Entity<Self>,
 318        message: TypedEnvelope<proto::UpdateContacts>,
 319        mut cx: AsyncApp,
 320    ) -> Result<()> {
 321        this.read_with(&mut cx, |this, _| {
 322            this.update_contacts_tx
 323                .unbounded_send(UpdateContacts::Update(message.payload))
 324                .unwrap();
 325        })?;
 326        Ok(())
 327    }
 328
 329    async fn handle_update_plan(
 330        this: Entity<Self>,
 331        _message: TypedEnvelope<proto::UpdateUserPlan>,
 332        mut cx: AsyncApp,
 333    ) -> Result<()> {
 334        let client = this
 335            .read_with(&cx, |this, _| this.client.upgrade())?
 336            .context("client was dropped")?;
 337
 338        let response = client
 339            .cloud_client()
 340            .get_authenticated_user()
 341            .await
 342            .context("failed to fetch authenticated user")?;
 343
 344        this.update(&mut cx, |this, cx| {
 345            this.update_authenticated_user(response, cx);
 346        })
 347    }
 348
    /// Applies one queued [`UpdateContacts`] item to the store.
    ///
    /// `Wait` and `Clear` complete immediately and drop their barrier. `Update` first
    /// loads every referenced user, then applies removals and upserts to the contact
    /// list and both request lists, keeping each list sorted by `github_login`.
    fn update_contacts(&mut self, message: UpdateContacts, cx: &Context<Self>) -> Task<Result<()>> {
        match message {
            UpdateContacts::Wait(barrier) => {
                // Dropping the sender is what signals the waiting receiver.
                drop(barrier);
                Task::ready(Ok(()))
            }
            UpdateContacts::Clear(barrier) => {
                self.contacts.clear();
                self.incoming_contact_requests.clear();
                self.outgoing_contact_requests.clear();
                drop(barrier);
                Task::ready(Ok(()))
            }
            UpdateContacts::Update(message) => {
                // Collect every user id mentioned by the update so they can be loaded at once.
                let mut user_ids = HashSet::default();
                for contact in &message.contacts {
                    user_ids.insert(contact.user_id);
                }
                user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
                user_ids.extend(message.outgoing_requests.iter());

                let load_users = self.get_users(user_ids.into_iter().collect(), cx);
                cx.spawn(async move |this, cx| {
                    load_users.await?;

                    // Users are fetched in parallel above and cached in call to get_users
                    // No need to parallelize here
                    let mut updated_contacts = Vec::new();
                    let this = this.upgrade().context("can't upgrade user store handle")?;
                    for contact in message.contacts {
                        updated_contacts
                            .push(Arc::new(Contact::from_proto(contact, &this, cx).await?));
                    }

                    let mut incoming_requests = Vec::new();
                    for request in message.incoming_requests {
                        incoming_requests.push({
                            this.update(cx, |this, cx| this.get_user(request.requester_id, cx))?
                                .await?
                        });
                    }

                    let mut outgoing_requests = Vec::new();
                    for requested_user_id in message.outgoing_requests {
                        outgoing_requests.push(
                            this.update(cx, |this, cx| this.get_user(requested_user_id, cx))?
                                .await?,
                        );
                    }

                    let removed_contacts =
                        HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
                    let removed_incoming_requests =
                        HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
                    let removed_outgoing_requests =
                        HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());

                    this.update(cx, |this, cx| {
                        // Remove contacts
                        this.contacts
                            .retain(|contact| !removed_contacts.contains(&contact.user.id));
                        // Update existing contacts and insert new ones
                        // (inserting at the binary-search position keeps the list sorted by login)
                        for updated_contact in updated_contacts {
                            match this.contacts.binary_search_by_key(
                                &&updated_contact.user.github_login,
                                |contact| &contact.user.github_login,
                            ) {
                                Ok(ix) => this.contacts[ix] = updated_contact,
                                Err(ix) => this.contacts.insert(ix, updated_contact),
                            }
                        }

                        // Remove incoming contact requests, emitting a `Cancelled` event for each
                        this.incoming_contact_requests.retain(|user| {
                            if removed_incoming_requests.contains(&user.id) {
                                cx.emit(Event::Contact {
                                    user: user.clone(),
                                    kind: ContactEventKind::Cancelled,
                                });
                                false
                            } else {
                                true
                            }
                        });
                        // Update existing incoming requests and insert new ones
                        for user in incoming_requests {
                            match this
                                .incoming_contact_requests
                                .binary_search_by_key(&&user.github_login, |contact| {
                                    &contact.github_login
                                }) {
                                Ok(ix) => this.incoming_contact_requests[ix] = user,
                                Err(ix) => this.incoming_contact_requests.insert(ix, user),
                            }
                        }

                        // Remove outgoing contact requests
                        this.outgoing_contact_requests
                            .retain(|user| !removed_outgoing_requests.contains(&user.id));
                        // Update existing outgoing requests and insert new ones
                        for request in outgoing_requests {
                            match this
                                .outgoing_contact_requests
                                .binary_search_by_key(&&request.github_login, |contact| {
                                    &contact.github_login
                                }) {
                                Ok(ix) => this.outgoing_contact_requests[ix] = request,
                                Err(ix) => this.outgoing_contact_requests.insert(ix, request),
                            }
                        }

                        cx.notify();
                    })?;

                    Ok(())
                })
            }
        }
    }
 468
 469    pub fn contacts(&self) -> &[Arc<Contact>] {
 470        &self.contacts
 471    }
 472
 473    pub fn has_contact(&self, user: &Arc<User>) -> bool {
 474        self.contacts
 475            .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
 476            .is_ok()
 477    }
 478
 479    pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
 480        &self.incoming_contact_requests
 481    }
 482
 483    pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
 484        &self.outgoing_contact_requests
 485    }
 486
    /// Returns whether at least one contact-related request about `user` is still in flight.
    pub fn is_contact_request_pending(&self, user: &User) -> bool {
        self.pending_contact_requests.contains_key(&user.id)
    }
 490
 491    pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
 492        if self
 493            .contacts
 494            .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
 495            .is_ok()
 496        {
 497            ContactRequestStatus::RequestAccepted
 498        } else if self
 499            .outgoing_contact_requests
 500            .binary_search_by_key(&&user.github_login, |user| &user.github_login)
 501            .is_ok()
 502        {
 503            ContactRequestStatus::RequestSent
 504        } else if self
 505            .incoming_contact_requests
 506            .binary_search_by_key(&&user.github_login, |user| &user.github_login)
 507            .is_ok()
 508        {
 509            ContactRequestStatus::RequestReceived
 510        } else {
 511            ContactRequestStatus::None
 512        }
 513    }
 514
 515    pub fn request_contact(
 516        &mut self,
 517        responder_id: u64,
 518        cx: &mut Context<Self>,
 519    ) -> Task<Result<()>> {
 520        self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
 521    }
 522
 523    pub fn remove_contact(&mut self, user_id: u64, cx: &mut Context<Self>) -> Task<Result<()>> {
 524        self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
 525    }
 526
 527    pub fn has_incoming_contact_request(&self, user_id: u64) -> bool {
 528        self.incoming_contact_requests
 529            .iter()
 530            .any(|user| user.id == user_id)
 531    }
 532
 533    pub fn respond_to_contact_request(
 534        &mut self,
 535        requester_id: u64,
 536        accept: bool,
 537        cx: &mut Context<Self>,
 538    ) -> Task<Result<()>> {
 539        self.perform_contact_request(
 540            requester_id,
 541            proto::RespondToContactRequest {
 542                requester_id,
 543                response: if accept {
 544                    proto::ContactRequestResponse::Accept
 545                } else {
 546                    proto::ContactRequestResponse::Decline
 547                } as i32,
 548            },
 549            cx,
 550        )
 551    }
 552
 553    pub fn dismiss_contact_request(
 554        &self,
 555        requester_id: u64,
 556        cx: &Context<Self>,
 557    ) -> Task<Result<()>> {
 558        let client = self.client.upgrade();
 559        cx.spawn(async move |_, _| {
 560            client
 561                .context("can't upgrade client reference")?
 562                .request(proto::RespondToContactRequest {
 563                    requester_id,
 564                    response: proto::ContactRequestResponse::Dismiss as i32,
 565                })
 566                .await?;
 567            Ok(())
 568        })
 569    }
 570
 571    fn perform_contact_request<T: RequestMessage>(
 572        &mut self,
 573        user_id: u64,
 574        request: T,
 575        cx: &mut Context<Self>,
 576    ) -> Task<Result<()>> {
 577        let client = self.client.upgrade();
 578        *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
 579        cx.notify();
 580
 581        cx.spawn(async move |this, cx| {
 582            let response = client
 583                .context("can't upgrade client reference")?
 584                .request(request)
 585                .await;
 586            this.update(cx, |this, cx| {
 587                if let Entry::Occupied(mut request_count) =
 588                    this.pending_contact_requests.entry(user_id)
 589                {
 590                    *request_count.get_mut() -= 1;
 591                    if *request_count.get() == 0 {
 592                        request_count.remove();
 593                    }
 594                }
 595                cx.notify();
 596            })?;
 597            response?;
 598            Ok(())
 599        })
 600    }
 601
 602    pub fn clear_contacts(&self) -> impl Future<Output = ()> + use<> {
 603        let (tx, mut rx) = postage::barrier::channel();
 604        self.update_contacts_tx
 605            .unbounded_send(UpdateContacts::Clear(tx))
 606            .unwrap();
 607        async move {
 608            rx.next().await;
 609        }
 610    }
 611
 612    pub fn contact_updates_done(&self) -> impl Future<Output = ()> {
 613        let (tx, mut rx) = postage::barrier::channel();
 614        self.update_contacts_tx
 615            .unbounded_send(UpdateContacts::Wait(tx))
 616            .unwrap();
 617        async move {
 618            rx.next().await;
 619        }
 620    }
 621
 622    pub fn get_users(
 623        &self,
 624        user_ids: Vec<u64>,
 625        cx: &Context<Self>,
 626    ) -> Task<Result<Vec<Arc<User>>>> {
 627        let mut user_ids_to_fetch = user_ids.clone();
 628        user_ids_to_fetch.retain(|id| !self.users.contains_key(id));
 629
 630        cx.spawn(async move |this, cx| {
 631            if !user_ids_to_fetch.is_empty() {
 632                this.update(cx, |this, cx| {
 633                    this.load_users(
 634                        proto::GetUsers {
 635                            user_ids: user_ids_to_fetch,
 636                        },
 637                        cx,
 638                    )
 639                })?
 640                .await?;
 641            }
 642
 643            this.read_with(cx, |this, _| {
 644                user_ids
 645                    .iter()
 646                    .map(|user_id| {
 647                        this.users
 648                            .get(user_id)
 649                            .cloned()
 650                            .with_context(|| format!("user {user_id} not found"))
 651                    })
 652                    .collect()
 653            })?
 654        })
 655    }
 656
 657    pub fn fuzzy_search_users(
 658        &self,
 659        query: String,
 660        cx: &Context<Self>,
 661    ) -> Task<Result<Vec<Arc<User>>>> {
 662        self.load_users(proto::FuzzySearchUsers { query }, cx)
 663    }
 664
 665    pub fn get_cached_user(&self, user_id: u64) -> Option<Arc<User>> {
 666        self.users.get(&user_id).cloned()
 667    }
 668
 669    pub fn get_user_optimistic(&self, user_id: u64, cx: &Context<Self>) -> Option<Arc<User>> {
 670        if let Some(user) = self.users.get(&user_id).cloned() {
 671            return Some(user);
 672        }
 673
 674        self.get_user(user_id, cx).detach_and_log_err(cx);
 675        None
 676    }
 677
 678    pub fn get_user(&self, user_id: u64, cx: &Context<Self>) -> Task<Result<Arc<User>>> {
 679        if let Some(user) = self.users.get(&user_id).cloned() {
 680            return Task::ready(Ok(user));
 681        }
 682
 683        let load_users = self.get_users(vec![user_id], cx);
 684        cx.spawn(async move |this, cx| {
 685            load_users.await?;
 686            this.read_with(cx, |this, _| {
 687                this.users
 688                    .get(&user_id)
 689                    .cloned()
 690                    .context("server responded with no users")
 691            })?
 692        })
 693    }
 694
 695    pub fn cached_user_by_github_login(&self, github_login: &str) -> Option<Arc<User>> {
 696        self.by_github_login
 697            .get(github_login)
 698            .and_then(|id| self.users.get(id).cloned())
 699    }
 700
    /// Returns the currently signed-in user, or `None` when there is none.
    pub fn current_user(&self) -> Option<Arc<User>> {
        // Read the latest value from the watch channel and clone it out.
        self.current_user.borrow().clone()
    }
 704
    /// Returns the current user's plan, if plan information has been loaded.
    ///
    /// In debug builds, the `ZED_SIMULATE_PLAN` environment variable (`free`, `trial`,
    /// or `pro`) overrides the real plan.
    ///
    /// # Panics
    ///
    /// In debug builds, panics if `ZED_SIMULATE_PLAN` is set to any other value.
    pub fn plan(&self) -> Option<cloud_llm_client::Plan> {
        #[cfg(debug_assertions)]
        if let Ok(plan) = std::env::var("ZED_SIMULATE_PLAN").as_ref() {
            return match plan.as_str() {
                "free" => Some(cloud_llm_client::Plan::ZedFree),
                "trial" => Some(cloud_llm_client::Plan::ZedProTrial),
                "pro" => Some(cloud_llm_client::Plan::ZedPro),
                _ => {
                    panic!("ZED_SIMULATE_PLAN must be one of 'free', 'trial', or 'pro'");
                }
            };
        }

        self.plan_info.as_ref().map(|info| info.plan)
    }
 720
 721    pub fn subscription_period(&self) -> Option<(DateTime<Utc>, DateTime<Utc>)> {
 722        self.plan_info
 723            .as_ref()
 724            .and_then(|plan| plan.subscription_period)
 725            .map(|subscription_period| {
 726                (
 727                    subscription_period.started_at.0,
 728                    subscription_period.ended_at.0,
 729                )
 730            })
 731    }
 732
 733    pub fn trial_started_at(&self) -> Option<DateTime<Utc>> {
 734        self.plan_info
 735            .as_ref()
 736            .and_then(|plan| plan.trial_started_at)
 737            .map(|trial_started_at| trial_started_at.0)
 738    }
 739
 740    /// Returns whether the user's account is too new to use the service.
 741    pub fn account_too_young(&self) -> bool {
 742        self.plan_info
 743            .as_ref()
 744            .map(|plan| plan.is_account_too_young)
 745            .unwrap_or_default()
 746    }
 747
 748    /// Returns whether the current user has overdue invoices and usage should be blocked.
 749    pub fn has_overdue_invoices(&self) -> bool {
 750        self.plan_info
 751            .as_ref()
 752            .map(|plan| plan.has_overdue_invoices)
 753            .unwrap_or_default()
 754    }
 755
 756    pub fn is_usage_based_billing_enabled(&self) -> bool {
 757        self.plan_info
 758            .as_ref()
 759            .map(|plan| plan.is_usage_based_billing_enabled)
 760            .unwrap_or_default()
 761    }
 762
    /// Returns the latest known model-request usage, if any has been recorded.
    pub fn model_request_usage(&self) -> Option<ModelRequestUsage> {
        self.model_request_usage
    }
 766
    /// Replaces the stored model-request usage with `usage` and notifies observers.
    pub fn update_model_request_usage(&mut self, usage: ModelRequestUsage, cx: &mut Context<Self>) {
        self.model_request_usage = Some(usage);
        cx.notify();
    }
 771
    /// Returns the latest known edit-prediction usage, if any has been recorded.
    pub fn edit_prediction_usage(&self) -> Option<EditPredictionUsage> {
        self.edit_prediction_usage
    }
 775
    /// Replaces the stored edit-prediction usage with `usage` and notifies observers.
    pub fn update_edit_prediction_usage(
        &mut self,
        usage: EditPredictionUsage,
        cx: &mut Context<Self>,
    ) {
        self.edit_prediction_usage = Some(usage);
        cx.notify();
    }
 784
 785    fn update_authenticated_user(
 786        &mut self,
 787        response: GetAuthenticatedUserResponse,
 788        cx: &mut Context<Self>,
 789    ) {
 790        let staff = response.user.is_staff && !*feature_flags::ZED_DISABLE_STAFF;
 791        cx.update_flags(staff, response.feature_flags);
 792        if let Some(client) = self.client.upgrade() {
 793            client
 794                .telemetry
 795                .set_authenticated_user_info(Some(response.user.metrics_id.clone()), staff);
 796        }
 797
 798        let accepted_tos_at = {
 799            #[cfg(debug_assertions)]
 800            if std::env::var("ZED_IGNORE_ACCEPTED_TOS").is_ok() {
 801                None
 802            } else {
 803                response.user.accepted_tos_at
 804            }
 805
 806            #[cfg(not(debug_assertions))]
 807            response.user.accepted_tos_at
 808        };
 809
 810        self.accepted_tos_at = Some(accepted_tos_at);
 811        self.model_request_usage = Some(ModelRequestUsage(RequestUsage {
 812            limit: response.plan.usage.model_requests.limit,
 813            amount: response.plan.usage.model_requests.used as i32,
 814        }));
 815        self.edit_prediction_usage = Some(EditPredictionUsage(RequestUsage {
 816            limit: response.plan.usage.edit_predictions.limit,
 817            amount: response.plan.usage.edit_predictions.used as i32,
 818        }));
 819        self.plan_info = Some(response.plan);
 820        cx.emit(Event::PrivateUserInfoUpdated);
 821    }
 822
 823    fn handle_message_to_client(this: WeakEntity<Self>, message: &MessageToClient, cx: &App) {
 824        cx.spawn(async move |cx| {
 825            match message {
 826                MessageToClient::UserUpdated => {
 827                    let cloud_client = cx
 828                        .update(|cx| {
 829                            this.read_with(cx, |this, _cx| {
 830                                this.client.upgrade().map(|client| client.cloud_client())
 831                            })
 832                        })??
 833                        .ok_or(anyhow::anyhow!("Failed to get Cloud client"))?;
 834
 835                    let response = cloud_client.get_authenticated_user().await?;
 836                    cx.update(|cx| {
 837                        this.update(cx, |this, cx| {
 838                            this.update_authenticated_user(response, cx);
 839                        })
 840                    })??;
 841                }
 842            }
 843
 844            anyhow::Ok(())
 845        })
 846        .detach_and_log_err(cx);
 847    }
 848
 849    pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
 850        self.current_user.clone()
 851    }
 852
 853    pub fn has_accepted_terms_of_service(&self) -> bool {
 854        self.accepted_tos_at
 855            .map_or(false, |accepted_tos_at| accepted_tos_at.is_some())
 856    }
 857
 858    pub fn accept_terms_of_service(&self, cx: &Context<Self>) -> Task<Result<()>> {
 859        if self.current_user().is_none() {
 860            return Task::ready(Err(anyhow!("no current user")));
 861        };
 862
 863        let client = self.client.clone();
 864        cx.spawn(async move |this, cx| -> anyhow::Result<()> {
 865            let client = client.upgrade().context("client not found")?;
 866            let response = client
 867                .cloud_client()
 868                .accept_terms_of_service()
 869                .await
 870                .context("error accepting tos")?;
 871            this.update(cx, |this, cx| {
 872                this.accepted_tos_at = Some(response.user.accepted_tos_at);
 873                cx.emit(Event::PrivateUserInfoUpdated);
 874            })?;
 875            Ok(())
 876        })
 877    }
 878
 879    fn load_users(
 880        &self,
 881        request: impl RequestMessage<Response = UsersResponse>,
 882        cx: &Context<Self>,
 883    ) -> Task<Result<Vec<Arc<User>>>> {
 884        let client = self.client.clone();
 885        cx.spawn(async move |this, cx| {
 886            if let Some(rpc) = client.upgrade() {
 887                let response = rpc.request(request).await.context("error loading users")?;
 888                let users = response.users;
 889
 890                this.update(cx, |this, _| this.insert(users))
 891            } else {
 892                Ok(Vec::new())
 893            }
 894        })
 895    }
 896
 897    pub fn insert(&mut self, users: Vec<proto::User>) -> Vec<Arc<User>> {
 898        let mut ret = Vec::with_capacity(users.len());
 899        for user in users {
 900            let user = User::new(user);
 901            if let Some(old) = self.users.insert(user.id, user.clone()) {
 902                if old.github_login != user.github_login {
 903                    self.by_github_login.remove(&old.github_login);
 904                }
 905            }
 906            self.by_github_login
 907                .insert(user.github_login.clone(), user.id);
 908            ret.push(user)
 909        }
 910        ret
 911    }
 912
 913    pub fn set_participant_indices(
 914        &mut self,
 915        participant_indices: HashMap<u64, ParticipantIndex>,
 916        cx: &mut Context<Self>,
 917    ) {
 918        if participant_indices != self.participant_indices {
 919            self.participant_indices = participant_indices;
 920            cx.emit(Event::ParticipantIndicesChanged);
 921        }
 922    }
 923
 924    pub fn participant_indices(&self) -> &HashMap<u64, ParticipantIndex> {
 925        &self.participant_indices
 926    }
 927
 928    pub fn participant_names(
 929        &self,
 930        user_ids: impl Iterator<Item = u64>,
 931        cx: &App,
 932    ) -> HashMap<u64, SharedString> {
 933        let mut ret = HashMap::default();
 934        let mut missing_user_ids = Vec::new();
 935        for id in user_ids {
 936            if let Some(github_login) = self.get_cached_user(id).map(|u| u.github_login.clone()) {
 937                ret.insert(id, github_login);
 938            } else {
 939                missing_user_ids.push(id)
 940            }
 941        }
 942        if !missing_user_ids.is_empty() {
 943            let this = self.weak_self.clone();
 944            cx.spawn(async move |cx| {
 945                this.update(cx, |this, cx| this.get_users(missing_user_ids, cx))?
 946                    .await
 947            })
 948            .detach_and_log_err(cx);
 949        }
 950        ret
 951    }
 952}
 953
 954impl User {
 955    fn new(message: proto::User) -> Arc<Self> {
 956        Arc::new(User {
 957            id: message.id,
 958            github_login: message.github_login.into(),
 959            avatar_uri: message.avatar_url.into(),
 960            name: message.name,
 961        })
 962    }
 963}
 964
 965impl Contact {
 966    async fn from_proto(
 967        contact: proto::Contact,
 968        user_store: &Entity<UserStore>,
 969        cx: &mut AsyncApp,
 970    ) -> Result<Self> {
 971        let user = user_store
 972            .update(cx, |user_store, cx| {
 973                user_store.get_user(contact.user_id, cx)
 974            })?
 975            .await?;
 976        Ok(Self {
 977            user,
 978            online: contact.online,
 979            busy: contact.busy,
 980        })
 981    }
 982}
 983
 984impl Collaborator {
 985    pub fn from_proto(message: proto::Collaborator) -> Result<Self> {
 986        Ok(Self {
 987            peer_id: message.peer_id.context("invalid peer id")?,
 988            replica_id: message.replica_id as ReplicaId,
 989            user_id: message.user_id as UserId,
 990            is_host: message.is_host,
 991            committer_name: message.committer_name,
 992            committer_email: message.committer_email,
 993        })
 994    }
 995}
 996
 997impl RequestUsage {
 998    pub fn over_limit(&self) -> bool {
 999        match self.limit {
1000            UsageLimit::Limited(limit) => self.amount >= limit,
1001            UsageLimit::Unlimited => false,
1002        }
1003    }
1004
1005    pub fn from_proto(amount: u32, limit: proto::UsageLimit) -> Option<Self> {
1006        let limit = match limit.variant? {
1007            proto::usage_limit::Variant::Limited(limited) => {
1008                UsageLimit::Limited(limited.limit as i32)
1009            }
1010            proto::usage_limit::Variant::Unlimited(_) => UsageLimit::Unlimited,
1011        };
1012        Some(RequestUsage {
1013            limit,
1014            amount: amount as i32,
1015        })
1016    }
1017
1018    fn from_headers(
1019        limit_name: &str,
1020        amount_name: &str,
1021        headers: &HeaderMap<HeaderValue>,
1022    ) -> Result<Self> {
1023        let limit = headers
1024            .get(limit_name)
1025            .with_context(|| format!("missing {limit_name:?} header"))?;
1026        let limit = UsageLimit::from_str(limit.to_str()?)?;
1027
1028        let amount = headers
1029            .get(amount_name)
1030            .with_context(|| format!("missing {amount_name:?} header"))?;
1031        let amount = amount.to_str()?.parse::<i32>()?;
1032
1033        Ok(Self { limit, amount })
1034    }
1035}
1036
1037impl ModelRequestUsage {
1038    pub fn from_headers(headers: &HeaderMap<HeaderValue>) -> Result<Self> {
1039        Ok(Self(RequestUsage::from_headers(
1040            MODEL_REQUESTS_USAGE_LIMIT_HEADER_NAME,
1041            MODEL_REQUESTS_USAGE_AMOUNT_HEADER_NAME,
1042            headers,
1043        )?))
1044    }
1045}
1046
1047impl EditPredictionUsage {
1048    pub fn from_headers(headers: &HeaderMap<HeaderValue>) -> Result<Self> {
1049        Ok(Self(RequestUsage::from_headers(
1050            EDIT_PREDICTIONS_USAGE_LIMIT_HEADER_NAME,
1051            EDIT_PREDICTIONS_USAGE_AMOUNT_HEADER_NAME,
1052            headers,
1053        )?))
1054    }
1055}