user.rs

   1use super::{Client, Status, TypedEnvelope, proto};
   2use anyhow::{Context as _, Result};
   3use chrono::{DateTime, Utc};
   4use cloud_api_client::websocket_protocol::MessageToClient;
   5use cloud_api_client::{
   6    GetAuthenticatedUserResponse, KnownOrUnknown, Organization, OrganizationId, Plan, PlanInfo,
   7};
   8use cloud_api_types::OrganizationConfiguration;
   9use cloud_llm_client::{
  10    EDIT_PREDICTIONS_USAGE_AMOUNT_HEADER_NAME, EDIT_PREDICTIONS_USAGE_LIMIT_HEADER_NAME, UsageLimit,
  11};
  12use collections::{HashMap, HashSet, hash_map::Entry};
  13use db::kvp::KeyValueStore;
  14use derive_more::Deref;
  15use feature_flags::FeatureFlagAppExt;
  16use futures::{Future, StreamExt, channel::mpsc};
  17use gpui::{
  18    App, AsyncApp, Context, Entity, EventEmitter, SharedString, SharedUri, Task, WeakEntity,
  19};
  20use http_client::http::{HeaderMap, HeaderValue};
  21use postage::{sink::Sink, watch};
  22use rpc::proto::{RequestMessage, UsersResponse};
  23use std::{
  24    str::FromStr as _,
  25    sync::{Arc, Weak},
  26};
  27use text::ReplicaId;
  28use util::{ResultExt, TryFutureExt as _};
  29
  30const CURRENT_ORGANIZATION_ID_KEY: &str = "current_organization_id";
  31
  32pub type UserId = u64;
  33
  34#[derive(
  35    Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, serde::Serialize, serde::Deserialize,
  36)]
  37pub struct ChannelId(pub u64);
  38
  39impl std::fmt::Display for ChannelId {
  40    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
  41        self.0.fmt(f)
  42    }
  43}
  44
  45#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)]
  46pub struct ProjectId(pub u64);
  47
  48impl ProjectId {
  49    pub fn to_proto(self) -> u64 {
  50        self.0
  51    }
  52}
  53
  54#[derive(Debug, Clone, Copy, PartialEq, Eq)]
  55pub struct ParticipantIndex(pub u32);
  56
  57#[derive(Default, Debug)]
  58pub struct User {
  59    pub id: UserId,
  60    pub github_login: SharedString,
  61    pub avatar_uri: SharedUri,
  62    pub name: Option<String>,
  63}
  64
  65#[derive(Clone, Debug, PartialEq, Eq)]
  66pub struct Collaborator {
  67    pub peer_id: proto::PeerId,
  68    pub replica_id: ReplicaId,
  69    pub user_id: UserId,
  70    pub is_host: bool,
  71    pub committer_name: Option<String>,
  72    pub committer_email: Option<String>,
  73}
  74
  75impl PartialOrd for User {
  76    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
  77        Some(self.cmp(other))
  78    }
  79}
  80
  81impl Ord for User {
  82    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
  83        self.github_login.cmp(&other.github_login)
  84    }
  85}
  86
  87impl PartialEq for User {
  88    fn eq(&self, other: &Self) -> bool {
  89        self.id == other.id && self.github_login == other.github_login
  90    }
  91}
  92
  93impl Eq for User {}
  94
  95#[derive(Debug, PartialEq)]
  96pub struct Contact {
  97    pub user: Arc<User>,
  98    pub online: bool,
  99    pub busy: bool,
 100}
 101
 102#[derive(Debug, Clone, Copy, PartialEq, Eq)]
 103pub enum ContactRequestStatus {
 104    None,
 105    RequestSent,
 106    RequestReceived,
 107    RequestAccepted,
 108}
 109
 110pub struct UserStore {
 111    users: HashMap<u64, Arc<User>>,
 112    by_github_login: HashMap<SharedString, u64>,
 113    participant_indices: HashMap<u64, ParticipantIndex>,
 114    update_contacts_tx: mpsc::UnboundedSender<UpdateContacts>,
 115    edit_prediction_usage: Option<EditPredictionUsage>,
 116    plan_info: Option<PlanInfo>,
 117    current_user: watch::Receiver<Option<Arc<User>>>,
 118    current_organization: Option<Arc<Organization>>,
 119    organizations: Vec<Arc<Organization>>,
 120    plans_by_organization: HashMap<OrganizationId, Plan>,
 121    configuration_by_organization: HashMap<OrganizationId, OrganizationConfiguration>,
 122    contacts: Vec<Arc<Contact>>,
 123    incoming_contact_requests: Vec<Arc<User>>,
 124    outgoing_contact_requests: Vec<Arc<User>>,
 125    pending_contact_requests: HashMap<u64, usize>,
 126    client: Weak<Client>,
 127    _maintain_contacts: Task<()>,
 128    _maintain_current_user: Task<Result<()>>,
 129    _handle_sign_out: Task<()>,
 130    weak_self: WeakEntity<Self>,
 131}
 132
 133#[derive(Clone)]
 134pub struct InviteInfo {
 135    pub count: u32,
 136    pub url: Arc<str>,
 137}
 138
 139pub enum Event {
 140    Contact {
 141        user: Arc<User>,
 142        kind: ContactEventKind,
 143    },
 144    ShowContacts,
 145    ParticipantIndicesChanged,
 146    PrivateUserInfoUpdated,
 147    PlanUpdated,
 148    OrganizationChanged,
 149}
 150
 151#[derive(Clone, Copy)]
 152pub enum ContactEventKind {
 153    Requested,
 154    Accepted,
 155    Cancelled,
 156}
 157
 158impl EventEmitter<Event> for UserStore {}
 159
 160enum UpdateContacts {
 161    Update(proto::UpdateContacts),
 162    Wait(postage::barrier::Sender),
 163    Clear(postage::barrier::Sender),
 164}
 165
 166#[derive(Debug, Clone, Copy, Deref)]
 167pub struct EditPredictionUsage(pub RequestUsage);
 168
 169#[derive(Debug, Clone, Copy)]
 170pub struct RequestUsage {
 171    pub limit: UsageLimit,
 172    pub amount: i32,
 173}
 174
 175impl UserStore {
 176    pub fn new(client: Arc<Client>, cx: &Context<Self>) -> Self {
 177        let (mut current_user_tx, current_user_rx) = watch::channel();
 178        let (sign_out_tx, mut sign_out_rx) = mpsc::unbounded();
 179        let (update_contacts_tx, mut update_contacts_rx) = mpsc::unbounded();
 180        let rpc_subscriptions = vec![
 181            client.add_message_handler(cx.weak_entity(), Self::handle_update_contacts),
 182            client.add_message_handler(cx.weak_entity(), Self::handle_show_contacts),
 183        ];
 184
 185        client.sign_out_tx.lock().replace(sign_out_tx);
 186        client.add_message_to_client_handler({
 187            let this = cx.weak_entity();
 188            move |message, cx| Self::handle_message_to_client(this.clone(), message, cx)
 189        });
 190
 191        Self {
 192            users: Default::default(),
 193            by_github_login: Default::default(),
 194            current_user: current_user_rx,
 195            current_organization: None,
 196            organizations: Vec::new(),
 197            plans_by_organization: HashMap::default(),
 198            configuration_by_organization: HashMap::default(),
 199            plan_info: None,
 200            edit_prediction_usage: None,
 201            contacts: Default::default(),
 202            incoming_contact_requests: Default::default(),
 203            participant_indices: Default::default(),
 204            outgoing_contact_requests: Default::default(),
 205            client: Arc::downgrade(&client),
 206            update_contacts_tx,
 207            _maintain_contacts: cx.spawn(async move |this, cx| {
 208                let _subscriptions = rpc_subscriptions;
 209                while let Some(message) = update_contacts_rx.next().await {
 210                    if let Ok(task) = this.update(cx, |this, cx| this.update_contacts(message, cx))
 211                    {
 212                        task.log_err().await;
 213                    } else {
 214                        break;
 215                    }
 216                }
 217            }),
 218            _maintain_current_user: cx.spawn(async move |this, cx| {
 219                let mut status = client.status();
 220                let weak = Arc::downgrade(&client);
 221                drop(client);
 222                while let Some(status) = status.next().await {
 223                    // if the client is dropped, the app is shutting down.
 224                    let Some(client) = weak.upgrade() else {
 225                        return Ok(());
 226                    };
 227                    match status {
 228                        Status::Authenticated
 229                        | Status::Reauthenticated
 230                        | Status::Connected { .. } => {
 231                            if let Some(user_id) = client.user_id() {
 232                                let response = client
 233                                    .cloud_client()
 234                                    .get_authenticated_user()
 235                                    .await
 236                                    .log_err();
 237
 238                                let current_user_and_response = if let Some(response) = response {
 239                                    let user = Arc::new(User {
 240                                        id: user_id,
 241                                        github_login: response.user.github_login.clone().into(),
 242                                        avatar_uri: response.user.avatar_url.clone().into(),
 243                                        name: response.user.name.clone(),
 244                                    });
 245
 246                                    Some((user, response))
 247                                } else {
 248                                    None
 249                                };
 250                                current_user_tx
 251                                    .send(
 252                                        current_user_and_response
 253                                            .as_ref()
 254                                            .map(|(user, _)| user.clone()),
 255                                    )
 256                                    .await
 257                                    .ok();
 258
 259                                cx.update(|cx| {
 260                                    if let Some((user, response)) = current_user_and_response {
 261                                        this.update(cx, |this, cx| {
 262                                            this.by_github_login
 263                                                .insert(user.github_login.clone(), user_id);
 264                                            this.users.insert(user_id, user);
 265                                            this.update_authenticated_user(response, cx)
 266                                        })
 267                                    } else {
 268                                        anyhow::Ok(())
 269                                    }
 270                                })?;
 271
 272                                this.update(cx, |_, cx| cx.notify())?;
 273                            }
 274                        }
 275                        Status::SignedOut => {
 276                            current_user_tx.send(None).await.ok();
 277                            this.update(cx, |this, cx| {
 278                                this.clear_organizations();
 279                                this.clear_plan_and_usage();
 280                                cx.emit(Event::PrivateUserInfoUpdated);
 281                                cx.notify();
 282                                this.clear_contacts()
 283                            })?
 284                            .await;
 285                        }
 286                        Status::ConnectionLost => {
 287                            this.update(cx, |this, cx| {
 288                                cx.notify();
 289                                this.clear_contacts()
 290                            })?
 291                            .await;
 292                        }
 293                        _ => {}
 294                    }
 295                }
 296                Ok(())
 297            }),
 298            _handle_sign_out: cx.spawn(async move |this, cx| {
 299                while let Some(()) = sign_out_rx.next().await {
 300                    let Some(client) = this
 301                        .read_with(cx, |this, _cx| this.client.upgrade())
 302                        .ok()
 303                        .flatten()
 304                    else {
 305                        break;
 306                    };
 307
 308                    client.sign_out(cx).await;
 309                }
 310            }),
 311            pending_contact_requests: Default::default(),
 312            weak_self: cx.weak_entity(),
 313        }
 314    }
 315
 316    #[cfg(feature = "test-support")]
 317    pub fn clear_cache(&mut self) {
 318        self.users.clear();
 319        self.by_github_login.clear();
 320    }
 321
 322    async fn handle_show_contacts(
 323        this: Entity<Self>,
 324        _: TypedEnvelope<proto::ShowContacts>,
 325        mut cx: AsyncApp,
 326    ) -> Result<()> {
 327        this.update(&mut cx, |_, cx| cx.emit(Event::ShowContacts));
 328        Ok(())
 329    }
 330
 331    async fn handle_update_contacts(
 332        this: Entity<Self>,
 333        message: TypedEnvelope<proto::UpdateContacts>,
 334        cx: AsyncApp,
 335    ) -> Result<()> {
 336        this.read_with(&cx, |this, _| {
 337            this.update_contacts_tx
 338                .unbounded_send(UpdateContacts::Update(message.payload))
 339                .unwrap();
 340        });
 341        Ok(())
 342    }
 343
 344    fn update_contacts(&mut self, message: UpdateContacts, cx: &Context<Self>) -> Task<Result<()>> {
 345        match message {
 346            UpdateContacts::Wait(barrier) => {
 347                drop(barrier);
 348                Task::ready(Ok(()))
 349            }
 350            UpdateContacts::Clear(barrier) => {
 351                self.contacts.clear();
 352                self.incoming_contact_requests.clear();
 353                self.outgoing_contact_requests.clear();
 354                drop(barrier);
 355                Task::ready(Ok(()))
 356            }
 357            UpdateContacts::Update(message) => {
 358                let mut user_ids = HashSet::default();
 359                for contact in &message.contacts {
 360                    user_ids.insert(contact.user_id);
 361                }
 362                user_ids.extend(message.incoming_requests.iter().map(|req| req.requester_id));
 363                user_ids.extend(message.outgoing_requests.iter());
 364
 365                let load_users = self.get_users(user_ids.into_iter().collect(), cx);
 366                cx.spawn(async move |this, cx| {
 367                    load_users.await?;
 368
 369                    // Users are fetched in parallel above and cached in call to get_users
 370                    // No need to parallelize here
 371                    let mut updated_contacts = Vec::new();
 372                    let this = this.upgrade().context("can't upgrade user store handle")?;
 373                    for contact in message.contacts {
 374                        updated_contacts
 375                            .push(Arc::new(Contact::from_proto(contact, &this, cx).await?));
 376                    }
 377
 378                    let mut incoming_requests = Vec::new();
 379                    for request in message.incoming_requests {
 380                        incoming_requests.push({
 381                            this.update(cx, |this, cx| this.get_user(request.requester_id, cx))
 382                                .await?
 383                        });
 384                    }
 385
 386                    let mut outgoing_requests = Vec::new();
 387                    for requested_user_id in message.outgoing_requests {
 388                        outgoing_requests.push(
 389                            this.update(cx, |this, cx| this.get_user(requested_user_id, cx))
 390                                .await?,
 391                        );
 392                    }
 393
 394                    let removed_contacts =
 395                        HashSet::<u64>::from_iter(message.remove_contacts.iter().copied());
 396                    let removed_incoming_requests =
 397                        HashSet::<u64>::from_iter(message.remove_incoming_requests.iter().copied());
 398                    let removed_outgoing_requests =
 399                        HashSet::<u64>::from_iter(message.remove_outgoing_requests.iter().copied());
 400
 401                    this.update(cx, |this, cx| {
 402                        // Remove contacts
 403                        this.contacts
 404                            .retain(|contact| !removed_contacts.contains(&contact.user.id));
 405                        // Update existing contacts and insert new ones
 406                        for updated_contact in updated_contacts {
 407                            match this.contacts.binary_search_by_key(
 408                                &&updated_contact.user.github_login,
 409                                |contact| &contact.user.github_login,
 410                            ) {
 411                                Ok(ix) => this.contacts[ix] = updated_contact,
 412                                Err(ix) => this.contacts.insert(ix, updated_contact),
 413                            }
 414                        }
 415
 416                        // Remove incoming contact requests
 417                        this.incoming_contact_requests.retain(|user| {
 418                            if removed_incoming_requests.contains(&user.id) {
 419                                cx.emit(Event::Contact {
 420                                    user: user.clone(),
 421                                    kind: ContactEventKind::Cancelled,
 422                                });
 423                                false
 424                            } else {
 425                                true
 426                            }
 427                        });
 428                        // Update existing incoming requests and insert new ones
 429                        for user in incoming_requests {
 430                            match this
 431                                .incoming_contact_requests
 432                                .binary_search_by_key(&&user.github_login, |contact| {
 433                                    &contact.github_login
 434                                }) {
 435                                Ok(ix) => this.incoming_contact_requests[ix] = user,
 436                                Err(ix) => this.incoming_contact_requests.insert(ix, user),
 437                            }
 438                        }
 439
 440                        // Remove outgoing contact requests
 441                        this.outgoing_contact_requests
 442                            .retain(|user| !removed_outgoing_requests.contains(&user.id));
 443                        // Update existing incoming requests and insert new ones
 444                        for request in outgoing_requests {
 445                            match this
 446                                .outgoing_contact_requests
 447                                .binary_search_by_key(&&request.github_login, |contact| {
 448                                    &contact.github_login
 449                                }) {
 450                                Ok(ix) => this.outgoing_contact_requests[ix] = request,
 451                                Err(ix) => this.outgoing_contact_requests.insert(ix, request),
 452                            }
 453                        }
 454
 455                        cx.notify();
 456                    });
 457
 458                    Ok(())
 459                })
 460            }
 461        }
 462    }
 463
 464    pub fn contacts(&self) -> &[Arc<Contact>] {
 465        &self.contacts
 466    }
 467
 468    pub fn has_contact(&self, user: &Arc<User>) -> bool {
 469        self.contacts
 470            .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
 471            .is_ok()
 472    }
 473
 474    pub fn incoming_contact_requests(&self) -> &[Arc<User>] {
 475        &self.incoming_contact_requests
 476    }
 477
 478    pub fn outgoing_contact_requests(&self) -> &[Arc<User>] {
 479        &self.outgoing_contact_requests
 480    }
 481
 482    pub fn is_contact_request_pending(&self, user: &User) -> bool {
 483        self.pending_contact_requests.contains_key(&user.id)
 484    }
 485
 486    pub fn contact_request_status(&self, user: &User) -> ContactRequestStatus {
 487        if self
 488            .contacts
 489            .binary_search_by_key(&&user.github_login, |contact| &contact.user.github_login)
 490            .is_ok()
 491        {
 492            ContactRequestStatus::RequestAccepted
 493        } else if self
 494            .outgoing_contact_requests
 495            .binary_search_by_key(&&user.github_login, |user| &user.github_login)
 496            .is_ok()
 497        {
 498            ContactRequestStatus::RequestSent
 499        } else if self
 500            .incoming_contact_requests
 501            .binary_search_by_key(&&user.github_login, |user| &user.github_login)
 502            .is_ok()
 503        {
 504            ContactRequestStatus::RequestReceived
 505        } else {
 506            ContactRequestStatus::None
 507        }
 508    }
 509
 510    pub fn request_contact(
 511        &mut self,
 512        responder_id: u64,
 513        cx: &mut Context<Self>,
 514    ) -> Task<Result<()>> {
 515        self.perform_contact_request(responder_id, proto::RequestContact { responder_id }, cx)
 516    }
 517
 518    pub fn remove_contact(&mut self, user_id: u64, cx: &mut Context<Self>) -> Task<Result<()>> {
 519        self.perform_contact_request(user_id, proto::RemoveContact { user_id }, cx)
 520    }
 521
 522    pub fn has_incoming_contact_request(&self, user_id: u64) -> bool {
 523        self.incoming_contact_requests
 524            .iter()
 525            .any(|user| user.id == user_id)
 526    }
 527
 528    pub fn respond_to_contact_request(
 529        &mut self,
 530        requester_id: u64,
 531        accept: bool,
 532        cx: &mut Context<Self>,
 533    ) -> Task<Result<()>> {
 534        self.perform_contact_request(
 535            requester_id,
 536            proto::RespondToContactRequest {
 537                requester_id,
 538                response: if accept {
 539                    proto::ContactRequestResponse::Accept
 540                } else {
 541                    proto::ContactRequestResponse::Decline
 542                } as i32,
 543            },
 544            cx,
 545        )
 546    }
 547
 548    pub fn dismiss_contact_request(
 549        &self,
 550        requester_id: u64,
 551        cx: &Context<Self>,
 552    ) -> Task<Result<()>> {
 553        let client = self.client.upgrade();
 554        cx.spawn(async move |_, _| {
 555            client
 556                .context("can't upgrade client reference")?
 557                .request(proto::RespondToContactRequest {
 558                    requester_id,
 559                    response: proto::ContactRequestResponse::Dismiss as i32,
 560                })
 561                .await?;
 562            Ok(())
 563        })
 564    }
 565
 566    fn perform_contact_request<T: RequestMessage>(
 567        &mut self,
 568        user_id: u64,
 569        request: T,
 570        cx: &mut Context<Self>,
 571    ) -> Task<Result<()>> {
 572        let client = self.client.upgrade();
 573        *self.pending_contact_requests.entry(user_id).or_insert(0) += 1;
 574        cx.notify();
 575
 576        cx.spawn(async move |this, cx| {
 577            let response = client
 578                .context("can't upgrade client reference")?
 579                .request(request)
 580                .await;
 581            this.update(cx, |this, cx| {
 582                if let Entry::Occupied(mut request_count) =
 583                    this.pending_contact_requests.entry(user_id)
 584                {
 585                    *request_count.get_mut() -= 1;
 586                    if *request_count.get() == 0 {
 587                        request_count.remove();
 588                    }
 589                }
 590                cx.notify();
 591            })?;
 592            response?;
 593            Ok(())
 594        })
 595    }
 596
 597    pub fn clear_contacts(&self) -> impl Future<Output = ()> + use<> {
 598        let (tx, mut rx) = postage::barrier::channel();
 599        self.update_contacts_tx
 600            .unbounded_send(UpdateContacts::Clear(tx))
 601            .unwrap();
 602        async move {
 603            rx.next().await;
 604        }
 605    }
 606
 607    pub fn contact_updates_done(&self) -> impl Future<Output = ()> {
 608        let (tx, mut rx) = postage::barrier::channel();
 609        self.update_contacts_tx
 610            .unbounded_send(UpdateContacts::Wait(tx))
 611            .unwrap();
 612        async move {
 613            rx.next().await;
 614        }
 615    }
 616
 617    pub fn get_users(
 618        &self,
 619        user_ids: Vec<u64>,
 620        cx: &Context<Self>,
 621    ) -> Task<Result<Vec<Arc<User>>>> {
 622        let mut user_ids_to_fetch = user_ids.clone();
 623        user_ids_to_fetch.retain(|id| !self.users.contains_key(id));
 624
 625        cx.spawn(async move |this, cx| {
 626            if !user_ids_to_fetch.is_empty() {
 627                this.update(cx, |this, cx| {
 628                    this.load_users(
 629                        proto::GetUsers {
 630                            user_ids: user_ids_to_fetch,
 631                        },
 632                        cx,
 633                    )
 634                })?
 635                .await?;
 636            }
 637
 638            this.read_with(cx, |this, _| {
 639                user_ids
 640                    .iter()
 641                    .map(|user_id| {
 642                        this.users
 643                            .get(user_id)
 644                            .cloned()
 645                            .with_context(|| format!("user {user_id} not found"))
 646                    })
 647                    .collect()
 648            })?
 649        })
 650    }
 651
 652    pub fn fuzzy_search_users(
 653        &self,
 654        query: String,
 655        cx: &Context<Self>,
 656    ) -> Task<Result<Vec<Arc<User>>>> {
 657        self.load_users(proto::FuzzySearchUsers { query }, cx)
 658    }
 659
 660    pub fn get_cached_user(&self, user_id: u64) -> Option<Arc<User>> {
 661        self.users.get(&user_id).cloned()
 662    }
 663
 664    pub fn get_user_optimistic(&self, user_id: u64, cx: &Context<Self>) -> Option<Arc<User>> {
 665        if let Some(user) = self.users.get(&user_id).cloned() {
 666            return Some(user);
 667        }
 668
 669        self.get_user(user_id, cx).detach_and_log_err(cx);
 670        None
 671    }
 672
 673    pub fn get_user(&self, user_id: u64, cx: &Context<Self>) -> Task<Result<Arc<User>>> {
 674        if let Some(user) = self.users.get(&user_id).cloned() {
 675            return Task::ready(Ok(user));
 676        }
 677
 678        let load_users = self.get_users(vec![user_id], cx);
 679        cx.spawn(async move |this, cx| {
 680            load_users.await?;
 681            this.read_with(cx, |this, _| {
 682                this.users
 683                    .get(&user_id)
 684                    .cloned()
 685                    .context("server responded with no users")
 686            })?
 687        })
 688    }
 689
 690    pub fn cached_user_by_github_login(&self, github_login: &str) -> Option<Arc<User>> {
 691        self.by_github_login
 692            .get(github_login)
 693            .and_then(|id| self.users.get(id).cloned())
 694    }
 695
 696    pub fn current_user(&self) -> Option<Arc<User>> {
 697        self.current_user.borrow().clone()
 698    }
 699
 700    pub fn current_organization(&self) -> Option<Arc<Organization>> {
 701        self.current_organization.clone()
 702    }
 703
 704    pub fn set_current_organization(
 705        &mut self,
 706        organization: Arc<Organization>,
 707        cx: &mut Context<Self>,
 708    ) {
 709        let is_same_organization = self
 710            .current_organization
 711            .as_ref()
 712            .is_some_and(|current| current.id == organization.id);
 713
 714        if !is_same_organization {
 715            let organization_id = organization.id.0.to_string();
 716            self.current_organization.replace(organization);
 717            cx.emit(Event::OrganizationChanged);
 718            cx.notify();
 719
 720            let kvp = KeyValueStore::global(cx);
 721            db::write_and_log(cx, move || async move {
 722                kvp.write_kvp(CURRENT_ORGANIZATION_ID_KEY.into(), organization_id)
 723                    .await
 724            });
 725        }
 726    }
 727
 728    pub fn organizations(&self) -> &Vec<Arc<Organization>> {
 729        &self.organizations
 730    }
 731
 732    pub fn plan_for_organization(&self, organization_id: &OrganizationId) -> Option<Plan> {
 733        self.plans_by_organization.get(organization_id).copied()
 734    }
 735
 736    pub fn current_organization_configuration(&self) -> Option<&OrganizationConfiguration> {
 737        let current_organization = self.current_organization.as_ref()?;
 738
 739        self.configuration_by_organization
 740            .get(&current_organization.id)
 741    }
 742
 743    pub fn plan(&self) -> Option<Plan> {
 744        #[cfg(debug_assertions)]
 745        if let Ok(plan) = std::env::var("ZED_SIMULATE_PLAN").as_ref() {
 746            use cloud_api_client::Plan;
 747
 748            return match plan.as_str() {
 749                "free" => Some(Plan::ZedFree),
 750                "trial" => Some(Plan::ZedProTrial),
 751                "pro" => Some(Plan::ZedPro),
 752                _ => {
 753                    panic!("ZED_SIMULATE_PLAN must be one of 'free', 'trial', or 'pro'");
 754                }
 755            };
 756        }
 757
 758        if let Some(organization) = &self.current_organization
 759            && let Some(plan) = self.plan_for_organization(&organization.id)
 760        {
 761            return Some(plan);
 762        }
 763
 764        self.plan_info.as_ref().map(|info| info.plan())
 765    }
 766
 767    pub fn subscription_period(&self) -> Option<(DateTime<Utc>, DateTime<Utc>)> {
 768        self.plan_info
 769            .as_ref()
 770            .and_then(|plan| plan.subscription_period)
 771            .map(|subscription_period| {
 772                (
 773                    subscription_period.started_at.0,
 774                    subscription_period.ended_at.0,
 775                )
 776            })
 777    }
 778
 779    pub fn trial_started_at(&self) -> Option<DateTime<Utc>> {
 780        self.plan_info
 781            .as_ref()
 782            .and_then(|plan| plan.trial_started_at)
 783            .map(|trial_started_at| trial_started_at.0)
 784    }
 785
 786    /// Returns whether the user's account is too new to use the service.
 787    pub fn account_too_young(&self) -> bool {
 788        self.plan_info
 789            .as_ref()
 790            .map(|plan| plan.is_account_too_young)
 791            .unwrap_or_default()
 792    }
 793
 794    /// Returns whether the current user has overdue invoices and usage should be blocked.
 795    pub fn has_overdue_invoices(&self) -> bool {
 796        self.plan_info
 797            .as_ref()
 798            .map(|plan| plan.has_overdue_invoices)
 799            .unwrap_or_default()
 800    }
 801
 802    pub fn edit_prediction_usage(&self) -> Option<EditPredictionUsage> {
 803        self.edit_prediction_usage
 804    }
 805
 806    pub fn update_edit_prediction_usage(
 807        &mut self,
 808        usage: EditPredictionUsage,
 809        cx: &mut Context<Self>,
 810    ) {
 811        self.edit_prediction_usage = Some(usage);
 812        cx.notify();
 813    }
 814
 815    pub fn clear_organizations(&mut self) {
 816        self.organizations.clear();
 817        self.current_organization = None;
 818    }
 819
 820    pub fn clear_plan_and_usage(&mut self) {
 821        self.plan_info = None;
 822        self.edit_prediction_usage = None;
 823    }
 824
 825    fn update_authenticated_user(
 826        &mut self,
 827        response: GetAuthenticatedUserResponse,
 828        cx: &mut Context<Self>,
 829    ) {
 830        let staff = response.user.is_staff && !*feature_flags::ZED_DISABLE_STAFF;
 831        cx.update_flags(staff, response.feature_flags);
 832        if let Some(client) = self.client.upgrade() {
 833            client
 834                .telemetry
 835                .set_authenticated_user_info(Some(response.user.metrics_id.clone()), staff);
 836        }
 837
 838        self.organizations = response.organizations.into_iter().map(Arc::new).collect();
 839        let persisted_org_id = KeyValueStore::global(cx)
 840            .read_kvp(CURRENT_ORGANIZATION_ID_KEY)
 841            .log_err()
 842            .flatten()
 843            .map(|id| OrganizationId(Arc::from(id)));
 844
 845        self.current_organization = persisted_org_id
 846            .and_then(|persisted_id| {
 847                self.organizations
 848                    .iter()
 849                    .find(|org| org.id == persisted_id)
 850                    .cloned()
 851            })
 852            .or_else(|| {
 853                response
 854                    .default_organization_id
 855                    .and_then(|default_organization_id| {
 856                        self.organizations
 857                            .iter()
 858                            .find(|organization| organization.id == default_organization_id)
 859                            .cloned()
 860                    })
 861            })
 862            .or_else(|| self.organizations.first().cloned());
 863        self.plans_by_organization = response
 864            .plans_by_organization
 865            .into_iter()
 866            .map(|(organization_id, plan)| {
 867                let plan = match plan {
 868                    KnownOrUnknown::Known(plan) => plan,
 869                    KnownOrUnknown::Unknown(_) => {
 870                        // If we get a plan that we don't recognize, fall back to the Free plan.
 871                        Plan::ZedFree
 872                    }
 873                };
 874
 875                (organization_id, plan)
 876            })
 877            .collect();
 878        self.configuration_by_organization =
 879            response.configuration_by_organization.into_iter().collect();
 880
 881        self.edit_prediction_usage = Some(EditPredictionUsage(RequestUsage {
 882            limit: response.plan.usage.edit_predictions.limit,
 883            amount: response.plan.usage.edit_predictions.used as i32,
 884        }));
 885        self.plan_info = Some(response.plan);
 886        cx.emit(Event::PrivateUserInfoUpdated);
 887    }
 888
 889    fn handle_message_to_client(this: WeakEntity<Self>, message: &MessageToClient, cx: &App) {
 890        cx.spawn(async move |cx| {
 891            match message {
 892                MessageToClient::UserUpdated => {
 893                    let cloud_client = cx
 894                        .update(|cx| {
 895                            this.read_with(cx, |this, _cx| {
 896                                this.client.upgrade().map(|client| client.cloud_client())
 897                            })
 898                        })?
 899                        .ok_or(anyhow::anyhow!("Failed to get Cloud client"))?;
 900
 901                    let response = cloud_client.get_authenticated_user().await?;
 902                    cx.update(|cx| {
 903                        this.update(cx, |this, cx| {
 904                            this.update_authenticated_user(response, cx);
 905                        })
 906                    })?;
 907                }
 908            }
 909
 910            anyhow::Ok(())
 911        })
 912        .detach_and_log_err(cx);
 913    }
 914
 915    pub fn watch_current_user(&self) -> watch::Receiver<Option<Arc<User>>> {
 916        self.current_user.clone()
 917    }
 918
 919    fn load_users(
 920        &self,
 921        request: impl RequestMessage<Response = UsersResponse>,
 922        cx: &Context<Self>,
 923    ) -> Task<Result<Vec<Arc<User>>>> {
 924        let client = self.client.clone();
 925        cx.spawn(async move |this, cx| {
 926            if let Some(rpc) = client.upgrade() {
 927                let response = rpc.request(request).await.context("error loading users")?;
 928                let users = response.users;
 929
 930                this.update(cx, |this, _| this.insert(users))
 931            } else {
 932                Ok(Vec::new())
 933            }
 934        })
 935    }
 936
 937    pub fn insert(&mut self, users: Vec<proto::User>) -> Vec<Arc<User>> {
 938        let mut ret = Vec::with_capacity(users.len());
 939        for user in users {
 940            let user = User::new(user);
 941            if let Some(old) = self.users.insert(user.id, user.clone())
 942                && old.github_login != user.github_login
 943            {
 944                self.by_github_login.remove(&old.github_login);
 945            }
 946            self.by_github_login
 947                .insert(user.github_login.clone(), user.id);
 948            ret.push(user)
 949        }
 950        ret
 951    }
 952
 953    pub fn set_participant_indices(
 954        &mut self,
 955        participant_indices: HashMap<u64, ParticipantIndex>,
 956        cx: &mut Context<Self>,
 957    ) {
 958        if participant_indices != self.participant_indices {
 959            self.participant_indices = participant_indices;
 960            cx.emit(Event::ParticipantIndicesChanged);
 961        }
 962    }
 963
 964    pub fn participant_indices(&self) -> &HashMap<u64, ParticipantIndex> {
 965        &self.participant_indices
 966    }
 967
 968    pub fn participant_names(
 969        &self,
 970        user_ids: impl Iterator<Item = u64>,
 971        cx: &App,
 972    ) -> HashMap<u64, SharedString> {
 973        let mut ret = HashMap::default();
 974        let mut missing_user_ids = Vec::new();
 975        for id in user_ids {
 976            if let Some(github_login) = self.get_cached_user(id).map(|u| u.github_login.clone()) {
 977                ret.insert(id, github_login);
 978            } else {
 979                missing_user_ids.push(id)
 980            }
 981        }
 982        if !missing_user_ids.is_empty() {
 983            let this = self.weak_self.clone();
 984            cx.spawn(async move |cx| {
 985                this.update(cx, |this, cx| this.get_users(missing_user_ids, cx))?
 986                    .await
 987            })
 988            .detach_and_log_err(cx);
 989        }
 990        ret
 991    }
 992}
 993
 994impl User {
 995    fn new(message: proto::User) -> Arc<Self> {
 996        Arc::new(User {
 997            id: message.id,
 998            github_login: message.github_login.into(),
 999            avatar_uri: message.avatar_url.into(),
1000            name: message.name,
1001        })
1002    }
1003}
1004
1005impl Contact {
1006    async fn from_proto(
1007        contact: proto::Contact,
1008        user_store: &Entity<UserStore>,
1009        cx: &mut AsyncApp,
1010    ) -> Result<Self> {
1011        let user = user_store
1012            .update(cx, |user_store, cx| {
1013                user_store.get_user(contact.user_id, cx)
1014            })
1015            .await?;
1016        Ok(Self {
1017            user,
1018            online: contact.online,
1019            busy: contact.busy,
1020        })
1021    }
1022}
1023
1024impl Collaborator {
1025    pub fn from_proto(message: proto::Collaborator) -> Result<Self> {
1026        Ok(Self {
1027            peer_id: message.peer_id.context("invalid peer id")?,
1028            replica_id: ReplicaId::new(message.replica_id as u16),
1029            user_id: message.user_id as UserId,
1030            is_host: message.is_host,
1031            committer_name: message.committer_name,
1032            committer_email: message.committer_email,
1033        })
1034    }
1035}
1036
1037impl RequestUsage {
1038    pub fn over_limit(&self) -> bool {
1039        match self.limit {
1040            UsageLimit::Limited(limit) => self.amount >= limit,
1041            UsageLimit::Unlimited => false,
1042        }
1043    }
1044
1045    fn from_headers(
1046        limit_name: &str,
1047        amount_name: &str,
1048        headers: &HeaderMap<HeaderValue>,
1049    ) -> Result<Self> {
1050        let limit = headers
1051            .get(limit_name)
1052            .with_context(|| format!("missing {limit_name:?} header"))?;
1053        let limit = UsageLimit::from_str(limit.to_str()?)?;
1054
1055        let amount = headers
1056            .get(amount_name)
1057            .with_context(|| format!("missing {amount_name:?} header"))?;
1058        let amount = amount.to_str()?.parse::<i32>()?;
1059
1060        Ok(Self { limit, amount })
1061    }
1062}
1063
1064impl EditPredictionUsage {
1065    pub fn from_headers(headers: &HeaderMap<HeaderValue>) -> Result<Self> {
1066        Ok(Self(RequestUsage::from_headers(
1067            EDIT_PREDICTIONS_USAGE_LIMIT_HEADER_NAME,
1068            EDIT_PREDICTIONS_USAGE_AMOUNT_HEADER_NAME,
1069            headers,
1070        )?))
1071    }
1072}