user.rs

   1use super::{Client, Status, TypedEnvelope, proto};
   2use anyhow::{Context as _, Result};
   3use chrono::{DateTime, Utc};
   4use cloud_api_client::websocket_protocol::MessageToClient;
   5use cloud_api_client::{
   6    GetAuthenticatedUserResponse, Organization, OrganizationId, Plan, PlanInfo,
   7};
   8use cloud_llm_client::{
   9    EDIT_PREDICTIONS_USAGE_AMOUNT_HEADER_NAME, EDIT_PREDICTIONS_USAGE_LIMIT_HEADER_NAME, UsageLimit,
  10};
  11use collections::{HashMap, HashSet, hash_map::Entry};
  12use derive_more::Deref;
  13use feature_flags::FeatureFlagAppExt;
  14use futures::{Future, StreamExt, channel::mpsc};
  15use gpui::{
  16    App, AsyncApp, Context, Entity, EventEmitter, SharedString, SharedUri, Task, WeakEntity,
  17};
  18use http_client::http::{HeaderMap, HeaderValue};
  19use postage::{sink::Sink, watch};
  20use rpc::proto::{RequestMessage, UsersResponse};
  21use std::{
  22    str::FromStr as _,
  23    sync::{Arc, Weak},
  24};
  25use text::ReplicaId;
  26use util::{ResultExt, TryFutureExt as _};
  27
  28pub type UserId = u64;
  29
  30#[derive(
  31    Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, serde::Serialize, serde::Deserialize,
  32)]
  33pub struct ChannelId(pub u64);
  34
  35impl std::fmt::Display for ChannelId {
  36    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
  37        self.0.fmt(f)
  38    }
  39}
  40
  41#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)]
  42pub struct ProjectId(pub u64);
  43
  44impl ProjectId {
  45    pub fn to_proto(self) -> u64 {
  46        self.0
  47    }
  48}
  49
  50#[derive(Debug, Clone, Copy, PartialEq, Eq)]
  51pub struct ParticipantIndex(pub u32);
  52
  53#[derive(Default, Debug)]
  54pub struct User {
  55    pub id: UserId,
  56    pub github_login: SharedString,
  57    pub avatar_uri: SharedUri,
  58    pub name: Option<String>,
  59}
  60
  61#[derive(Clone, Debug, PartialEq, Eq)]
  62pub struct Collaborator {
  63    pub peer_id: proto::PeerId,
  64    pub replica_id: ReplicaId,
  65    pub user_id: UserId,
  66    pub is_host: bool,
  67    pub committer_name: Option<String>,
  68    pub committer_email: Option<String>,
  69}
  70
  71impl PartialOrd for User {
  72    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
  73        Some(self.cmp(other))
  74    }
  75}
  76
  77impl Ord for User {
  78    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
  79        self.github_login.cmp(&other.github_login)
  80    }
  81}
  82
  83impl PartialEq for User {
  84    fn eq(&self, other: &Self) -> bool {
  85        self.id == other.id && self.github_login == other.github_login
  86    }
  87}
  88
  89impl Eq for User {}
  90
  91#[derive(Debug, PartialEq)]
  92pub struct Contact {
  93    pub user: Arc<User>,
  94    pub online: bool,
  95    pub busy: bool,
  96}
  97
  98#[derive(Debug, Clone, Copy, PartialEq, Eq)]
  99pub enum ContactRequestStatus {
 100    None,
 101    RequestSent,
 102    RequestReceived,
 103    RequestAccepted,
 104}
 105
 106pub struct UserStore {
 107    users: HashMap<u64, Arc<User>>,
 108    by_github_login: HashMap<SharedString, u64>,
 109    participant_indices: HashMap<u64, ParticipantIndex>,
 110    update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
 111    edit_prediction_usage: Option<EditPredictionUsage>,
 112    plan_info: Option<PlanInfo>,
 113    current_user: watch::Receiver<Option<Arc<User>>>,
 114    current_organization: Option<Arc<Organization>>,
 115    organizations: Vec<Arc<Organization>>,
 116    plans_by_organization: HashMap<OrganizationId, Plan>,
 117    contacts: Vec<Arc<Contact>>,
 118    incoming_contact_requests: Vec<Arc<User>>,
 119    outgoing_contact_requests: Vec<Arc<User>>,
 120    pending_contact_requests: HashMap<u64, usize>,
 121    client: Weak<Client>,
 122    _maintain_contacts: Task<()>,
 123    _maintain_current_user: Task<Result<()>>,
 124    _handle_sign_out: Task<()>,
 125    weak_self: WeakEntity<Self>,
 126}
 127
 128#[derive(Clone)]
 129pub struct InviteInfo {
 130    pub count: u32,
 131    pub url: Arc<str>,
 132}
 133
 134pub enum Event {
 135    Contact {
 136        user: Arc<User>,
 137        kind: ContactEventKind,
 138    },
 139    ShowContacts,
 140    ParticipantIndicesChanged,
 141    PrivateUserInfoUpdated,
 142    PlanUpdated,
 143    OrganizationChanged,
 144}
 145
 146#[derive(Clone, Copy)]
 147pub enum ContactEventKind {
 148    Requested,
 149    Accepted,
 150    Cancelled,
 151}
 152
 153impl EventEmitter<Event> for UserStore {}
 154
 155enum UpdateContacts {
 156    Update(proto::UpdateContacts),
 157    Wait(postage::barrier::Sender),
 158    Clear(postage::barrier::Sender),
 159}
 160
 161#[derive(Debug, Clone, Copy, Deref)]
 162pub struct EditPredictionUsage(pub RequestUsage);
 163
 164#[derive(Debug, Clone, Copy)]
 165pub struct RequestUsage {
 166    pub limit: UsageLimit,
 167    pub amount: i32,
 168}
 169
 170impl UserStore {
 171    pub fn new(client: Arc<Client>, cx: &Context<Self>) -> Self {
 172        let (mut current_user_tx, current_user_rx) = watch::channel();
 173        let (sign_out_tx, mut sign_out_rx) = mpsc::unbounded();
 174        let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
 175        let rpc_subscriptions = vec![
 176            client.add_message_handler(cx.weak_entity(), Self::handle_update_contacts),
 177            client.add_message_handler(cx.weak_entity(), Self::handle_show_contacts),
 178        ];
 179
 180        client.sign_out_tx.lock().replace(sign_out_tx);
 181        client.add_message_to_client_handler({
 182            let this = cx.weak_entity();
 183            move |message, cx| Self::handle_message_to_client(this.clone(), message, cx)
 184        });
 185
 186        Self {
 187            users: Default::default(),
 188            by_github_login: Default::default(),
 189            current_user: current_user_rx,
 190            current_organization: None,
 191            organizations: Vec::new(),
 192            plans_by_organization: HashMap::default(),
 193            plan_info: None,
 194            edit_prediction_usage: None,
 195            contacts: Default::default(),
 196            incoming_contact_requests: Default::default(),
 197            participant_indices: Default::default(),
 198            outgoing_contact_requests: Default::default(),
 199            client: Arc::downgrade(&client),
 200            update_contacts_tx,
 201            _maintain_contacts: cx.spawn(async move |this, cx| {
 202                let _subscriptions = rpc_subscriptions;
 203                while let Some(message) = update_contacts_rx.next().await {
 204                    if let Ok(task) = this.update(cx, |this, cx| this.update_contacts(message, cx))
 205                    {
 206                        task.log_err().await;
 207                    } else {
 208                        break;
 209                    }
 210                }
 211            }),
 212            _maintain_current_user: cx.spawn(async move |this, cx| {
 213                let mut status = client.status();
 214                let weak = Arc::downgrade(&client);
 215                drop(client);
 216                while let Some(status) = status.next().await {
 217                    // if the client is dropped, the app is shutting down.
 218                    let Some(client) = weak.upgrade() else {
 219                        return Ok(());
 220                    };
 221                    match status {
 222                        Status::Authenticated
 223                        | Status::Reauthenticated
 224                        | Status::Connected { .. } => {
 225                            if let Some(user_id) = client.user_id() {
 226                                let response = client
 227                                    .cloud_client()
 228                                    .get_authenticated_user()
 229                                    .await
 230                                    .log_err();
 231
 232                                let current_user_and_response = if let Some(response) = response {
 233                                    let user = Arc::new(User {
 234                                        id: user_id,
 235                                        github_login: response.user.github_login.clone().into(),
 236                                        avatar_uri: response.user.avatar_url.clone().into(),
 237                                        name: response.user.name.clone(),
 238                                    });
 239
 240                                    Some((user, response))
 241                                } else {
 242                                    None
 243                                };
 244                                current_user_tx
 245                                    .send(
 246                                        current_user_and_response
 247                                            .as_ref()
 248                                            .map(|(user, _)| user.clone()),
 249                                    )
 250                                    .await
 251                                    .ok();
 252
 253                                cx.update(|cx| {
 254                                    if let Some((user, response)) = current_user_and_response {
 255                                        this.update(cx, |this, cx| {
 256                                            this.by_github_login
 257                                                .insert(user.github_login.clone(), user_id);
 258                                            this.users.insert(user_id, user);
 259                                            this.update_authenticated_user(response, cx)
 260                                        })
 261                                    } else {
 262                                        anyhow::Ok(())
 263                                    }
 264                                })?;
 265
 266                                this.update(cx, |_, cx| cx.notify())?;
 267                            }
 268                        }
 269                        Status::SignedOut => {
 270                            current_user_tx.send(None).await.ok();
 271                            this.update(cx, |this, cx| {
 272                                this.clear_organizations();
 273                                this.clear_plan_and_usage();
 274                                cx.emit(Event::PrivateUserInfoUpdated);
 275                                cx.notify();
 276                                this.clear_contacts()
 277                            })?
 278                            .await;
 279                        }
 280                        Status::ConnectionLost => {
 281                            this.update(cx, |this, cx| {
 282                                cx.notify();
 283                                this.clear_contacts()
 284                            })?
 285                            .await;
 286                        }
 287                        _ => {}
 288                    }
 289                }
 290                Ok(())
 291            }),
 292            _handle_sign_out: cx.spawn(async move |this, cx| {
 293                while let Some(()) = sign_out_rx.next().await {
 294                    let Some(client) = this
 295                        .read_with(cx, |this, _cx| this.client.upgrade())
 296                        .ok()
 297                        .flatten()
 298                    else {
 299                        break;
 300                    };
 301
 302                    client.sign_out(cx).await;
 303                }
 304            }),
 305            pending_contact_requests: Default::default(),
 306            weak_self: cx.weak_entity(),
 307        }
 308    }
 309
 310    #[cfg(feature = "test-support")]
 311    pub fn clear_cache(&mut self) {
 312        self.users.clear();
 313        self.by_github_login.clear();
 314    }
 315
 316    async fn handle_show_contacts(
 317        this: Entity<Self>,
 318        _: TypedEnvelope<proto::ShowContacts>,
 319        mut cx: AsyncApp,
 320    ) -> Result<()> {
 321        this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts));
 322        Ok(())
 323    }
 324
 325    async fn handle_update_contacts(
 326        this: Entity<Self>,
 327        message: TypedEnvelope<proto::UpdateContacts>,
 328        cx: AsyncApp,
 329    ) -> Result<()> {
 330        this.read_with(&cx, |this, _| {
 331            this.update_contacts_tx
 332                .unbounded_send(UpdateContacts::Update(message.payload))
 333                .unwrap();
 334        });
 335        Ok(())
 336    }
 337
 338    fn update_contacts(&mut self, message: UpdateContacts, cx: &Context<Self>) -> Task<Result<()>> {
 339        match message {
 340            UpdateContacts::Wait(barrier) => {
 341                drop(barrier);
 342                Task::ready(Ok(()))
 343            }
 344            UpdateContacts::Clear(barrier) => {
 345                self.contacts.clear();
 346                self.incoming_contact_requests.clear();
 347                self.outgoing_contact_requests.clear();
 348                drop(barrier);
 349                Task::ready(Ok(()))
 350            }
 351            UpdateContacts::Update(message) => {
 352                let mut user_ids = HashSet::default();
 353                for contact in &message.contacts {
 354                    user_ids.insert(contact.user_id);
 355                }
 356                user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
 357                user_ids.extend(message.outgoing_requests.iter());
 358
 359                let load_users = self.get_users(user_ids.into_iter().collect(), cx);
 360                cx.spawn(async move |this, cx| {
 361                    load_users.await?;
 362
 363                    // Users are fetched in parallel above and cached in call to get_users
 364                    // No need to parallelize here
 365                    let mut updated_contacts = Vec::new();
 366                    let this = this.upgrade().context("can't upgrade user store handle")?;
 367                    for contact in message.contacts {
 368                        updated_contacts
 369                            .push(Arc::new(Contact::from_proto(contact, &this, cx).await?));
 370                    }
 371
 372                    let mut incoming_requests = Vec::new();
 373                    for request in message.incoming_requests {
 374                        incoming_requests.push({
 375                            this.update(cx, |this, cx| this.get_user(request.requester_id, cx))
 376                                .await?
 377                        });
 378                    }
 379
 380                    let mut outgoing_requests = Vec::new();
 381                    for requested_user_id in message.outgoing_requests {
 382                        outgoing_requests.push(
 383                            this.update(cx, |this, cx| this.get_user(requested_user_id, cx))
 384                                .await?,
 385                        );
 386                    }
 387
 388                    let removed_contacts =
 389                        HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
 390                    let removed_incoming_requests =
 391                        HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
 392                    let removed_outgoing_requests =
 393                        HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());
 394
 395                    this.update(cx, |this, cx| {
 396                        // Remove contacts
 397                        this.contacts
 398                            .retain(|contact| !removed_contacts.contains(&contact.user.id));
 399                        // Update existing contacts and insert new ones
 400                        for updated_contact in updated_contacts {
 401                            match this.contacts.binary_search_by_key(
 402                                &&updated_contact.user.github_login,
 403                                |contact| &contact.user.github_login,
 404                            ) {
 405                                Ok(ix) => this.contacts[ix] = updated_contact,
 406                                Err(ix) => this.contacts.insert(ix, updated_contact),
 407                            }
 408                        }
 409
 410                        // Remove incoming contact requests
 411                        this.incoming_contact_requests.retain(|user| {
 412                            if removed_incoming_requests.contains(&user.id) {
 413                                cx.emit(Event::Contact {
 414                                    user: user.clone(),
 415                                    kind: ContactEventKind::Cancelled,
 416                                });
 417                                false
 418                            } else {
 419                                true
 420                            }
 421                        });
 422                        // Update existing incoming requests and insert new ones
 423                        for user in incoming_requests {
 424                            match this
 425                                .incoming_contact_requests
 426                                .binary_search_by_key(&&user.github_login, |contact| {
 427                                    &contact.github_login
 428                                }) {
 429                                Ok(ix) => this.incoming_contact_requests[ix] = user,
 430                                Err(ix) => this.incoming_contact_requests.insert(ix, user),
 431                            }
 432                        }
 433
 434                        // Remove outgoing contact requests
 435                        this.outgoing_contact_requests
 436                            .retain(|user| !removed_outgoing_requests.contains(&user.id));
 437                        // Update existing incoming requests and insert new ones
 438                        for request in outgoing_requests {
 439                            match this
 440                                .outgoing_contact_requests
 441                                .binary_search_by_key(&&request.github_login, |contact| {
 442                                    &contact.github_login
 443                                }) {
 444                                Ok(ix) => this.outgoing_contact_requests[ix] = request,
 445                                Err(ix) => this.outgoing_contact_requests.insert(ix, request),
 446                            }
 447                        }
 448
 449                        cx.notify();
 450                    });
 451
 452                    Ok(())
 453                })
 454            }
 455        }
 456    }
 457
 458    pub fn contacts(&self) -> &[Arc<Contact>] {
 459        &self.contacts
 460    }
 461
 462    pub fn has_contact(&self, user: &Arc<User>) -> bool {
 463        self.contacts
 464            .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
 465            .is_ok()
 466    }
 467
 468    pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
 469        &self.incoming_contact_requests
 470    }
 471
 472    pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
 473        &self.outgoing_contact_requests
 474    }
 475
 476    pub fn is_contact_request_pending(&self, user: &User) -> bool {
 477        self.pending_contact_requests.contains_key(&user.id)
 478    }
 479
 480    pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
 481        if self
 482            .contacts
 483            .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
 484            .is_ok()
 485        {
 486            ContactRequestStatus::RequestAccepted
 487        } else if self
 488            .outgoing_contact_requests
 489            .binary_search_by_key(&&user.github_login, |user| &user.github_login)
 490            .is_ok()
 491        {
 492            ContactRequestStatus::RequestSent
 493        } else if self
 494            .incoming_contact_requests
 495            .binary_search_by_key(&&user.github_login, |user| &user.github_login)
 496            .is_ok()
 497        {
 498            ContactRequestStatus::RequestReceived
 499        } else {
 500            ContactRequestStatus::None
 501        }
 502    }
 503
 504    pub fn request_contact(
 505        &mut self,
 506        responder_id: u64,
 507        cx: &mut Context<Self>,
 508    ) -> Task<Result<()>> {
 509        self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
 510    }
 511
 512    pub fn remove_contact(&mut self, user_id: u64, cx: &mut Context<Self>) -> Task<Result<()>> {
 513        self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
 514    }
 515
 516    pub fn has_incoming_contact_request(&self, user_id: u64) -> bool {
 517        self.incoming_contact_requests
 518            .iter()
 519            .any(|user| user.id == user_id)
 520    }
 521
 522    pub fn respond_to_contact_request(
 523        &mut self,
 524        requester_id: u64,
 525        accept: bool,
 526        cx: &mut Context<Self>,
 527    ) -> Task<Result<()>> {
 528        self.perform_contact_request(
 529            requester_id,
 530            proto::RespondToContactRequest {
 531                requester_id,
 532                response: if accept {
 533                    proto::ContactRequestResponse::Accept
 534                } else {
 535                    proto::ContactRequestResponse::Decline
 536                } as i32,
 537            },
 538            cx,
 539        )
 540    }
 541
 542    pub fn dismiss_contact_request(
 543        &self,
 544        requester_id: u64,
 545        cx: &Context<Self>,
 546    ) -> Task<Result<()>> {
 547        let client = self.client.upgrade();
 548        cx.spawn(async move |_, _| {
 549            client
 550                .context("can't upgrade client reference")?
 551                .request(proto::RespondToContactRequest {
 552                    requester_id,
 553                    response: proto::ContactRequestResponse::Dismiss as i32,
 554                })
 555                .await?;
 556            Ok(())
 557        })
 558    }
 559
 560    fn perform_contact_request<T: RequestMessage>(
 561        &mut self,
 562        user_id: u64,
 563        request: T,
 564        cx: &mut Context<Self>,
 565    ) -> Task<Result<()>> {
 566        let client = self.client.upgrade();
 567        *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
 568        cx.notify();
 569
 570        cx.spawn(async move |this, cx| {
 571            let response = client
 572                .context("can't upgrade client reference")?
 573                .request(request)
 574                .await;
 575            this.update(cx, |this, cx| {
 576                if let Entry::Occupied(mut request_count) =
 577                    this.pending_contact_requests.entry(user_id)
 578                {
 579                    *request_count.get_mut() -= 1;
 580                    if *request_count.get() == 0 {
 581                        request_count.remove();
 582                    }
 583                }
 584                cx.notify();
 585            })?;
 586            response?;
 587            Ok(())
 588        })
 589    }
 590
 591    pub fn clear_contacts(&self) -> impl Future<Output = ()> + use<> {
 592        let (tx, mut rx) = postage::barrier::channel();
 593        self.update_contacts_tx
 594            .unbounded_send(UpdateContacts::Clear(tx))
 595            .unwrap();
 596        async move {
 597            rx.next().await;
 598        }
 599    }
 600
 601    pub fn contact_updates_done(&self) -> impl Future<Output = ()> {
 602        let (tx, mut rx) = postage::barrier::channel();
 603        self.update_contacts_tx
 604            .unbounded_send(UpdateContacts::Wait(tx))
 605            .unwrap();
 606        async move {
 607            rx.next().await;
 608        }
 609    }
 610
 611    pub fn get_users(
 612        &self,
 613        user_ids: Vec<u64>,
 614        cx: &Context<Self>,
 615    ) -> Task<Result<Vec<Arc<User>>>> {
 616        let mut user_ids_to_fetch = user_ids.clone();
 617        user_ids_to_fetch.retain(|id| !self.users.contains_key(id));
 618
 619        cx.spawn(async move |this, cx| {
 620            if !user_ids_to_fetch.is_empty() {
 621                this.update(cx, |this, cx| {
 622                    this.load_users(
 623                        proto::GetUsers {
 624                            user_ids: user_ids_to_fetch,
 625                        },
 626                        cx,
 627                    )
 628                })?
 629                .await?;
 630            }
 631
 632            this.read_with(cx, |this, _| {
 633                user_ids
 634                    .iter()
 635                    .map(|user_id| {
 636                        this.users
 637                            .get(user_id)
 638                            .cloned()
 639                            .with_context(|| format!("user {user_id} not found"))
 640                    })
 641                    .collect()
 642            })?
 643        })
 644    }
 645
 646    pub fn fuzzy_search_users(
 647        &self,
 648        query: String,
 649        cx: &Context<Self>,
 650    ) -> Task<Result<Vec<Arc<User>>>> {
 651        self.load_users(proto::FuzzySearchUsers { query }, cx)
 652    }
 653
 654    pub fn get_cached_user(&self, user_id: u64) -> Option<Arc<User>> {
 655        self.users.get(&user_id).cloned()
 656    }
 657
 658    pub fn get_user_optimistic(&self, user_id: u64, cx: &Context<Self>) -> Option<Arc<User>> {
 659        if let Some(user) = self.users.get(&user_id).cloned() {
 660            return Some(user);
 661        }
 662
 663        self.get_user(user_id, cx).detach_and_log_err(cx);
 664        None
 665    }
 666
 667    pub fn get_user(&self, user_id: u64, cx: &Context<Self>) -> Task<Result<Arc<User>>> {
 668        if let Some(user) = self.users.get(&user_id).cloned() {
 669            return Task::ready(Ok(user));
 670        }
 671
 672        let load_users = self.get_users(vec![user_id], cx);
 673        cx.spawn(async move |this, cx| {
 674            load_users.await?;
 675            this.read_with(cx, |this, _| {
 676                this.users
 677                    .get(&user_id)
 678                    .cloned()
 679                    .context("server responded with no users")
 680            })?
 681        })
 682    }
 683
 684    pub fn cached_user_by_github_login(&self, github_login: &str) -> Option<Arc<User>> {
 685        self.by_github_login
 686            .get(github_login)
 687            .and_then(|id| self.users.get(id).cloned())
 688    }
 689
 690    pub fn current_user(&self) -> Option<Arc<User>> {
 691        self.current_user.borrow().clone()
 692    }
 693
 694    pub fn current_organization(&self) -> Option<Arc<Organization>> {
 695        self.current_organization.clone()
 696    }
 697
 698    pub fn set_current_organization(
 699        &mut self,
 700        organization: Arc<Organization>,
 701        cx: &mut Context<Self>,
 702    ) {
 703        let is_same_organization = self
 704            .current_organization
 705            .as_ref()
 706            .is_some_and(|current| current.id == organization.id);
 707
 708        if !is_same_organization {
 709            self.current_organization.replace(organization);
 710            cx.emit(Event::OrganizationChanged);
 711            cx.notify();
 712        }
 713    }
 714
 715    pub fn organizations(&self) -> &Vec<Arc<Organization>> {
 716        &self.organizations
 717    }
 718
 719    pub fn plan_for_organization(&self, organization_id: &OrganizationId) -> Option<Plan> {
 720        self.plans_by_organization.get(organization_id).copied()
 721    }
 722
 723    pub fn plan(&self) -> Option<Plan> {
 724        #[cfg(debug_assertions)]
 725        if let Ok(plan) = std::env::var("ZED_SIMULATE_PLAN").as_ref() {
 726            use cloud_api_client::Plan;
 727
 728            return match plan.as_str() {
 729                "free" => Some(Plan::ZedFree),
 730                "trial" => Some(Plan::ZedProTrial),
 731                "pro" => Some(Plan::ZedPro),
 732                _ => {
 733                    panic!("ZED_SIMULATE_PLAN must be one of 'free', 'trial', or 'pro'");
 734                }
 735            };
 736        }
 737
 738        if let Some(organization) = &self.current_organization
 739            && let Some(plan) = self.plan_for_organization(&organization.id)
 740        {
 741            return Some(plan);
 742        }
 743
 744        self.plan_info.as_ref().map(|info| info.plan())
 745    }
 746
 747    pub fn subscription_period(&self) -> Option<(DateTime<Utc>, DateTime<Utc>)> {
 748        self.plan_info
 749            .as_ref()
 750            .and_then(|plan| plan.subscription_period)
 751            .map(|subscription_period| {
 752                (
 753                    subscription_period.started_at.0,
 754                    subscription_period.ended_at.0,
 755                )
 756            })
 757    }
 758
 759    pub fn trial_started_at(&self) -> Option<DateTime<Utc>> {
 760        self.plan_info
 761            .as_ref()
 762            .and_then(|plan| plan.trial_started_at)
 763            .map(|trial_started_at| trial_started_at.0)
 764    }
 765
 766    /// Returns whether the user's account is too new to use the service.
 767    pub fn account_too_young(&self) -> bool {
 768        self.plan_info
 769            .as_ref()
 770            .map(|plan| plan.is_account_too_young)
 771            .unwrap_or_default()
 772    }
 773
 774    /// Returns whether the current user has overdue invoices and usage should be blocked.
 775    pub fn has_overdue_invoices(&self) -> bool {
 776        self.plan_info
 777            .as_ref()
 778            .map(|plan| plan.has_overdue_invoices)
 779            .unwrap_or_default()
 780    }
 781
 782    pub fn edit_prediction_usage(&self) -> Option<EditPredictionUsage> {
 783        self.edit_prediction_usage
 784    }
 785
 786    pub fn update_edit_prediction_usage(
 787        &mut self,
 788        usage: EditPredictionUsage,
 789        cx: &mut Context<Self>,
 790    ) {
 791        self.edit_prediction_usage = Some(usage);
 792        cx.notify();
 793    }
 794
 795    pub fn clear_organizations(&mut self) {
 796        self.organizations.clear();
 797        self.current_organization = None;
 798    }
 799
 800    pub fn clear_plan_and_usage(&mut self) {
 801        self.plan_info = None;
 802        self.edit_prediction_usage = None;
 803    }
 804
 805    fn update_authenticated_user(
 806        &mut self,
 807        response: GetAuthenticatedUserResponse,
 808        cx: &mut Context<Self>,
 809    ) {
 810        let staff = response.user.is_staff && !*feature_flags::ZED_DISABLE_STAFF;
 811        cx.update_flags(staff, response.feature_flags);
 812        if let Some(client) = self.client.upgrade() {
 813            client
 814                .telemetry
 815                .set_authenticated_user_info(Some(response.user.metrics_id.clone()), staff);
 816        }
 817
 818        self.organizations = response.organizations.into_iter().map(Arc::new).collect();
 819        self.current_organization = self.organizations.first().cloned();
 820
 821        self.edit_prediction_usage = Some(EditPredictionUsage(RequestUsage {
 822            limit: response.plan.usage.edit_predictions.limit,
 823            amount: response.plan.usage.edit_predictions.used as i32,
 824        }));
 825        self.plan_info = Some(response.plan);
 826        cx.emit(Event::PrivateUserInfoUpdated);
 827    }
 828
 829    fn handle_message_to_client(this: WeakEntity<Self>, message: &MessageToClient, cx: &App) {
 830        cx.spawn(async move |cx| {
 831            match message {
 832                MessageToClient::UserUpdated => {
 833                    let cloud_client = cx
 834                        .update(|cx| {
 835                            this.read_with(cx, |this, _cx| {
 836                                this.client.upgrade().map(|client| client.cloud_client())
 837                            })
 838                        })?
 839                        .ok_or(anyhow::anyhow!("Failed to get Cloud client"))?;
 840
 841                    let response = cloud_client.get_authenticated_user().await?;
 842                    cx.update(|cx| {
 843                        this.update(cx, |this, cx| {
 844                            this.update_authenticated_user(response, cx);
 845                        })
 846                    })?;
 847                }
 848            }
 849
 850            anyhow::Ok(())
 851        })
 852        .detach_and_log_err(cx);
 853    }
 854
 855    pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
 856        self.current_user.clone()
 857    }
 858
 859    fn load_users(
 860        &self,
 861        request: impl RequestMessage<Response = UsersResponse>,
 862        cx: &Context<Self>,
 863    ) -> Task<Result<Vec<Arc<User>>>> {
 864        let client = self.client.clone();
 865        cx.spawn(async move |this, cx| {
 866            if let Some(rpc) = client.upgrade() {
 867                let response = rpc.request(request).await.context("error loading users")?;
 868                let users = response.users;
 869
 870                this.update(cx, |this, _| this.insert(users))
 871            } else {
 872                Ok(Vec::new())
 873            }
 874        })
 875    }
 876
 877    pub fn insert(&mut self, users: Vec<proto::User>) -> Vec<Arc<User>> {
 878        let mut ret = Vec::with_capacity(users.len());
 879        for user in users {
 880            let user = User::new(user);
 881            if let Some(old) = self.users.insert(user.id, user.clone())
 882                && old.github_login != user.github_login
 883            {
 884                self.by_github_login.remove(&old.github_login);
 885            }
 886            self.by_github_login
 887                .insert(user.github_login.clone(), user.id);
 888            ret.push(user)
 889        }
 890        ret
 891    }
 892
 893    pub fn set_participant_indices(
 894        &mut self,
 895        participant_indices: HashMap<u64, ParticipantIndex>,
 896        cx: &mut Context<Self>,
 897    ) {
 898        if participant_indices != self.participant_indices {
 899            self.participant_indices = participant_indices;
 900            cx.emit(Event::ParticipantIndicesChanged);
 901        }
 902    }
 903
 904    pub fn participant_indices(&self) -> &HashMap<u64, ParticipantIndex> {
 905        &self.participant_indices
 906    }
 907
 908    pub fn participant_names(
 909        &self,
 910        user_ids: impl Iterator<Item = u64>,
 911        cx: &App,
 912    ) -> HashMap<u64, SharedString> {
 913        let mut ret = HashMap::default();
 914        let mut missing_user_ids = Vec::new();
 915        for id in user_ids {
 916            if let Some(github_login) = self.get_cached_user(id).map(|u| u.github_login.clone()) {
 917                ret.insert(id, github_login);
 918            } else {
 919                missing_user_ids.push(id)
 920            }
 921        }
 922        if !missing_user_ids.is_empty() {
 923            let this = self.weak_self.clone();
 924            cx.spawn(async move |cx| {
 925                this.update(cx, |this, cx| this.get_users(missing_user_ids, cx))?
 926                    .await
 927            })
 928            .detach_and_log_err(cx);
 929        }
 930        ret
 931    }
 932}
 933
 934impl User {
 935    fn new(message: proto::User) -> Arc<Self> {
 936        Arc::new(User {
 937            id: message.id,
 938            github_login: message.github_login.into(),
 939            avatar_uri: message.avatar_url.into(),
 940            name: message.name,
 941        })
 942    }
 943}
 944
 945impl Contact {
 946    async fn from_proto(
 947        contact: proto::Contact,
 948        user_store: &Entity<UserStore>,
 949        cx: &mut AsyncApp,
 950    ) -> Result<Self> {
 951        let user = user_store
 952            .update(cx, |user_store, cx| {
 953                user_store.get_user(contact.user_id, cx)
 954            })
 955            .await?;
 956        Ok(Self {
 957            user,
 958            online: contact.online,
 959            busy: contact.busy,
 960        })
 961    }
 962}
 963
 964impl Collaborator {
 965    pub fn from_proto(message: proto::Collaborator) -> Result<Self> {
 966        Ok(Self {
 967            peer_id: message.peer_id.context("invalid peer id")?,
 968            replica_id: ReplicaId::new(message.replica_id as u16),
 969            user_id: message.user_id as UserId,
 970            is_host: message.is_host,
 971            committer_name: message.committer_name,
 972            committer_email: message.committer_email,
 973        })
 974    }
 975}
 976
 977impl RequestUsage {
 978    pub fn over_limit(&self) -> bool {
 979        match self.limit {
 980            UsageLimit::Limited(limit) => self.amount >= limit,
 981            UsageLimit::Unlimited => false,
 982        }
 983    }
 984
 985    fn from_headers(
 986        limit_name: &str,
 987        amount_name: &str,
 988        headers: &HeaderMap<HeaderValue>,
 989    ) -> Result<Self> {
 990        let limit = headers
 991            .get(limit_name)
 992            .with_context(|| format!("missing {limit_name:?} header"))?;
 993        let limit = UsageLimit::from_str(limit.to_str()?)?;
 994
 995        let amount = headers
 996            .get(amount_name)
 997            .with_context(|| format!("missing {amount_name:?} header"))?;
 998        let amount = amount.to_str()?.parse::<i32>()?;
 999
1000        Ok(Self { limit, amount })
1001    }
1002}
1003
1004impl EditPredictionUsage {
1005    pub fn from_headers(headers: &HeaderMap<HeaderValue>) -> Result<Self> {
1006        Ok(Self(RequestUsage::from_headers(
1007            EDIT_PREDICTIONS_USAGE_LIMIT_HEADER_NAME,
1008            EDIT_PREDICTIONS_USAGE_AMOUNT_HEADER_NAME,
1009            headers,
1010        )?))
1011    }
1012}