user.rs

   1use super::{Client, Status, TypedEnvelope, proto};
   2use anyhow::{Context as _, Result};
   3use chrono::{DateTime, Utc};
   4use cloud_api_client::websocket_protocol::MessageToClient;
   5use cloud_api_client::{
   6    GetAuthenticatedUserResponse, KnownOrUnknown, Organization, OrganizationId, Plan, PlanInfo,
   7};
   8use cloud_api_types::OrganizationConfiguration;
   9use cloud_llm_client::{
  10    EDIT_PREDICTIONS_USAGE_AMOUNT_HEADER_NAME, EDIT_PREDICTIONS_USAGE_LIMIT_HEADER_NAME, UsageLimit,
  11};
  12use collections::{HashMap, HashSet, hash_map::Entry};
  13use db::kvp::KeyValueStore;
  14use derive_more::Deref;
  15use feature_flags::FeatureFlagAppExt;
  16use futures::{Future, StreamExt, channel::mpsc};
  17use gpui::{
  18    App, AsyncApp, Context, Entity, EventEmitter, SharedString, SharedUri, Task, TaskExt,
  19    WeakEntity,
  20};
  21use http_client::http::{HeaderMap, HeaderValue};
  22use postage::{sink::Sink, watch};
  23use rpc::proto::{RequestMessage, UsersResponse};
  24use std::{
  25    str::FromStr as _,
  26    sync::{Arc, Weak},
  27};
  28use text::ReplicaId;
  29use util::{ResultExt, TryFutureExt as _};
  30
  31const CURRENT_ORGANIZATION_ID_KEY: &str = "current_organization_id";
  32
  33pub type UserId = u64;
  34
  35#[derive(
  36    Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, serde::Serialize, serde::Deserialize,
  37)]
  38pub struct ChannelId(pub u64);
  39
  40impl std::fmt::Display for ChannelId {
  41    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
  42        self.0.fmt(f)
  43    }
  44}
  45
  46#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)]
  47pub struct ProjectId(pub u64);
  48
  49impl ProjectId {
  50    pub fn to_proto(self) -> u64 {
  51        self.0
  52    }
  53}
  54
  55#[derive(Debug, Clone, Copy, PartialEq, Eq)]
  56pub struct ParticipantIndex(pub u32);
  57
  58#[derive(Default, Debug)]
  59pub struct User {
  60    pub id: UserId,
  61    pub github_login: SharedString,
  62    pub avatar_uri: SharedUri,
  63    pub name: Option<String>,
  64}
  65
  66#[derive(Clone, Debug, PartialEq, Eq)]
  67pub struct Collaborator {
  68    pub peer_id: proto::PeerId,
  69    pub replica_id: ReplicaId,
  70    pub user_id: UserId,
  71    pub is_host: bool,
  72    pub committer_name: Option<String>,
  73    pub committer_email: Option<String>,
  74}
  75
  76impl PartialOrd for User {
  77    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
  78        Some(self.cmp(other))
  79    }
  80}
  81
  82impl Ord for User {
  83    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
  84        self.github_login.cmp(&other.github_login)
  85    }
  86}
  87
  88impl PartialEq for User {
  89    fn eq(&self, other: &Self) -> bool {
  90        self.id == other.id && self.github_login == other.github_login
  91    }
  92}
  93
  94impl Eq for User {}
  95
  96#[derive(Debug, PartialEq)]
  97pub struct Contact {
  98    pub user: Arc<User>,
  99    pub online: bool,
 100    pub busy: bool,
 101}
 102
 103#[derive(Debug, Clone, Copy, PartialEq, Eq)]
 104pub enum ContactRequestStatus {
 105    None,
 106    RequestSent,
 107    RequestReceived,
 108    RequestAccepted,
 109}
 110
 111pub struct UserStore {
 112    users: HashMap<u64, Arc<User>>,
 113    by_github_login: HashMap<SharedString, u64>,
 114    participant_indices: HashMap<u64, ParticipantIndex>,
 115    update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
 116    edit_prediction_usage: Option<EditPredictionUsage>,
 117    plan_info: Option<PlanInfo>,
 118    current_user: watch::Receiver<Option<Arc<User>>>,
 119    current_organization: Option<Arc<Organization>>,
 120    organizations: Vec<Arc<Organization>>,
 121    plans_by_organization: HashMap<OrganizationId, Plan>,
 122    configuration_by_organization: HashMap<OrganizationId, OrganizationConfiguration>,
 123    contacts: Vec<Arc<Contact>>,
 124    incoming_contact_requests: Vec<Arc<User>>,
 125    outgoing_contact_requests: Vec<Arc<User>>,
 126    pending_contact_requests: HashMap<u64, usize>,
 127    client: Weak<Client>,
 128    _maintain_contacts: Task<()>,
 129    _maintain_current_user: Task<Result<()>>,
 130    _handle_sign_out: Task<()>,
 131    weak_self: WeakEntity<Self>,
 132}
 133
 134#[derive(Clone)]
 135pub struct InviteInfo {
 136    pub count: u32,
 137    pub url: Arc<str>,
 138}
 139
 140pub enum Event {
 141    Contact {
 142        user: Arc<User>,
 143        kind: ContactEventKind,
 144    },
 145    ShowContacts,
 146    ParticipantIndicesChanged,
 147    PrivateUserInfoUpdated,
 148    PlanUpdated,
 149    OrganizationChanged,
 150}
 151
 152#[derive(Clone, Copy)]
 153pub enum ContactEventKind {
 154    Requested,
 155    Accepted,
 156    Cancelled,
 157}
 158
 159impl EventEmitter<Event> for UserStore {}
 160
 161enum UpdateContacts {
 162    Update(proto::UpdateContacts),
 163    Wait(postage::barrier::Sender),
 164    Clear(postage::barrier::Sender),
 165}
 166
 167#[derive(Debug, Clone, Copy, Deref)]
 168pub struct EditPredictionUsage(pub RequestUsage);
 169
 170#[derive(Debug, Clone, Copy)]
 171pub struct RequestUsage {
 172    pub limit: UsageLimit,
 173    pub amount: i32,
 174}
 175
 176impl UserStore {
 177    pub fn new(client: Arc<Client>, cx: &Context<Self>) -> Self {
 178        let (mut current_user_tx, current_user_rx) = watch::channel();
 179        let (sign_out_tx, mut sign_out_rx) = mpsc::unbounded();
 180        let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
 181        let rpc_subscriptions = vec![
 182            client.add_message_handler(cx.weak_entity(), Self::handle_update_contacts),
 183            client.add_message_handler(cx.weak_entity(), Self::handle_show_contacts),
 184        ];
 185
 186        client.sign_out_tx.lock().replace(sign_out_tx);
 187        client.add_message_to_client_handler({
 188            let this = cx.weak_entity();
 189            move |message, cx| Self::handle_message_to_client(this.clone(), message, cx)
 190        });
 191
 192        Self {
 193            users: Default::default(),
 194            by_github_login: Default::default(),
 195            current_user: current_user_rx,
 196            current_organization: None,
 197            organizations: Vec::new(),
 198            plans_by_organization: HashMap::default(),
 199            configuration_by_organization: HashMap::default(),
 200            plan_info: None,
 201            edit_prediction_usage: None,
 202            contacts: Default::default(),
 203            incoming_contact_requests: Default::default(),
 204            participant_indices: Default::default(),
 205            outgoing_contact_requests: Default::default(),
 206            client: Arc::downgrade(&client),
 207            update_contacts_tx,
 208            _maintain_contacts: cx.spawn(async move |this, cx| {
 209                let _subscriptions = rpc_subscriptions;
 210                while let Some(message) = update_contacts_rx.next().await {
 211                    if let Ok(task) = this.update(cx, |this, cx| this.update_contacts(message, cx))
 212                    {
 213                        task.log_err().await;
 214                    } else {
 215                        break;
 216                    }
 217                }
 218            }),
 219            _maintain_current_user: cx.spawn(async move |this, cx| {
 220                let mut status = client.status();
 221                let weak = Arc::downgrade(&client);
 222                drop(client);
 223                while let Some(status) = status.next().await {
 224                    // if the client is dropped, the app is shutting down.
 225                    let Some(client) = weak.upgrade() else {
 226                        return Ok(());
 227                    };
 228                    match status {
 229                        Status::Authenticated
 230                        | Status::Reauthenticated
 231                        | Status::Connected { .. } => {
 232                            if let Some(user_id) = client.user_id() {
 233                                let response = client
 234                                    .cloud_client()
 235                                    .get_authenticated_user()
 236                                    .await
 237                                    .log_err();
 238
 239                                let current_user_and_response = if let Some(response) = response {
 240                                    let user = Arc::new(User {
 241                                        id: user_id,
 242                                        github_login: response.user.github_login.clone().into(),
 243                                        avatar_uri: response.user.avatar_url.clone().into(),
 244                                        name: response.user.name.clone(),
 245                                    });
 246
 247                                    Some((user, response))
 248                                } else {
 249                                    None
 250                                };
 251                                current_user_tx
 252                                    .send(
 253                                        current_user_and_response
 254                                            .as_ref()
 255                                            .map(|(user, _)| user.clone()),
 256                                    )
 257                                    .await
 258                                    .ok();
 259
 260                                cx.update(|cx| {
 261                                    if let Some((user, response)) = current_user_and_response {
 262                                        this.update(cx, |this, cx| {
 263                                            this.by_github_login
 264                                                .insert(user.github_login.clone(), user_id);
 265                                            this.users.insert(user_id, user);
 266                                            this.update_authenticated_user(response, cx)
 267                                        })
 268                                    } else {
 269                                        anyhow::Ok(())
 270                                    }
 271                                })?;
 272
 273                                this.update(cx, |_, cx| cx.notify())?;
 274                            }
 275                        }
 276                        Status::SignedOut => {
 277                            current_user_tx.send(None).await.ok();
 278                            this.update(cx, |this, cx| {
 279                                this.clear_organizations();
 280                                this.clear_plan_and_usage();
 281                                cx.emit(Event::PrivateUserInfoUpdated);
 282                                cx.notify();
 283                                this.clear_contacts()
 284                            })?
 285                            .await;
 286                        }
 287                        Status::ConnectionLost => {
 288                            this.update(cx, |this, cx| {
 289                                cx.notify();
 290                                this.clear_contacts()
 291                            })?
 292                            .await;
 293                        }
 294                        _ => {}
 295                    }
 296                }
 297                Ok(())
 298            }),
 299            _handle_sign_out: cx.spawn(async move |this, cx| {
 300                while let Some(()) = sign_out_rx.next().await {
 301                    let Some(client) = this
 302                        .read_with(cx, |this, _cx| this.client.upgrade())
 303                        .ok()
 304                        .flatten()
 305                    else {
 306                        break;
 307                    };
 308
 309                    client.sign_out(cx).await;
 310                }
 311            }),
 312            pending_contact_requests: Default::default(),
 313            weak_self: cx.weak_entity(),
 314        }
 315    }
 316
 317    #[cfg(feature = "test-support")]
 318    pub fn clear_cache(&mut self) {
 319        self.users.clear();
 320        self.by_github_login.clear();
 321    }
 322
 323    async fn handle_show_contacts(
 324        this: Entity<Self>,
 325        _: TypedEnvelope<proto::ShowContacts>,
 326        mut cx: AsyncApp,
 327    ) -> Result<()> {
 328        this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts));
 329        Ok(())
 330    }
 331
 332    async fn handle_update_contacts(
 333        this: Entity<Self>,
 334        message: TypedEnvelope<proto::UpdateContacts>,
 335        cx: AsyncApp,
 336    ) -> Result<()> {
 337        this.read_with(&cx, |this, _| {
 338            this.update_contacts_tx
 339                .unbounded_send(UpdateContacts::Update(message.payload))
 340                .unwrap();
 341        });
 342        Ok(())
 343    }
 344
 345    fn update_contacts(&mut self, message: UpdateContacts, cx: &Context<Self>) -> Task<Result<()>> {
 346        match message {
 347            UpdateContacts::Wait(barrier) => {
 348                drop(barrier);
 349                Task::ready(Ok(()))
 350            }
 351            UpdateContacts::Clear(barrier) => {
 352                self.contacts.clear();
 353                self.incoming_contact_requests.clear();
 354                self.outgoing_contact_requests.clear();
 355                drop(barrier);
 356                Task::ready(Ok(()))
 357            }
 358            UpdateContacts::Update(message) => {
 359                let mut user_ids = HashSet::default();
 360                for contact in &message.contacts {
 361                    user_ids.insert(contact.user_id);
 362                }
 363                user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
 364                user_ids.extend(message.outgoing_requests.iter());
 365
 366                let load_users = self.get_users(user_ids.into_iter().collect(), cx);
 367                cx.spawn(async move |this, cx| {
 368                    load_users.await?;
 369
 370                    // Users are fetched in parallel above and cached in call to get_users
 371                    // No need to parallelize here
 372                    let mut updated_contacts = Vec::new();
 373                    let this = this.upgrade().context("can't upgrade user store handle")?;
 374                    for contact in message.contacts {
 375                        updated_contacts
 376                            .push(Arc::new(Contact::from_proto(contact, &this, cx).await?));
 377                    }
 378
 379                    let mut incoming_requests = Vec::new();
 380                    for request in message.incoming_requests {
 381                        incoming_requests.push({
 382                            this.update(cx, |this, cx| this.get_user(request.requester_id, cx))
 383                                .await?
 384                        });
 385                    }
 386
 387                    let mut outgoing_requests = Vec::new();
 388                    for requested_user_id in message.outgoing_requests {
 389                        outgoing_requests.push(
 390                            this.update(cx, |this, cx| this.get_user(requested_user_id, cx))
 391                                .await?,
 392                        );
 393                    }
 394
 395                    let removed_contacts =
 396                        HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
 397                    let removed_incoming_requests =
 398                        HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
 399                    let removed_outgoing_requests =
 400                        HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());
 401
 402                    this.update(cx, |this, cx| {
 403                        // Remove contacts
 404                        this.contacts
 405                            .retain(|contact| !removed_contacts.contains(&contact.user.id));
 406                        // Update existing contacts and insert new ones
 407                        for updated_contact in updated_contacts {
 408                            match this.contacts.binary_search_by_key(
 409                                &&updated_contact.user.github_login,
 410                                |contact| &contact.user.github_login,
 411                            ) {
 412                                Ok(ix) => this.contacts[ix] = updated_contact,
 413                                Err(ix) => this.contacts.insert(ix, updated_contact),
 414                            }
 415                        }
 416
 417                        // Remove incoming contact requests
 418                        this.incoming_contact_requests.retain(|user| {
 419                            if removed_incoming_requests.contains(&user.id) {
 420                                cx.emit(Event::Contact {
 421                                    user: user.clone(),
 422                                    kind: ContactEventKind::Cancelled,
 423                                });
 424                                false
 425                            } else {
 426                                true
 427                            }
 428                        });
 429                        // Update existing incoming requests and insert new ones
 430                        for user in incoming_requests {
 431                            match this
 432                                .incoming_contact_requests
 433                                .binary_search_by_key(&&user.github_login, |contact| {
 434                                    &contact.github_login
 435                                }) {
 436                                Ok(ix) => this.incoming_contact_requests[ix] = user,
 437                                Err(ix) => this.incoming_contact_requests.insert(ix, user),
 438                            }
 439                        }
 440
 441                        // Remove outgoing contact requests
 442                        this.outgoing_contact_requests
 443                            .retain(|user| !removed_outgoing_requests.contains(&user.id));
 444                        // Update existing incoming requests and insert new ones
 445                        for request in outgoing_requests {
 446                            match this
 447                                .outgoing_contact_requests
 448                                .binary_search_by_key(&&request.github_login, |contact| {
 449                                    &contact.github_login
 450                                }) {
 451                                Ok(ix) => this.outgoing_contact_requests[ix] = request,
 452                                Err(ix) => this.outgoing_contact_requests.insert(ix, request),
 453                            }
 454                        }
 455
 456                        cx.notify();
 457                    });
 458
 459                    Ok(())
 460                })
 461            }
 462        }
 463    }
 464
 465    pub fn contacts(&self) -> &[Arc<Contact>] {
 466        &self.contacts
 467    }
 468
 469    pub fn has_contact(&self, user: &Arc<User>) -> bool {
 470        self.contacts
 471            .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
 472            .is_ok()
 473    }
 474
 475    pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
 476        &self.incoming_contact_requests
 477    }
 478
 479    pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
 480        &self.outgoing_contact_requests
 481    }
 482
 483    pub fn is_contact_request_pending(&self, user: &User) -> bool {
 484        self.pending_contact_requests.contains_key(&user.id)
 485    }
 486
 487    pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
 488        if self
 489            .contacts
 490            .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
 491            .is_ok()
 492        {
 493            ContactRequestStatus::RequestAccepted
 494        } else if self
 495            .outgoing_contact_requests
 496            .binary_search_by_key(&&user.github_login, |user| &user.github_login)
 497            .is_ok()
 498        {
 499            ContactRequestStatus::RequestSent
 500        } else if self
 501            .incoming_contact_requests
 502            .binary_search_by_key(&&user.github_login, |user| &user.github_login)
 503            .is_ok()
 504        {
 505            ContactRequestStatus::RequestReceived
 506        } else {
 507            ContactRequestStatus::None
 508        }
 509    }
 510
 511    pub fn request_contact(
 512        &mut self,
 513        responder_id: u64,
 514        cx: &mut Context<Self>,
 515    ) -> Task<Result<()>> {
 516        self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
 517    }
 518
 519    pub fn remove_contact(&mut self, user_id: u64, cx: &mut Context<Self>) -> Task<Result<()>> {
 520        self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
 521    }
 522
 523    pub fn has_incoming_contact_request(&self, user_id: u64) -> bool {
 524        self.incoming_contact_requests
 525            .iter()
 526            .any(|user| user.id == user_id)
 527    }
 528
 529    pub fn respond_to_contact_request(
 530        &mut self,
 531        requester_id: u64,
 532        accept: bool,
 533        cx: &mut Context<Self>,
 534    ) -> Task<Result<()>> {
 535        self.perform_contact_request(
 536            requester_id,
 537            proto::RespondToContactRequest {
 538                requester_id,
 539                response: if accept {
 540                    proto::ContactRequestResponse::Accept
 541                } else {
 542                    proto::ContactRequestResponse::Decline
 543                } as i32,
 544            },
 545            cx,
 546        )
 547    }
 548
 549    pub fn dismiss_contact_request(
 550        &self,
 551        requester_id: u64,
 552        cx: &Context<Self>,
 553    ) -> Task<Result<()>> {
 554        let client = self.client.upgrade();
 555        cx.spawn(async move |_, _| {
 556            client
 557                .context("can't upgrade client reference")?
 558                .request(proto::RespondToContactRequest {
 559                    requester_id,
 560                    response: proto::ContactRequestResponse::Dismiss as i32,
 561                })
 562                .await?;
 563            Ok(())
 564        })
 565    }
 566
 567    fn perform_contact_request<T: RequestMessage>(
 568        &mut self,
 569        user_id: u64,
 570        request: T,
 571        cx: &mut Context<Self>,
 572    ) -> Task<Result<()>> {
 573        let client = self.client.upgrade();
 574        *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
 575        cx.notify();
 576
 577        cx.spawn(async move |this, cx| {
 578            let response = client
 579                .context("can't upgrade client reference")?
 580                .request(request)
 581                .await;
 582            this.update(cx, |this, cx| {
 583                if let Entry::Occupied(mut request_count) =
 584                    this.pending_contact_requests.entry(user_id)
 585                {
 586                    *request_count.get_mut() -= 1;
 587                    if *request_count.get() == 0 {
 588                        request_count.remove();
 589                    }
 590                }
 591                cx.notify();
 592            })?;
 593            response?;
 594            Ok(())
 595        })
 596    }
 597
 598    pub fn clear_contacts(&self) -> impl Future<Output = ()> + use<> {
 599        let (tx, mut rx) = postage::barrier::channel();
 600        self.update_contacts_tx
 601            .unbounded_send(UpdateContacts::Clear(tx))
 602            .unwrap();
 603        async move {
 604            rx.next().await;
 605        }
 606    }
 607
 608    pub fn contact_updates_done(&self) -> impl Future<Output = ()> {
 609        let (tx, mut rx) = postage::barrier::channel();
 610        self.update_contacts_tx
 611            .unbounded_send(UpdateContacts::Wait(tx))
 612            .unwrap();
 613        async move {
 614            rx.next().await;
 615        }
 616    }
 617
 618    pub fn get_users(
 619        &self,
 620        user_ids: Vec<u64>,
 621        cx: &Context<Self>,
 622    ) -> Task<Result<Vec<Arc<User>>>> {
 623        let mut user_ids_to_fetch = user_ids.clone();
 624        user_ids_to_fetch.retain(|id| !self.users.contains_key(id));
 625
 626        cx.spawn(async move |this, cx| {
 627            if !user_ids_to_fetch.is_empty() {
 628                this.update(cx, |this, cx| {
 629                    this.load_users(
 630                        proto::GetUsers {
 631                            user_ids: user_ids_to_fetch,
 632                        },
 633                        cx,
 634                    )
 635                })?
 636                .await?;
 637            }
 638
 639            this.read_with(cx, |this, _| {
 640                user_ids
 641                    .iter()
 642                    .map(|user_id| {
 643                        this.users
 644                            .get(user_id)
 645                            .cloned()
 646                            .with_context(|| format!("user {user_id} not found"))
 647                    })
 648                    .collect()
 649            })?
 650        })
 651    }
 652
 653    pub fn fuzzy_search_users(
 654        &self,
 655        query: String,
 656        cx: &Context<Self>,
 657    ) -> Task<Result<Vec<Arc<User>>>> {
 658        self.load_users(proto::FuzzySearchUsers { query }, cx)
 659    }
 660
 661    pub fn get_cached_user(&self, user_id: u64) -> Option<Arc<User>> {
 662        self.users.get(&user_id).cloned()
 663    }
 664
 665    pub fn get_user_optimistic(&self, user_id: u64, cx: &Context<Self>) -> Option<Arc<User>> {
 666        if let Some(user) = self.users.get(&user_id).cloned() {
 667            return Some(user);
 668        }
 669
 670        self.get_user(user_id, cx).detach_and_log_err(cx);
 671        None
 672    }
 673
 674    pub fn get_user(&self, user_id: u64, cx: &Context<Self>) -> Task<Result<Arc<User>>> {
 675        if let Some(user) = self.users.get(&user_id).cloned() {
 676            return Task::ready(Ok(user));
 677        }
 678
 679        let load_users = self.get_users(vec![user_id], cx);
 680        cx.spawn(async move |this, cx| {
 681            load_users.await?;
 682            this.read_with(cx, |this, _| {
 683                this.users
 684                    .get(&user_id)
 685                    .cloned()
 686                    .context("server responded with no users")
 687            })?
 688        })
 689    }
 690
 691    pub fn cached_user_by_github_login(&self, github_login: &str) -> Option<Arc<User>> {
 692        self.by_github_login
 693            .get(github_login)
 694            .and_then(|id| self.users.get(id).cloned())
 695    }
 696
 697    pub fn current_user(&self) -> Option<Arc<User>> {
 698        self.current_user.borrow().clone()
 699    }
 700
 701    pub fn current_organization(&self) -> Option<Arc<Organization>> {
 702        self.current_organization.clone()
 703    }
 704
 705    pub fn set_current_organization(
 706        &mut self,
 707        organization: Arc<Organization>,
 708        cx: &mut Context<Self>,
 709    ) {
 710        let is_same_organization = self
 711            .current_organization
 712            .as_ref()
 713            .is_some_and(|current| current.id == organization.id);
 714
 715        if !is_same_organization {
 716            let organization_id = organization.id.0.to_string();
 717            self.current_organization.replace(organization);
 718            cx.emit(Event::OrganizationChanged);
 719            cx.notify();
 720
 721            let kvp = KeyValueStore::global(cx);
 722            db::write_and_log(cx, move || async move {
 723                kvp.write_kvp(CURRENT_ORGANIZATION_ID_KEY.into(), organization_id)
 724                    .await
 725            });
 726        }
 727    }
 728
 729    pub fn organizations(&self) -> &Vec<Arc<Organization>> {
 730        &self.organizations
 731    }
 732
 733    pub fn plan_for_organization(&self, organization_id: &OrganizationId) -> Option<Plan> {
 734        self.plans_by_organization.get(organization_id).copied()
 735    }
 736
 737    pub fn current_organization_configuration(&self) -> Option<&OrganizationConfiguration> {
 738        let current_organization = self.current_organization.as_ref()?;
 739
 740        self.configuration_by_organization
 741            .get(&current_organization.id)
 742    }
 743
 744    pub fn plan(&self) -> Option<Plan> {
 745        #[cfg(debug_assertions)]
 746        if let Ok(plan) = std::env::var("ZED_SIMULATE_PLAN").as_ref() {
 747            use cloud_api_client::Plan;
 748
 749            return match plan.as_str() {
 750                "free" => Some(Plan::ZedFree),
 751                "trial" => Some(Plan::ZedProTrial),
 752                "pro" => Some(Plan::ZedPro),
 753                _ => {
 754                    panic!("ZED_SIMULATE_PLAN must be one of 'free', 'trial', or 'pro'");
 755                }
 756            };
 757        }
 758
 759        if let Some(organization) = &self.current_organization
 760            && let Some(plan) = self.plan_for_organization(&organization.id)
 761        {
 762            return Some(plan);
 763        }
 764
 765        self.plan_info.as_ref().map(|info| info.plan())
 766    }
 767
 768    pub fn subscription_period(&self) -> Option<(DateTime<Utc>, DateTime<Utc>)> {
 769        self.plan_info
 770            .as_ref()
 771            .and_then(|plan| plan.subscription_period)
 772            .map(|subscription_period| {
 773                (
 774                    subscription_period.started_at.0,
 775                    subscription_period.ended_at.0,
 776                )
 777            })
 778    }
 779
 780    pub fn trial_started_at(&self) -> Option<DateTime<Utc>> {
 781        self.plan_info
 782            .as_ref()
 783            .and_then(|plan| plan.trial_started_at)
 784            .map(|trial_started_at| trial_started_at.0)
 785    }
 786
 787    /// Returns whether the user's account is too new to use the service.
 788    pub fn account_too_young(&self) -> bool {
 789        self.plan_info
 790            .as_ref()
 791            .map(|plan| plan.is_account_too_young)
 792            .unwrap_or_default()
 793    }
 794
 795    /// Returns whether the current user has overdue invoices and usage should be blocked.
 796    pub fn has_overdue_invoices(&self) -> bool {
 797        self.plan_info
 798            .as_ref()
 799            .map(|plan| plan.has_overdue_invoices)
 800            .unwrap_or_default()
 801    }
 802
 803    pub fn edit_prediction_usage(&self) -> Option<EditPredictionUsage> {
 804        self.edit_prediction_usage
 805    }
 806
 807    pub fn update_edit_prediction_usage(
 808        &mut self,
 809        usage: EditPredictionUsage,
 810        cx: &mut Context<Self>,
 811    ) {
 812        self.edit_prediction_usage = Some(usage);
 813        cx.notify();
 814    }
 815
 816    pub fn clear_organizations(&mut self) {
 817        self.organizations.clear();
 818        self.current_organization = None;
 819    }
 820
 821    pub fn clear_plan_and_usage(&mut self) {
 822        self.plan_info = None;
 823        self.edit_prediction_usage = None;
 824    }
 825
 826    fn update_authenticated_user(
 827        &mut self,
 828        response: GetAuthenticatedUserResponse,
 829        cx: &mut Context<Self>,
 830    ) {
 831        let staff = response.user.is_staff && !*feature_flags::ZED_DISABLE_STAFF;
 832        cx.update_flags(staff, response.feature_flags);
 833        if let Some(client) = self.client.upgrade() {
 834            client
 835                .telemetry
 836                .set_authenticated_user_info(Some(response.user.metrics_id.clone()), staff);
 837        }
 838
 839        self.organizations = response.organizations.into_iter().map(Arc::new).collect();
 840        let persisted_org_id = KeyValueStore::global(cx)
 841            .read_kvp(CURRENT_ORGANIZATION_ID_KEY)
 842            .log_err()
 843            .flatten()
 844            .map(|id| OrganizationId(Arc::from(id)));
 845
 846        self.current_organization = persisted_org_id
 847            .and_then(|persisted_id| {
 848                self.organizations
 849                    .iter()
 850                    .find(|org| org.id == persisted_id)
 851                    .cloned()
 852            })
 853            .or_else(|| {
 854                response
 855                    .default_organization_id
 856                    .and_then(|default_organization_id| {
 857                        self.organizations
 858                            .iter()
 859                            .find(|organization| organization.id == default_organization_id)
 860                            .cloned()
 861                    })
 862            })
 863            .or_else(|| self.organizations.first().cloned());
 864        self.plans_by_organization = response
 865            .plans_by_organization
 866            .into_iter()
 867            .map(|(organization_id, plan)| {
 868                let plan = match plan {
 869                    KnownOrUnknown::Known(plan) => plan,
 870                    KnownOrUnknown::Unknown(_) => {
 871                        // If we get a plan that we don't recognize, fall back to the Free plan.
 872                        Plan::ZedFree
 873                    }
 874                };
 875
 876                (organization_id, plan)
 877            })
 878            .collect();
 879        self.configuration_by_organization =
 880            response.configuration_by_organization.into_iter().collect();
 881
 882        self.edit_prediction_usage = Some(EditPredictionUsage(RequestUsage {
 883            limit: response.plan.usage.edit_predictions.limit,
 884            amount: response.plan.usage.edit_predictions.used as i32,
 885        }));
 886        self.plan_info = Some(response.plan);
 887        cx.emit(Event::PrivateUserInfoUpdated);
 888    }
 889
 890    fn handle_message_to_client(this: WeakEntity<Self>, message: &MessageToClient, cx: &App) {
 891        cx.spawn(async move |cx| {
 892            match message {
 893                MessageToClient::UserUpdated => {
 894                    let cloud_client = cx
 895                        .update(|cx| {
 896                            this.read_with(cx, |this, _cx| {
 897                                this.client.upgrade().map(|client| client.cloud_client())
 898                            })
 899                        })?
 900                        .ok_or(anyhow::anyhow!("Failed to get Cloud client"))?;
 901
 902                    let response = cloud_client.get_authenticated_user().await?;
 903                    cx.update(|cx| {
 904                        this.update(cx, |this, cx| {
 905                            this.update_authenticated_user(response, cx);
 906                        })
 907                    })?;
 908                }
 909            }
 910
 911            anyhow::Ok(())
 912        })
 913        .detach_and_log_err(cx);
 914    }
 915
 916    pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
 917        self.current_user.clone()
 918    }
 919
 920    fn load_users(
 921        &self,
 922        request: impl RequestMessage<Response = UsersResponse>,
 923        cx: &Context<Self>,
 924    ) -> Task<Result<Vec<Arc<User>>>> {
 925        let client = self.client.clone();
 926        cx.spawn(async move |this, cx| {
 927            if let Some(rpc) = client.upgrade() {
 928                let response = rpc.request(request).await.context("error loading users")?;
 929                let users = response.users;
 930
 931                this.update(cx, |this, _| this.insert(users))
 932            } else {
 933                Ok(Vec::new())
 934            }
 935        })
 936    }
 937
 938    pub fn insert(&mut self, users: Vec<proto::User>) -> Vec<Arc<User>> {
 939        let mut ret = Vec::with_capacity(users.len());
 940        for user in users {
 941            let user = User::new(user);
 942            if let Some(old) = self.users.insert(user.id, user.clone())
 943                && old.github_login != user.github_login
 944            {
 945                self.by_github_login.remove(&old.github_login);
 946            }
 947            self.by_github_login
 948                .insert(user.github_login.clone(), user.id);
 949            ret.push(user)
 950        }
 951        ret
 952    }
 953
 954    pub fn set_participant_indices(
 955        &mut self,
 956        participant_indices: HashMap<u64, ParticipantIndex>,
 957        cx: &mut Context<Self>,
 958    ) {
 959        if participant_indices != self.participant_indices {
 960            self.participant_indices = participant_indices;
 961            cx.emit(Event::ParticipantIndicesChanged);
 962        }
 963    }
 964
 965    pub fn participant_indices(&self) -> &HashMap<u64, ParticipantIndex> {
 966        &self.participant_indices
 967    }
 968
 969    pub fn participant_names(
 970        &self,
 971        user_ids: impl Iterator<Item = u64>,
 972        cx: &App,
 973    ) -> HashMap<u64, SharedString> {
 974        let mut ret = HashMap::default();
 975        let mut missing_user_ids = Vec::new();
 976        for id in user_ids {
 977            if let Some(github_login) = self.get_cached_user(id).map(|u| u.github_login.clone()) {
 978                ret.insert(id, github_login);
 979            } else {
 980                missing_user_ids.push(id)
 981            }
 982        }
 983        if !missing_user_ids.is_empty() {
 984            let this = self.weak_self.clone();
 985            cx.spawn(async move |cx| {
 986                this.update(cx, |this, cx| this.get_users(missing_user_ids, cx))?
 987                    .await
 988            })
 989            .detach_and_log_err(cx);
 990        }
 991        ret
 992    }
 993}
 994
 995impl User {
 996    fn new(message: proto::User) -> Arc<Self> {
 997        Arc::new(User {
 998            id: message.id,
 999            github_login: message.github_login.into(),
1000            avatar_uri: message.avatar_url.into(),
1001            name: message.name,
1002        })
1003    }
1004}
1005
1006impl Contact {
1007    async fn from_proto(
1008        contact: proto::Contact,
1009        user_store: &Entity<UserStore>,
1010        cx: &mut AsyncApp,
1011    ) -> Result<Self> {
1012        let user = user_store
1013            .update(cx, |user_store, cx| {
1014                user_store.get_user(contact.user_id, cx)
1015            })
1016            .await?;
1017        Ok(Self {
1018            user,
1019            online: contact.online,
1020            busy: contact.busy,
1021        })
1022    }
1023}
1024
1025impl Collaborator {
1026    pub fn from_proto(message: proto::Collaborator) -> Result<Self> {
1027        Ok(Self {
1028            peer_id: message.peer_id.context("invalid peer id")?,
1029            replica_id: ReplicaId::new(message.replica_id as u16),
1030            user_id: message.user_id as UserId,
1031            is_host: message.is_host,
1032            committer_name: message.committer_name,
1033            committer_email: message.committer_email,
1034        })
1035    }
1036}
1037
1038impl RequestUsage {
1039    pub fn over_limit(&self) -> bool {
1040        match self.limit {
1041            UsageLimit::Limited(limit) => self.amount >= limit,
1042            UsageLimit::Unlimited => false,
1043        }
1044    }
1045
1046    fn from_headers(
1047        limit_name: &str,
1048        amount_name: &str,
1049        headers: &HeaderMap<HeaderValue>,
1050    ) -> Result<Self> {
1051        let limit = headers
1052            .get(limit_name)
1053            .with_context(|| format!("missing {limit_name:?} header"))?;
1054        let limit = UsageLimit::from_str(limit.to_str()?)?;
1055
1056        let amount = headers
1057            .get(amount_name)
1058            .with_context(|| format!("missing {amount_name:?} header"))?;
1059        let amount = amount.to_str()?.parse::<i32>()?;
1060
1061        Ok(Self { limit, amount })
1062    }
1063}
1064
1065impl EditPredictionUsage {
1066    pub fn from_headers(headers: &HeaderMap<HeaderValue>) -> Result<Self> {
1067        Ok(Self(RequestUsage::from_headers(
1068            EDIT_PREDICTIONS_USAGE_LIMIT_HEADER_NAME,
1069            EDIT_PREDICTIONS_USAGE_AMOUNT_HEADER_NAME,
1070            headers,
1071        )?))
1072    }
1073}