client.rs

   1#[cfg(any(test, feature = "test-support"))]
   2pub mod test;
   3
   4pub mod telemetry;
   5pub mod user;
   6
   7use anyhow::{anyhow, Context as _, Result};
   8use async_recursion::async_recursion;
   9use async_tungstenite::tungstenite::{
  10    client::IntoClientRequest,
  11    error::Error as WebsocketError,
  12    http::{HeaderValue, Request, StatusCode},
  13};
  14use clock::SystemClock;
  15use collections::HashMap;
  16use futures::{
  17    channel::oneshot,
  18    future::{BoxFuture, LocalBoxFuture},
  19    AsyncReadExt, FutureExt, SinkExt, Stream, StreamExt, TryFutureExt as _, TryStreamExt,
  20};
  21use gpui::{
  22    actions, AnyModel, AnyWeakModel, AppContext, AsyncAppContext, Global, Model, Task, WeakModel,
  23};
  24use http_client::{AsyncBody, HttpClient, HttpClientWithUrl};
  25use lazy_static::lazy_static;
  26use parking_lot::RwLock;
  27use postage::watch;
  28use proto::ProtoClient;
  29use rand::prelude::*;
  30use release_channel::{AppVersion, ReleaseChannel};
  31use rpc::proto::{AnyTypedEnvelope, EntityMessage, EnvelopedMessage, PeerId, RequestMessage};
  32use schemars::JsonSchema;
  33use serde::{Deserialize, Serialize};
  34use settings::{Settings, SettingsSources};
  35use std::fmt;
  36use std::pin::Pin;
  37use std::{
  38    any::TypeId,
  39    convert::TryFrom,
  40    fmt::Write as _,
  41    future::Future,
  42    marker::PhantomData,
  43    path::PathBuf,
  44    sync::{
  45        atomic::{AtomicU64, Ordering},
  46        Arc, Weak,
  47    },
  48    time::{Duration, Instant},
  49};
  50use telemetry::Telemetry;
  51use thiserror::Error;
  52use url::Url;
  53use util::{ResultExt, TryFutureExt};
  54
  55pub use rpc::*;
  56pub use telemetry_events::Event;
  57pub use user::*;
  58
  59#[derive(Debug, Clone, Eq, PartialEq)]
  60pub struct DevServerToken(pub String);
  61
  62impl fmt::Display for DevServerToken {
  63    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
  64        write!(f, "{}", self.0)
  65    }
  66}
  67
  68lazy_static! {
  69    static ref ZED_SERVER_URL: Option<String> = std::env::var("ZED_SERVER_URL").ok();
  70    static ref ZED_RPC_URL: Option<String> = std::env::var("ZED_RPC_URL").ok();
  71    /// An environment variable whose presence indicates that the development auth
  72    /// provider should be used.
  73    ///
  74    /// Only works in development. Setting this environment variable in other release
  75    /// channels is a no-op.
  76    pub static ref ZED_DEVELOPMENT_AUTH: bool =
  77        std::env::var("ZED_DEVELOPMENT_AUTH").map_or(false, |value| !value.is_empty());
  78    pub static ref IMPERSONATE_LOGIN: Option<String> = std::env::var("ZED_IMPERSONATE")
  79        .ok()
  80        .and_then(|s| if s.is_empty() { None } else { Some(s) });
  81    pub static ref ADMIN_API_TOKEN: Option<String> = std::env::var("ZED_ADMIN_API_TOKEN")
  82        .ok()
  83        .and_then(|s| if s.is_empty() { None } else { Some(s) });
  84    pub static ref ZED_APP_PATH: Option<PathBuf> =
  85        std::env::var("ZED_APP_PATH").ok().map(PathBuf::from);
  86    pub static ref ZED_ALWAYS_ACTIVE: bool =
  87        std::env::var("ZED_ALWAYS_ACTIVE").map_or(false, |e| !e.is_empty());
  88}
  89
/// Delay before the first reconnection retry; also the lower bound the
/// jittered retry delay is clamped to in `Client::set_status`.
pub const INITIAL_RECONNECTION_DELAY: Duration = Duration::from_millis(500);
/// Upper bound the jittered retry delay is clamped to in `Client::set_status`.
pub const MAX_RECONNECTION_DELAY: Duration = Duration::from_secs(10);
/// Duration of the timer that `authenticate_and_connect` races against
/// connection establishment.
pub const CONNECTION_TIMEOUT: Duration = Duration::from_secs(20);

// Declares the `SignIn`, `SignOut` and `Reconnect` actions; `init` registers a
// handler for each of them.
actions!(client, [SignIn, SignOut, Reconnect]);
  95
  96#[derive(Clone, Default, Serialize, Deserialize, JsonSchema)]
  97pub struct ClientSettingsContent {
  98    server_url: Option<String>,
  99}
 100
 101#[derive(Deserialize)]
 102pub struct ClientSettings {
 103    pub server_url: String,
 104}
 105
 106impl Settings for ClientSettings {
 107    const KEY: Option<&'static str> = None;
 108
 109    type FileContent = ClientSettingsContent;
 110
 111    fn load(sources: SettingsSources<Self::FileContent>, _: &mut AppContext) -> Result<Self> {
 112        let mut result = sources.json_merge::<Self>()?;
 113        if let Some(server_url) = &*ZED_SERVER_URL {
 114            result.server_url.clone_from(&server_url)
 115        }
 116        Ok(result)
 117    }
 118}
 119
 120#[derive(Default, Clone, Serialize, Deserialize, JsonSchema)]
 121pub struct ProxySettingsContent {
 122    proxy: Option<String>,
 123}
 124
 125#[derive(Deserialize, Default)]
 126pub struct ProxySettings {
 127    pub proxy: Option<String>,
 128}
 129
 130impl Settings for ProxySettings {
 131    const KEY: Option<&'static str> = None;
 132
 133    type FileContent = ProxySettingsContent;
 134
 135    fn load(sources: SettingsSources<Self::FileContent>, _: &mut AppContext) -> Result<Self> {
 136        Ok(Self {
 137            proxy: sources
 138                .user
 139                .and_then(|value| value.proxy.clone())
 140                .or(sources.default.proxy.clone()),
 141        })
 142    }
 143}
 144
/// Registers the settings types defined in this file (`TelemetrySettings`,
/// `ClientSettings` and `ProxySettings`) with the app context.
pub fn init_settings(cx: &mut AppContext) {
    TelemetrySettings::register(cx);
    ClientSettings::register(cx);
    ProxySettings::register(cx);
}
 150
 151pub fn init(client: &Arc<Client>, cx: &mut AppContext) {
 152    let client = Arc::downgrade(client);
 153    cx.on_action({
 154        let client = client.clone();
 155        move |_: &SignIn, cx| {
 156            if let Some(client) = client.upgrade() {
 157                cx.spawn(
 158                    |cx| async move { client.authenticate_and_connect(true, &cx).log_err().await },
 159                )
 160                .detach();
 161            }
 162        }
 163    });
 164
 165    cx.on_action({
 166        let client = client.clone();
 167        move |_: &SignOut, cx| {
 168            if let Some(client) = client.upgrade() {
 169                cx.spawn(|cx| async move {
 170                    client.sign_out(&cx).await;
 171                })
 172                .detach();
 173            }
 174        }
 175    });
 176
 177    cx.on_action({
 178        let client = client.clone();
 179        move |_: &Reconnect, cx| {
 180            if let Some(client) = client.upgrade() {
 181                cx.spawn(|cx| async move {
 182                    client.reconnect(&cx);
 183                })
 184                .detach();
 185            }
 186        }
 187    });
 188}
 189
/// Wrapper that stores the shared [`Client`] as an app-wide global; written by
/// `Client::set_global` and read by `Client::global`.
struct GlobalClient(Arc<Client>);

impl Global for GlobalClient {}
 193
/// Client handle bundling the RPC peer, HTTP client, telemetry, credentials
/// provider and the mutable connection/subscription state.
pub struct Client {
    /// Numeric id reported in status logs; `authenticate_and_connect` sets it
    /// to the user id when user credentials are used.
    id: AtomicU64,
    peer: Arc<Peer>,
    /// HTTP client returned by `http_client()`.
    http: Arc<HttpClientWithUrl>,
    telemetry: Arc<Telemetry>,
    /// Where credentials are read from when none are cached in `state`.
    credentials_provider: Arc<dyn CredentialsProvider + Send + Sync + 'static>,
    /// Credentials, connection status and handler registries, behind a lock.
    state: RwLock<ClientState>,

    /// Test-only override installed by `override_authenticate`.
    #[allow(clippy::type_complexity)]
    #[cfg(any(test, feature = "test-support"))]
    authenticate: RwLock<
        Option<Box<dyn 'static + Send + Sync + Fn(&AsyncAppContext) -> Task<Result<Credentials>>>>,
    >,

    /// Test-only override installed by `override_establish_connection`.
    #[allow(clippy::type_complexity)]
    #[cfg(any(test, feature = "test-support"))]
    establish_connection: RwLock<
        Option<
            Box<
                dyn 'static
                    + Send
                    + Sync
                    + Fn(
                        &Credentials,
                        &AsyncAppContext,
                    ) -> Task<Result<Connection, EstablishConnectionError>>,
            >,
        >,
    >,

    /// Test-only RPC URL override installed by `override_rpc_url`.
    #[cfg(any(test, feature = "test-support"))]
    rpc_url: RwLock<Option<Url>>,
}
 227
/// Errors that can occur while establishing a connection.
///
/// `UpgradeRequired` and `Unauthorized` are produced from websocket HTTP
/// responses with the matching status codes (see the `From<WebsocketError>`
/// impl); the remaining variants wrap an underlying error and display it as-is.
#[derive(Error, Debug)]
pub enum EstablishConnectionError {
    #[error("upgrade required")]
    UpgradeRequired,
    #[error("unauthorized")]
    Unauthorized,
    #[error("{0}")]
    Other(#[from] anyhow::Error),
    #[error("{0}")]
    Http(#[from] http_client::Error),
    #[error("{0}")]
    InvalidHeaderValue(#[from] async_tungstenite::tungstenite::http::header::InvalidHeaderValue),
    #[error("{0}")]
    Io(#[from] std::io::Error),
    #[error("{0}")]
    Websocket(#[from] async_tungstenite::tungstenite::http::Error),
}
 245
 246impl From<WebsocketError> for EstablishConnectionError {
 247    fn from(error: WebsocketError) -> Self {
 248        if let WebsocketError::Http(response) = &error {
 249            match response.status() {
 250                StatusCode::UNAUTHORIZED => return EstablishConnectionError::Unauthorized,
 251                StatusCode::UPGRADE_REQUIRED => return EstablishConnectionError::UpgradeRequired,
 252                _ => {}
 253            }
 254        }
 255        EstablishConnectionError::Other(error.into())
 256    }
 257}
 258
impl EstablishConnectionError {
    /// Wraps any error convertible into `anyhow::Error` in the `Other` variant.
    pub fn other(error: impl Into<anyhow::Error> + Send + Sync) -> Self {
        Self::Other(error.into())
    }
}
 264
/// Connection status of a [`Client`], published through a watch channel
/// (see `Client::status`).
#[derive(Copy, Clone, Debug, PartialEq)]
pub enum Status {
    SignedOut,
    UpgradeRequired,
    Authenticating,
    Connecting,
    ConnectionError,
    Connected {
        peer_id: PeerId,
        connection_id: ConnectionId,
    },
    ConnectionLost,
    Reauthenticating,
    Reconnecting,
    /// A reconnection attempt failed; the retry loop in `Client::set_status`
    /// records when it will try again.
    ReconnectionError {
        next_reconnection: Instant,
    },
}

impl Status {
    /// Returns `true` only for `Connected`.
    pub fn is_connected(&self) -> bool {
        matches!(self, Self::Connected { .. })
    }

    /// Returns `true` for `SignedOut` and also for `UpgradeRequired`.
    pub fn is_signed_out(&self) -> bool {
        matches!(self, Self::SignedOut | Self::UpgradeRequired)
    }
}
 293
/// Mutable state of a [`Client`], kept behind `Client::state`.
struct ClientState {
    /// Credentials currently held in memory, if any.
    credentials: Option<Credentials>,
    /// Sender/receiver pair of the status watch channel.
    status: (watch::Sender<Status>, watch::Receiver<Status>),
    /// Per message type: function extracting the remote entity id from an
    /// envelope (registered by `add_entity_message_handler`).
    entity_id_extractors: HashMap<TypeId, fn(&dyn AnyTypedEnvelope) -> u64>,
    /// Retry task spawned by `set_status` on `ConnectionLost`; cleared again
    /// on `Connected`, `SignedOut` and `UpgradeRequired`.
    _reconnect_task: Option<Task<()>>,
    /// Subscribers keyed by (entity type, remote entity id).
    entities_by_type_and_remote_id: HashMap<(TypeId, u64), WeakSubscriber>,
    /// Per message type: weak model registered through `add_message_handler_impl`.
    models_by_message_type: HashMap<TypeId, AnyWeakModel>,
    /// Per message type: entity type registered by `add_entity_message_handler`.
    entity_types_by_message_type: HashMap<TypeId, TypeId>,
    /// Per message type: type-erased handler invoked with the model, the
    /// envelope, the client and an async app context.
    #[allow(clippy::type_complexity)]
    message_handlers: HashMap<
        TypeId,
        Arc<
            dyn Send
                + Sync
                + Fn(
                    AnyModel,
                    Box<dyn AnyTypedEnvelope>,
                    &Arc<Client>,
                    AsyncAppContext,
                ) -> LocalBoxFuture<'static, Result<()>>,
        >,
    >,
}
 317
/// Entry in `ClientState::entities_by_type_and_remote_id`.
enum WeakSubscriber {
    /// A model has been attached; only a weak handle to it is kept.
    Entity { handle: AnyWeakModel },
    /// No model attached yet. The held envelopes are passed to
    /// `handle_message` by `PendingEntitySubscription::set_model`, or logged
    /// as unhandled if the pending subscription is dropped first.
    Pending(Vec<Box<dyn AnyTypedEnvelope>>),
}
 322
 323#[derive(Clone, Debug, Eq, PartialEq)]
 324pub enum Credentials {
 325    DevServer { token: DevServerToken },
 326    User { user_id: u64, access_token: String },
 327}
 328
 329impl Credentials {
 330    pub fn authorization_header(&self) -> String {
 331        match self {
 332            Credentials::DevServer { token } => format!("dev-server-token {}", token),
 333            Credentials::User {
 334                user_id,
 335                access_token,
 336            } => format!("{} {}", user_id, access_token),
 337        }
 338    }
 339}
 340
/// A provider for [`Credentials`].
///
/// Used to abstract over reading and writing credentials to some form of
/// persistence (like the system keychain).
///
/// Every method returns a boxed future that borrows both the provider and the
/// context for `'a`; the futures carry no `Send` bound.
trait CredentialsProvider {
    /// Reads the credentials from the provider.
    ///
    /// Resolves to `None` when no credentials are available.
    fn read_credentials<'a>(
        &'a self,
        cx: &'a AsyncAppContext,
    ) -> Pin<Box<dyn Future<Output = Option<Credentials>> + 'a>>;

    /// Writes the credentials to the provider.
    fn write_credentials<'a>(
        &'a self,
        user_id: u64,
        access_token: String,
        cx: &'a AsyncAppContext,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + 'a>>;

    /// Deletes the credentials from the provider.
    fn delete_credentials<'a>(
        &'a self,
        cx: &'a AsyncAppContext,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + 'a>>;
}
 366
 367impl Default for ClientState {
 368    fn default() -> Self {
 369        Self {
 370            credentials: None,
 371            status: watch::channel_with(Status::SignedOut),
 372            entity_id_extractors: Default::default(),
 373            _reconnect_task: None,
 374            models_by_message_type: Default::default(),
 375            entities_by_type_and_remote_id: Default::default(),
 376            entity_types_by_message_type: Default::default(),
 377            message_handlers: Default::default(),
 378        }
 379    }
 380}
 381
/// Handle for a registration made on a [`Client`]; dropping it removes the
/// corresponding entries from the client's state (see the `Drop` impl).
pub enum Subscription {
    /// Registration keyed by (entity type, remote entity id).
    Entity {
        client: Weak<Client>,
        id: (TypeId, u64),
    },
    /// Registration keyed by message type.
    Message {
        client: Weak<Client>,
        id: TypeId,
    },
}
 392
 393impl Drop for Subscription {
 394    fn drop(&mut self) {
 395        match self {
 396            Subscription::Entity { client, id } => {
 397                if let Some(client) = client.upgrade() {
 398                    let mut state = client.state.write();
 399                    let _ = state.entities_by_type_and_remote_id.remove(id);
 400                }
 401            }
 402            Subscription::Message { client, id } => {
 403                if let Some(client) = client.upgrade() {
 404                    let mut state = client.state.write();
 405                    let _ = state.entity_types_by_message_type.remove(id);
 406                    let _ = state.message_handlers.remove(id);
 407                }
 408            }
 409        }
 410    }
 411}
 412
/// Subscription to a remote entity of type `T` that has no model attached yet;
/// created by `Client::subscribe_to_entity` and completed by `set_model`.
pub struct PendingEntitySubscription<T: 'static> {
    client: Arc<Client>,
    remote_id: u64,
    _entity_type: PhantomData<T>,
    /// Set by `set_model`; tells `Drop` not to remove the subscriber entry.
    consumed: bool,
}
 419
 420impl<T: 'static> PendingEntitySubscription<T> {
 421    pub fn set_model(mut self, model: &Model<T>, cx: &mut AsyncAppContext) -> Subscription {
 422        self.consumed = true;
 423        let mut state = self.client.state.write();
 424        let id = (TypeId::of::<T>(), self.remote_id);
 425        let Some(WeakSubscriber::Pending(messages)) =
 426            state.entities_by_type_and_remote_id.remove(&id)
 427        else {
 428            unreachable!()
 429        };
 430
 431        state.entities_by_type_and_remote_id.insert(
 432            id,
 433            WeakSubscriber::Entity {
 434                handle: model.downgrade().into(),
 435            },
 436        );
 437        drop(state);
 438        for message in messages {
 439            self.client.handle_message(message, cx);
 440        }
 441        Subscription::Entity {
 442            client: Arc::downgrade(&self.client),
 443            id,
 444        }
 445    }
 446}
 447
 448impl<T: 'static> Drop for PendingEntitySubscription<T> {
 449    fn drop(&mut self) {
 450        if !self.consumed {
 451            let mut state = self.client.state.write();
 452            if let Some(WeakSubscriber::Pending(messages)) = state
 453                .entities_by_type_and_remote_id
 454                .remove(&(TypeId::of::<T>(), self.remote_id))
 455            {
 456                for message in messages {
 457                    log::info!("unhandled message {}", message.payload_type_name());
 458                }
 459            }
 460        }
 461    }
 462}
 463
 464#[derive(Copy, Clone)]
 465pub struct TelemetrySettings {
 466    pub diagnostics: bool,
 467    pub metrics: bool,
 468}
 469
 470/// Control what info is collected by Zed.
 471#[derive(Default, Clone, Serialize, Deserialize, JsonSchema)]
 472pub struct TelemetrySettingsContent {
 473    /// Send debug info like crash reports.
 474    ///
 475    /// Default: true
 476    pub diagnostics: Option<bool>,
 477    /// Send anonymized usage data like what languages you're using Zed with.
 478    ///
 479    /// Default: true
 480    pub metrics: Option<bool>,
 481}
 482
 483impl settings::Settings for TelemetrySettings {
 484    const KEY: Option<&'static str> = Some("telemetry");
 485
 486    type FileContent = TelemetrySettingsContent;
 487
 488    fn load(sources: SettingsSources<Self::FileContent>, _: &mut AppContext) -> Result<Self> {
 489        Ok(Self {
 490            diagnostics: sources.user.as_ref().and_then(|v| v.diagnostics).unwrap_or(
 491                sources
 492                    .default
 493                    .diagnostics
 494                    .ok_or_else(Self::missing_default)?,
 495            ),
 496            metrics: sources
 497                .user
 498                .as_ref()
 499                .and_then(|v| v.metrics)
 500                .unwrap_or(sources.default.metrics.ok_or_else(Self::missing_default)?),
 501        })
 502    }
 503}
 504
 505impl Client {
 506    pub fn new(
 507        clock: Arc<dyn SystemClock>,
 508        http: Arc<HttpClientWithUrl>,
 509        cx: &mut AppContext,
 510    ) -> Arc<Self> {
 511        let use_zed_development_auth = match ReleaseChannel::try_global(cx) {
 512            Some(ReleaseChannel::Dev) => *ZED_DEVELOPMENT_AUTH,
 513            Some(ReleaseChannel::Nightly | ReleaseChannel::Preview | ReleaseChannel::Stable)
 514            | None => false,
 515        };
 516
 517        let credentials_provider: Arc<dyn CredentialsProvider + Send + Sync + 'static> =
 518            if use_zed_development_auth {
 519                Arc::new(DevelopmentCredentialsProvider {
 520                    path: paths::config_dir().join("development_auth"),
 521                })
 522            } else {
 523                Arc::new(KeychainCredentialsProvider)
 524            };
 525
 526        Arc::new(Self {
 527            id: AtomicU64::new(0),
 528            peer: Peer::new(0),
 529            telemetry: Telemetry::new(clock, http.clone(), cx),
 530            http,
 531            credentials_provider,
 532            state: Default::default(),
 533
 534            #[cfg(any(test, feature = "test-support"))]
 535            authenticate: Default::default(),
 536            #[cfg(any(test, feature = "test-support"))]
 537            establish_connection: Default::default(),
 538            #[cfg(any(test, feature = "test-support"))]
 539            rpc_url: RwLock::default(),
 540        })
 541    }
 542
 543    pub fn production(cx: &mut AppContext) -> Arc<Self> {
 544        let clock = Arc::new(clock::RealSystemClock);
 545        let http = Arc::new(HttpClientWithUrl::new(
 546            &ClientSettings::get_global(cx).server_url,
 547            ProxySettings::get_global(cx).proxy.clone(),
 548        ));
 549        Self::new(clock, http.clone(), cx)
 550    }
 551
    /// Returns the client's current id (loaded with `SeqCst` ordering).
    pub fn id(&self) -> u64 {
        self.id.load(Ordering::SeqCst)
    }
 555
 556    pub fn http_client(&self) -> Arc<HttpClientWithUrl> {
 557        self.http.clone()
 558    }
 559
    /// Stores `id` as the client's id and returns `self` for chaining.
    pub fn set_id(&self, id: u64) -> &Self {
        self.id.store(id, Ordering::SeqCst);
        self
    }
 564
    /// Test-only: drops the reconnect task, clears the handler, model, entity
    /// and id-extractor registries, and tears down the peer.
    #[cfg(any(test, feature = "test-support"))]
    pub fn teardown(&self) {
        let mut state = self.state.write();
        state._reconnect_task.take();
        state.message_handlers.clear();
        state.models_by_message_type.clear();
        state.entities_by_type_and_remote_id.clear();
        state.entity_id_extractors.clear();
        self.peer.teardown();
    }
 575
    /// Test-only: installs `authenticate` as the stored authentication
    /// override, replacing any previous one, and returns `self` for chaining.
    #[cfg(any(test, feature = "test-support"))]
    pub fn override_authenticate<F>(&self, authenticate: F) -> &Self
    where
        F: 'static + Send + Sync + Fn(&AsyncAppContext) -> Task<Result<Credentials>>,
    {
        *self.authenticate.write() = Some(Box::new(authenticate));
        self
    }
 584
    /// Test-only: installs `connect` as the stored connection-establishment
    /// override, replacing any previous one, and returns `self` for chaining.
    #[cfg(any(test, feature = "test-support"))]
    pub fn override_establish_connection<F>(&self, connect: F) -> &Self
    where
        F: 'static
            + Send
            + Sync
            + Fn(&Credentials, &AsyncAppContext) -> Task<Result<Connection, EstablishConnectionError>>,
    {
        *self.establish_connection.write() = Some(Box::new(connect));
        self
    }
 596
    /// Test-only: stores `url` as the RPC URL override and returns `self` for chaining.
    #[cfg(any(test, feature = "test-support"))]
    pub fn override_rpc_url(&self, url: Url) -> &Self {
        *self.rpc_url.write() = Some(url);
        self
    }
 602
    /// Returns the client stored as the app-wide `GlobalClient`.
    pub fn global(cx: &AppContext) -> Arc<Self> {
        cx.global::<GlobalClient>().0.clone()
    }
    /// Stores `client` as the app-wide `GlobalClient`.
    pub fn set_global(client: Arc<Client>, cx: &mut AppContext) {
        cx.set_global(GlobalClient(client))
    }
 609
 610    pub fn user_id(&self) -> Option<u64> {
 611        if let Some(Credentials::User { user_id, .. }) = self.state.read().credentials.as_ref() {
 612            Some(*user_id)
 613        } else {
 614            None
 615        }
 616    }
 617
 618    pub fn peer_id(&self) -> Option<PeerId> {
 619        if let Status::Connected { peer_id, .. } = &*self.status().borrow() {
 620            Some(*peer_id)
 621        } else {
 622            None
 623        }
 624    }
 625
    /// Returns a clone of the status watch receiver.
    pub fn status(&self) -> watch::Receiver<Status> {
        self.state.read().status.1.clone()
    }
 629
    /// Publishes `status` on the watch channel and applies its side effects.
    ///
    /// * `Connected` drops any reconnect task.
    /// * `ConnectionLost` spawns a reconnect task that keeps calling
    ///   `authenticate_and_connect` until it succeeds or the status is no
    ///   longer `ConnectionError` after a failure.
    /// * `SignedOut` / `UpgradeRequired` clear the authenticated user info in
    ///   telemetry and drop any reconnect task.
    ///
    /// The state write lock is held for the whole call.
    fn set_status(self: &Arc<Self>, status: Status, cx: &AsyncAppContext) {
        log::info!("set status on client {}: {:?}", self.id(), status);
        let mut state = self.state.write();
        *state.status.0.borrow_mut() = status;

        match status {
            Status::Connected { .. } => {
                state._reconnect_task = None;
            }
            Status::ConnectionLost => {
                let this = self.clone();
                state._reconnect_task = Some(cx.spawn(move |cx| async move {
                    // Test builds use a fixed seed so the jitter is reproducible.
                    #[cfg(any(test, feature = "test-support"))]
                    let mut rng = StdRng::seed_from_u64(0);
                    #[cfg(not(any(test, feature = "test-support")))]
                    let mut rng = StdRng::from_entropy();

                    let mut delay = INITIAL_RECONNECTION_DELAY;
                    while let Err(error) = this.authenticate_and_connect(true, &cx).await {
                        log::error!("failed to connect {}", error);
                        // Only a plain connection error is retried; any other
                        // status ends the loop.
                        if matches!(*this.status().borrow(), Status::ConnectionError) {
                            this.set_status(
                                Status::ReconnectionError {
                                    next_reconnection: Instant::now() + delay,
                                },
                                &cx,
                            );
                            cx.background_executor().timer(delay).await;
                            // Scale the delay by a random factor in 0.5..=2.5,
                            // clamped to [INITIAL_RECONNECTION_DELAY, MAX_RECONNECTION_DELAY].
                            delay = delay
                                .mul_f32(rng.gen_range(0.5..=2.5))
                                .max(INITIAL_RECONNECTION_DELAY)
                                .min(MAX_RECONNECTION_DELAY);
                        } else {
                            break;
                        }
                    }
                }));
            }
            Status::SignedOut | Status::UpgradeRequired => {
                self.telemetry.set_authenticated_user_info(None, false);
                state._reconnect_task.take();
            }
            _ => {}
        }
    }
 675
 676    pub fn subscribe_to_entity<T>(
 677        self: &Arc<Self>,
 678        remote_id: u64,
 679    ) -> Result<PendingEntitySubscription<T>>
 680    where
 681        T: 'static,
 682    {
 683        let id = (TypeId::of::<T>(), remote_id);
 684
 685        let mut state = self.state.write();
 686        if state.entities_by_type_and_remote_id.contains_key(&id) {
 687            return Err(anyhow!("already subscribed to entity"));
 688        }
 689
 690        state
 691            .entities_by_type_and_remote_id
 692            .insert(id, WeakSubscriber::Pending(Default::default()));
 693
 694        Ok(PendingEntitySubscription {
 695            client: self.clone(),
 696            remote_id,
 697            consumed: false,
 698            _entity_type: PhantomData,
 699        })
 700    }
 701
 702    #[track_caller]
 703    pub fn add_message_handler<M, E, H, F>(
 704        self: &Arc<Self>,
 705        entity: WeakModel<E>,
 706        handler: H,
 707    ) -> Subscription
 708    where
 709        M: EnvelopedMessage,
 710        E: 'static,
 711        H: 'static + Sync + Fn(Model<E>, TypedEnvelope<M>, AsyncAppContext) -> F + Send + Sync,
 712        F: 'static + Future<Output = Result<()>>,
 713    {
 714        self.add_message_handler_impl(entity, move |model, message, _, cx| {
 715            handler(model, message, cx)
 716        })
 717    }
 718
 719    fn add_message_handler_impl<M, E, H, F>(
 720        self: &Arc<Self>,
 721        entity: WeakModel<E>,
 722        handler: H,
 723    ) -> Subscription
 724    where
 725        M: EnvelopedMessage,
 726        E: 'static,
 727        H: 'static
 728            + Sync
 729            + Fn(Model<E>, TypedEnvelope<M>, Arc<Self>, AsyncAppContext) -> F
 730            + Send
 731            + Sync,
 732        F: 'static + Future<Output = Result<()>>,
 733    {
 734        let message_type_id = TypeId::of::<M>();
 735        let mut state = self.state.write();
 736        state
 737            .models_by_message_type
 738            .insert(message_type_id, entity.into());
 739
 740        let prev_handler = state.message_handlers.insert(
 741            message_type_id,
 742            Arc::new(move |subscriber, envelope, client, cx| {
 743                let subscriber = subscriber.downcast::<E>().unwrap();
 744                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 745                handler(subscriber, *envelope, client.clone(), cx).boxed_local()
 746            }),
 747        );
 748        if prev_handler.is_some() {
 749            let location = std::panic::Location::caller();
 750            panic!(
 751                "{}:{} registered handler for the same message {} twice",
 752                location.file(),
 753                location.line(),
 754                std::any::type_name::<M>()
 755            );
 756        }
 757
 758        Subscription::Message {
 759            client: Arc::downgrade(self),
 760            id: message_type_id,
 761        }
 762    }
 763
 764    pub fn add_request_handler<M, E, H, F>(
 765        self: &Arc<Self>,
 766        model: WeakModel<E>,
 767        handler: H,
 768    ) -> Subscription
 769    where
 770        M: RequestMessage,
 771        E: 'static,
 772        H: 'static + Sync + Fn(Model<E>, TypedEnvelope<M>, AsyncAppContext) -> F + Send + Sync,
 773        F: 'static + Future<Output = Result<M::Response>>,
 774    {
 775        self.add_message_handler_impl(model, move |handle, envelope, this, cx| {
 776            Self::respond_to_request(envelope.receipt(), handler(handle, envelope, cx), this)
 777        })
 778    }
 779
 780    pub fn add_model_message_handler<M, E, H, F>(self: &Arc<Self>, handler: H)
 781    where
 782        M: EntityMessage,
 783        E: 'static,
 784        H: 'static + Fn(Model<E>, TypedEnvelope<M>, AsyncAppContext) -> F + Send + Sync,
 785        F: 'static + Future<Output = Result<()>>,
 786    {
 787        self.add_entity_message_handler::<M, E, _, _>(move |subscriber, message, _, cx| {
 788            handler(subscriber.downcast::<E>().unwrap(), message, cx)
 789        })
 790    }
 791
 792    fn add_entity_message_handler<M, E, H, F>(self: &Arc<Self>, handler: H)
 793    where
 794        M: EntityMessage,
 795        E: 'static,
 796        H: 'static + Fn(AnyModel, TypedEnvelope<M>, Arc<Self>, AsyncAppContext) -> F + Send + Sync,
 797        F: 'static + Future<Output = Result<()>>,
 798    {
 799        let model_type_id = TypeId::of::<E>();
 800        let message_type_id = TypeId::of::<M>();
 801
 802        let mut state = self.state.write();
 803        state
 804            .entity_types_by_message_type
 805            .insert(message_type_id, model_type_id);
 806        state
 807            .entity_id_extractors
 808            .entry(message_type_id)
 809            .or_insert_with(|| {
 810                |envelope| {
 811                    envelope
 812                        .as_any()
 813                        .downcast_ref::<TypedEnvelope<M>>()
 814                        .unwrap()
 815                        .payload
 816                        .remote_entity_id()
 817                }
 818            });
 819        let prev_handler = state.message_handlers.insert(
 820            message_type_id,
 821            Arc::new(move |handle, envelope, client, cx| {
 822                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 823                handler(handle, *envelope, client.clone(), cx).boxed_local()
 824            }),
 825        );
 826        if prev_handler.is_some() {
 827            panic!("registered handler for the same message twice");
 828        }
 829    }
 830
 831    pub fn add_model_request_handler<M, E, H, F>(self: &Arc<Self>, handler: H)
 832    where
 833        M: EntityMessage + RequestMessage,
 834        E: 'static,
 835        H: 'static + Fn(Model<E>, TypedEnvelope<M>, AsyncAppContext) -> F + Send + Sync,
 836        F: 'static + Future<Output = Result<M::Response>>,
 837    {
 838        self.add_entity_message_handler::<M, E, _, _>(move |entity, envelope, client, cx| {
 839            Self::respond_to_request::<M, _>(
 840                envelope.receipt(),
 841                handler(entity.downcast::<E>().unwrap(), envelope, cx),
 842                client,
 843            )
 844        })
 845    }
 846
 847    async fn respond_to_request<T: RequestMessage, F: Future<Output = Result<T::Response>>>(
 848        receipt: Receipt<T>,
 849        response: F,
 850        client: Arc<Self>,
 851    ) -> Result<()> {
 852        match response.await {
 853            Ok(response) => {
 854                client.respond(receipt, response)?;
 855                Ok(())
 856            }
 857            Err(error) => {
 858                client.respond_with_error(receipt, error.to_proto())?;
 859                Err(error)
 860            }
 861        }
 862    }
 863
 864    pub async fn has_credentials(&self, cx: &AsyncAppContext) -> bool {
 865        self.credentials_provider
 866            .read_credentials(cx)
 867            .await
 868            .is_some()
 869    }
 870
    /// Replaces the in-memory credentials with dev-server credentials built
    /// from `token` and returns `self` for chaining.
    pub fn set_dev_server_token(&self, token: DevServerToken) -> &Self {
        self.state.write().credentials = Some(Credentials::DevServer { token });
        self
    }
 875
    /// Authenticates (when necessary) and connects to the server, updating the client's
    /// status as it progresses.
    ///
    /// Returns `Ok(())` immediately when the status is already `Connected`, `Connecting` or
    /// `Reconnecting`, and fails immediately when it is `UpgradeRequired`.
    ///
    /// Credentials are taken from `state` when present. Otherwise, if `try_provider` is
    /// true, they are read from the credentials provider. If there are still none,
    /// `authenticate` is run. Establishing the connection and the subsequent
    /// `set_connection` call are bounded by a single `CONNECTION_TIMEOUT` timer.
    ///
    /// The function is recursive: when credentials read from the provider are rejected as
    /// `Unauthorized`, they are deleted and the function calls itself once more with
    /// `try_provider = false`.
    #[async_recursion(?Send)]
    pub async fn authenticate_and_connect(
        self: &Arc<Self>,
        try_provider: bool,
        cx: &AsyncAppContext,
    ) -> anyhow::Result<()> {
        // `was_disconnected` is true only when starting from `SignedOut`; it selects between
        // the Authenticating/Connecting and Reauthenticating/Reconnecting status pairs below.
        let was_disconnected = match *self.status().borrow() {
            Status::SignedOut => true,
            Status::ConnectionError
            | Status::ConnectionLost
            | Status::Authenticating { .. }
            | Status::Reauthenticating { .. }
            | Status::ReconnectionError { .. } => false,
            Status::Connected { .. } | Status::Connecting { .. } | Status::Reconnecting { .. } => {
                return Ok(())
            }
            Status::UpgradeRequired => return Err(EstablishConnectionError::UpgradeRequired)?,
        };
        if was_disconnected {
            self.set_status(Status::Authenticating, cx);
        } else {
            self.set_status(Status::Reauthenticating, cx)
        }

        // Tracks whether the credentials in use came from the provider, which decides both
        // whether to write them back on success and whether to retry on `Unauthorized`.
        let mut read_from_provider = false;
        let mut credentials = self.state.read().credentials.clone();
        if credentials.is_none() && try_provider {
            credentials = self.credentials_provider.read_credentials(cx).await;
            read_from_provider = credentials.is_some();
        }

        if credentials.is_none() {
            let mut status_rx = self.status();
            // NOTE(review): presumably this first `next()` consumes the status that was just
            // set above, so the `next()` raced below only resolves on a later status change.
            // Confirm against the status channel's semantics.
            let _ = status_rx.next().await;
            // Race authentication against a status update; a status update aborts the attempt.
            futures::select_biased! {
                authenticate = self.authenticate(cx).fuse() => {
                    match authenticate {
                        Ok(creds) => credentials = Some(creds),
                        Err(err) => {
                            self.set_status(Status::ConnectionError, cx);
                            return Err(err);
                        }
                    }
                }
                _ = status_rx.next().fuse() => {
                    return Err(anyhow!("authentication canceled"));
                }
            }
        }
        // Cannot be `None` here: every path above that leaves it unset has already returned.
        let credentials = credentials.unwrap();
        if let Credentials::User { user_id, .. } = &credentials {
            self.set_id(*user_id);
        }

        if was_disconnected {
            self.set_status(Status::Connecting, cx);
        } else {
            self.set_status(Status::Reconnecting, cx);
        }

        // One timer covers both establishing the connection and `set_connection`.
        let mut timeout =
            futures::FutureExt::fuse(cx.background_executor().timer(CONNECTION_TIMEOUT));
        futures::select_biased! {
            connection = self.establish_connection(&credentials, cx).fuse() => {
                match connection {
                    Ok(conn) => {
                        self.state.write().credentials = Some(credentials.clone());
                        // Persist user credentials unless they were just read from the
                        // provider or an impersonation login is configured.
                        if !read_from_provider && IMPERSONATE_LOGIN.is_none() {
                            if let Credentials::User{user_id, access_token} = credentials {
                                self.credentials_provider.write_credentials(user_id, access_token, cx).await.log_err();
                            }
                        }

                        futures::select_biased! {
                            result = self.set_connection(conn, cx).fuse() => result,
                            _ = timeout => {
                                self.set_status(Status::ConnectionError, cx);
                                Err(anyhow!("timed out waiting on hello message from server"))
                            }
                        }
                    }
                    Err(EstablishConnectionError::Unauthorized) => {
                        // Drop the rejected credentials from memory.
                        self.state.write().credentials.take();
                        if read_from_provider {
                            // Stored credentials were rejected: delete them and retry once
                            // without consulting the provider.
                            self.credentials_provider.delete_credentials(cx).await.log_err();
                            self.set_status(Status::SignedOut, cx);
                            self.authenticate_and_connect(false, cx).await
                        } else {
                            self.set_status(Status::ConnectionError, cx);
                            Err(EstablishConnectionError::Unauthorized)?
                        }
                    }
                    Err(EstablishConnectionError::UpgradeRequired) => {
                        self.set_status(Status::UpgradeRequired, cx);
                        Err(EstablishConnectionError::UpgradeRequired)?
                    }
                    Err(error) => {
                        self.set_status(Status::ConnectionError, cx);
                        Err(error)?
                    }
                }
            }
            _ = &mut timeout => {
                self.set_status(Status::ConnectionError, cx);
                Err(anyhow!("timed out trying to establish connection"))
            }
        }
    }
 984
    /// Registers `conn` with the peer, waits for the server's `Hello` message, and marks the
    /// client as connected.
    ///
    /// After the status is set to `Connected`, two detached tasks are spawned: one feeds every
    /// incoming message to `handle_message`, the other awaits the connection's I/O future and
    /// updates the status when it finishes.
    ///
    /// Returns an error (after disconnecting the new connection) if the incoming stream ends
    /// before a message arrives, the first message is not a `proto::Hello`, or the hello has
    /// no peer id.
    async fn set_connection(
        self: &Arc<Self>,
        conn: Connection,
        cx: &AsyncAppContext,
    ) -> Result<()> {
        let executor = cx.background_executor();
        log::info!("add connection to peer");
        let (connection_id, handle_io, mut incoming) = self.peer.add_connection(conn, {
            let executor = executor.clone();
            move |duration| executor.timer(duration)
        });
        // Drive the connection's I/O on the background executor.
        let handle_io = executor.spawn(handle_io);

        // The first incoming message must be the server's hello carrying our peer id.
        let peer_id = async {
            log::info!("waiting for server hello");
            let message = incoming
                .next()
                .await
                .ok_or_else(|| anyhow!("no hello message received"))?;
            log::info!("got server hello");
            // Captured before `into_any` consumes the message, for use in the error below.
            let hello_message_type_name = message.payload_type_name().to_string();
            let hello = message
                .into_any()
                .downcast::<TypedEnvelope<proto::Hello>>()
                .map_err(|_| {
                    anyhow!(
                        "invalid hello message received: {:?}",
                        hello_message_type_name
                    )
                })?;
            let peer_id = hello
                .payload
                .peer_id
                .ok_or_else(|| anyhow!("invalid peer id"))?;
            Ok(peer_id)
        };

        let peer_id = match peer_id.await {
            Ok(peer_id) => peer_id,
            Err(error) => {
                // The handshake failed, so drop the connection that was just registered.
                self.peer.disconnect(connection_id);
                return Err(error);
            }
        };

        log::info!(
            "set status to connected (connection id: {:?}, peer id: {:?})",
            connection_id,
            peer_id
        );
        self.set_status(
            Status::Connected {
                peer_id,
                connection_id,
            },
            cx,
        );

        // Dispatch task: forwards each incoming message to `handle_message` until the stream ends.
        cx.spawn({
            let this = self.clone();
            |cx| {
                async move {
                    while let Some(message) = incoming.next().await {
                        this.handle_message(message, &cx);
                        // Don't starve the main thread when receiving lots of messages at once.
                        smol::future::yield_now().await;
                    }
                }
            }
        })
        .detach();

        // Completion task: updates the status once the connection's I/O future finishes.
        cx.spawn({
            let this = self.clone();
            move |cx| async move {
                match handle_io.await {
                    Ok(()) => {
                        // Only sign out if the status still refers to this exact connection.
                        if *this.status().borrow()
                            == (Status::Connected {
                                connection_id,
                                peer_id,
                            })
                        {
                            this.set_status(Status::SignedOut, &cx);
                        }
                    }
                    Err(err) => {
                        log::error!("connection error: {:?}", err);
                        this.set_status(Status::ConnectionLost, &cx);
                    }
                }
            }
        })
        .detach();

        Ok(())
    }
1082
    /// Obtains credentials for the user.
    ///
    /// In test builds (or with the `test-support` feature), an `authenticate` override
    /// callback is used when one is set. Otherwise this delegates to
    /// `authenticate_with_browser`.
    fn authenticate(self: &Arc<Self>, cx: &AsyncAppContext) -> Task<Result<Credentials>> {
        #[cfg(any(test, feature = "test-support"))]
        if let Some(callback) = self.authenticate.read().as_ref() {
            return callback(cx);
        }

        self.authenticate_with_browser(cx)
    }
1091
    /// Opens a connection to the server using `credentials`.
    ///
    /// In test builds (or with the `test-support` feature), an `establish_connection`
    /// override callback is used when one is set. Otherwise this delegates to
    /// `establish_websocket_connection`.
    fn establish_connection(
        self: &Arc<Self>,
        credentials: &Credentials,
        cx: &AsyncAppContext,
    ) -> Task<Result<Connection, EstablishConnectionError>> {
        #[cfg(any(test, feature = "test-support"))]
        if let Some(callback) = self.establish_connection.read().as_ref() {
            return callback(credentials, cx);
        }

        self.establish_websocket_connection(credentials, cx)
    }
1104
1105    fn rpc_url(
1106        &self,
1107        http: Arc<HttpClientWithUrl>,
1108        release_channel: Option<ReleaseChannel>,
1109    ) -> impl Future<Output = Result<Url>> {
1110        #[cfg(any(test, feature = "test-support"))]
1111        let url_override = self.rpc_url.read().clone();
1112
1113        async move {
1114            #[cfg(any(test, feature = "test-support"))]
1115            if let Some(url) = url_override {
1116                return Ok(url);
1117            }
1118
1119            if let Some(url) = &*ZED_RPC_URL {
1120                return Url::parse(url).context("invalid rpc url");
1121            }
1122
1123            let mut url = http.build_url("/rpc");
1124            if let Some(preview_param) =
1125                release_channel.and_then(|channel| channel.release_query_param())
1126            {
1127                url += "?";
1128                url += preview_param;
1129            }
1130
1131            let response = http.get(&url, Default::default(), false).await?;
1132            let collab_url = if response.status().is_redirection() {
1133                response
1134                    .headers()
1135                    .get("Location")
1136                    .ok_or_else(|| anyhow!("missing location header in /rpc response"))?
1137                    .to_str()
1138                    .map_err(EstablishConnectionError::other)?
1139                    .to_string()
1140            } else {
1141                Err(anyhow!(
1142                    "unexpected /rpc response status {}",
1143                    response.status()
1144                ))?
1145            };
1146
1147            Url::parse(&collab_url).context("invalid rpc url")
1148        }
1149    }
1150
    /// Spawns a background task that opens a WebSocket connection to the RPC server.
    ///
    /// The task resolves the RPC URL, opens a TCP stream to its host and port, rewrites the
    /// scheme to `ws`/`wss`, and performs the WebSocket handshake (over TLS for `https`
    /// URLs). The handshake request carries the `Authorization` header derived from
    /// `credentials` plus the protocol version, app version and release channel headers.
    ///
    /// Fails when the URL scheme is neither `http` nor `https`, the URL has no host, or any
    /// of the network or handshake steps fail.
    fn establish_websocket_connection(
        self: &Arc<Self>,
        credentials: &Credentials,
        cx: &AsyncAppContext,
    ) -> Task<Result<Connection, EstablishConnectionError>> {
        let release_channel = cx
            .update(|cx| ReleaseChannel::try_global(cx))
            .ok()
            .flatten();
        // Falls back to an empty version string when the app context cannot be updated.
        let app_version = cx
            .update(|cx| AppVersion::global(cx).to_string())
            .ok()
            .unwrap_or_default();

        let http = self.http.clone();
        let credentials = credentials.clone();
        let rpc_url = self.rpc_url(http, release_channel);
        cx.background_executor().spawn(async move {
            use HttpOrHttps::*;

            // Remembers the original scheme, since `rpc_url` is rewritten to ws/wss below.
            #[derive(Debug)]
            enum HttpOrHttps {
                Http,
                Https,
            }

            let mut rpc_url = rpc_url.await?;
            let url_scheme = match rpc_url.scheme() {
                "https" => Https,
                "http" => Http,
                _ => Err(anyhow!("invalid rpc url: {}", rpc_url))?,
            };
            // (host, port) pair; the port falls back to the scheme's known default.
            let rpc_host = rpc_url
                .host_str()
                .zip(rpc_url.port_or_known_default())
                .ok_or_else(|| anyhow!("missing host in rpc url"))?;
            let stream = smol::net::TcpStream::connect(rpc_host).await?;

            log::info!("connected to rpc endpoint {}", rpc_url);

            // NOTE(review): the `unwrap` assumes switching http -> ws and https -> wss is
            // always accepted by `Url::set_scheme` — confirm against the `url` crate's rules.
            rpc_url
                .set_scheme(match url_scheme {
                    Https => "wss",
                    Http => "ws",
                })
                .unwrap();

            // We call `into_client_request` to let `tungstenite` construct the WebSocket request
            // for us from the RPC URL.
            //
            // Among other things, it will generate and set a `Sec-WebSocket-Key` header for us.
            let mut request = rpc_url.into_client_request()?;

            // We then modify the request to add our desired headers.
            let request_headers = request.headers_mut();
            request_headers.insert(
                "Authorization",
                HeaderValue::from_str(&credentials.authorization_header())?,
            );
            request_headers.insert(
                "x-zed-protocol-version",
                HeaderValue::from_str(&rpc::PROTOCOL_VERSION.to_string())?,
            );
            request_headers.insert("x-zed-app-version", HeaderValue::from_str(&app_version)?);
            // Reports "unknown" when no release channel is available.
            request_headers.insert(
                "x-zed-release-channel",
                HeaderValue::from_str(&release_channel.map(|r| r.dev_name()).unwrap_or("unknown"))?,
            );

            // The two branches differ only in whether the handshake runs over TLS.
            match url_scheme {
                Https => {
                    let (stream, _) =
                        async_tungstenite::async_std::client_async_tls(request, stream).await?;
                    Ok(Connection::new(
                        stream
                            .map_err(|error| anyhow!(error))
                            .sink_map_err(|error| anyhow!(error)),
                    ))
                }
                Http => {
                    let (stream, _) = async_tungstenite::client_async(request, stream).await?;
                    Ok(Connection::new(
                        stream
                            .map_err(|error| anyhow!(error))
                            .sink_map_err(|error| anyhow!(error)),
                    ))
                }
            }
        })
    }
1241
1242    pub fn authenticate_with_browser(
1243        self: &Arc<Self>,
1244        cx: &AsyncAppContext,
1245    ) -> Task<Result<Credentials>> {
1246        let http = self.http.clone();
1247        let this = self.clone();
1248        cx.spawn(|cx| async move {
1249            let background = cx.background_executor().clone();
1250
1251            let (open_url_tx, open_url_rx) = oneshot::channel::<String>();
1252            cx.update(|cx| {
1253                cx.spawn(move |cx| async move {
1254                    let url = open_url_rx.await?;
1255                    cx.update(|cx| cx.open_url(&url))
1256                })
1257                .detach_and_log_err(cx);
1258            })
1259            .log_err();
1260
1261            let credentials = background
1262                .clone()
1263                .spawn(async move {
1264                    // Generate a pair of asymmetric encryption keys. The public key will be used by the
1265                    // zed server to encrypt the user's access token, so that it can'be intercepted by
1266                    // any other app running on the user's device.
1267                    let (public_key, private_key) =
1268                        rpc::auth::keypair().expect("failed to generate keypair for auth");
1269                    let public_key_string = String::try_from(public_key)
1270                        .expect("failed to serialize public key for auth");
1271
1272                    if let Some((login, token)) =
1273                        IMPERSONATE_LOGIN.as_ref().zip(ADMIN_API_TOKEN.as_ref())
1274                    {
1275                        eprintln!("authenticate as admin {login}, {token}");
1276
1277                        return this
1278                            .authenticate_as_admin(http, login.clone(), token.clone())
1279                            .await;
1280                    }
1281
1282                    // Start an HTTP server to receive the redirect from Zed's sign-in page.
1283                    let server =
1284                        tiny_http::Server::http("127.0.0.1:0").expect("failed to find open port");
1285                    let port = server.server_addr().port();
1286
1287                    // Open the Zed sign-in page in the user's browser, with query parameters that indicate
1288                    // that the user is signing in from a Zed app running on the same device.
1289                    let mut url = http.build_url(&format!(
1290                        "/native_app_signin?native_app_port={}&native_app_public_key={}",
1291                        port, public_key_string
1292                    ));
1293
1294                    if let Some(impersonate_login) = IMPERSONATE_LOGIN.as_ref() {
1295                        log::info!("impersonating user @{}", impersonate_login);
1296                        write!(&mut url, "&impersonate={}", impersonate_login).unwrap();
1297                    }
1298
1299                    open_url_tx.send(url).log_err();
1300
1301                    // Receive the HTTP request from the user's browser. Retrieve the user id and encrypted
1302                    // access token from the query params.
1303                    //
1304                    // TODO - Avoid ever starting more than one HTTP server. Maybe switch to using a
1305                    // custom URL scheme instead of this local HTTP server.
1306                    let (user_id, access_token) = background
1307                        .spawn(async move {
1308                            for _ in 0..100 {
1309                                if let Some(req) = server.recv_timeout(Duration::from_secs(1))? {
1310                                    let path = req.url();
1311                                    let mut user_id = None;
1312                                    let mut access_token = None;
1313                                    let url = Url::parse(&format!("http://example.com{}", path))
1314                                        .context("failed to parse login notification url")?;
1315                                    for (key, value) in url.query_pairs() {
1316                                        if key == "access_token" {
1317                                            access_token = Some(value.to_string());
1318                                        } else if key == "user_id" {
1319                                            user_id = Some(value.to_string());
1320                                        }
1321                                    }
1322
1323                                    let post_auth_url =
1324                                        http.build_url("/native_app_signin_succeeded");
1325                                    req.respond(
1326                                        tiny_http::Response::empty(302).with_header(
1327                                            tiny_http::Header::from_bytes(
1328                                                &b"Location"[..],
1329                                                post_auth_url.as_bytes(),
1330                                            )
1331                                            .unwrap(),
1332                                        ),
1333                                    )
1334                                    .context("failed to respond to login http request")?;
1335                                    return Ok((
1336                                        user_id
1337                                            .ok_or_else(|| anyhow!("missing user_id parameter"))?,
1338                                        access_token.ok_or_else(|| {
1339                                            anyhow!("missing access_token parameter")
1340                                        })?,
1341                                    ));
1342                                }
1343                            }
1344
1345                            Err(anyhow!("didn't receive login redirect"))
1346                        })
1347                        .await?;
1348
1349                    let access_token = private_key
1350                        .decrypt_string(&access_token)
1351                        .context("failed to decrypt access token")?;
1352
1353                    Ok(Credentials::User {
1354                        user_id: user_id.parse()?,
1355                        access_token,
1356                    })
1357                })
1358                .await?;
1359
1360            cx.update(|cx| cx.activate(true))?;
1361            Ok(credentials)
1362        })
1363    }
1364
1365    async fn authenticate_as_admin(
1366        self: &Arc<Self>,
1367        http: Arc<HttpClientWithUrl>,
1368        login: String,
1369        mut api_token: String,
1370    ) -> Result<Credentials> {
1371        #[derive(Deserialize)]
1372        struct AuthenticatedUserResponse {
1373            user: User,
1374        }
1375
1376        #[derive(Deserialize)]
1377        struct User {
1378            id: u64,
1379        }
1380
1381        // Use the collab server's admin API to retrieve the id
1382        // of the impersonated user.
1383        let mut url = self.rpc_url(http.clone(), None).await?;
1384        url.set_path("/user");
1385        url.set_query(Some(&format!("github_login={login}")));
1386        let request: http_client::Request<AsyncBody> = Request::get(url.as_str())
1387            .header("Authorization", format!("token {api_token}"))
1388            .body("".into())?;
1389
1390        let mut response = http.send(request).await?;
1391        let mut body = String::new();
1392        response.body_mut().read_to_string(&mut body).await?;
1393        if !response.status().is_success() {
1394            Err(anyhow!(
1395                "admin user request failed {} - {}",
1396                response.status().as_u16(),
1397                body,
1398            ))?;
1399        }
1400        let response: AuthenticatedUserResponse = serde_json::from_str(&body)?;
1401
1402        // Use the admin API token to authenticate as the impersonated user.
1403        api_token.insert_str(0, "ADMIN_TOKEN:");
1404        Ok(Credentials::User {
1405            user_id: response.user.id,
1406            access_token: api_token,
1407        })
1408    }
1409
1410    pub async fn sign_out(self: &Arc<Self>, cx: &AsyncAppContext) {
1411        self.state.write().credentials = None;
1412        self.disconnect(&cx);
1413
1414        if self.has_credentials(cx).await {
1415            self.credentials_provider
1416                .delete_credentials(cx)
1417                .await
1418                .log_err();
1419        }
1420    }
1421
    /// Tears down the peer and sets the status to `SignedOut`.
    pub fn disconnect(self: &Arc<Self>, cx: &AsyncAppContext) {
        self.peer.teardown();
        self.set_status(Status::SignedOut, cx);
    }
1426
    /// Tears down the peer and sets the status to `ConnectionLost`.
    ///
    /// NOTE(review): presumably something observing the status reacts to `ConnectionLost`
    /// by reconnecting; that logic is not visible here — confirm.
    pub fn reconnect(self: &Arc<Self>, cx: &AsyncAppContext) {
        self.peer.teardown();
        self.set_status(Status::ConnectionLost, cx);
    }
1431
1432    fn connection_id(&self) -> Result<ConnectionId> {
1433        if let Status::Connected { connection_id, .. } = *self.status().borrow() {
1434            Ok(connection_id)
1435        } else {
1436            Err(anyhow!("not connected"))
1437        }
1438    }
1439
1440    pub fn send<T: EnvelopedMessage>(&self, message: T) -> Result<()> {
1441        log::debug!("rpc send. client_id:{}, name:{}", self.id(), T::NAME);
1442        self.peer.send(self.connection_id()?, message)
1443    }
1444
1445    pub fn send_dynamic(&self, envelope: proto::Envelope) -> Result<()> {
1446        let connection_id = self.connection_id()?;
1447        self.peer.send_dynamic(connection_id, envelope)
1448    }
1449
1450    pub fn request<T: RequestMessage>(
1451        &self,
1452        request: T,
1453    ) -> impl Future<Output = Result<T::Response>> {
1454        self.request_envelope(request)
1455            .map_ok(|envelope| envelope.payload)
1456    }
1457
1458    pub fn request_stream<T: RequestMessage>(
1459        &self,
1460        request: T,
1461    ) -> impl Future<Output = Result<impl Stream<Item = Result<T::Response>>>> {
1462        let client_id = self.id.load(Ordering::SeqCst);
1463        log::debug!(
1464            "rpc request start. client_id:{}. name:{}",
1465            client_id,
1466            T::NAME
1467        );
1468        let response = self
1469            .connection_id()
1470            .map(|conn_id| self.peer.request_stream(conn_id, request));
1471        async move {
1472            let response = response?.await;
1473            log::debug!(
1474                "rpc request finish. client_id:{}. name:{}",
1475                client_id,
1476                T::NAME
1477            );
1478            response
1479        }
1480    }
1481
1482    pub fn request_envelope<T: RequestMessage>(
1483        &self,
1484        request: T,
1485    ) -> impl Future<Output = Result<TypedEnvelope<T::Response>>> {
1486        let client_id = self.id();
1487        log::debug!(
1488            "rpc request start. client_id:{}. name:{}",
1489            client_id,
1490            T::NAME
1491        );
1492        let response = self
1493            .connection_id()
1494            .map(|conn_id| self.peer.request_envelope(conn_id, request));
1495        async move {
1496            let response = response?.await;
1497            log::debug!(
1498                "rpc request finish. client_id:{}. name:{}",
1499                client_id,
1500                T::NAME
1501            );
1502            response
1503        }
1504    }
1505
1506    pub fn request_dynamic(
1507        &self,
1508        envelope: proto::Envelope,
1509        request_type: &'static str,
1510    ) -> impl Future<Output = Result<proto::Envelope>> {
1511        let client_id = self.id();
1512        log::debug!(
1513            "rpc request start. client_id:{}. name:{}",
1514            client_id,
1515            request_type
1516        );
1517        let response = self
1518            .connection_id()
1519            .map(|conn_id| self.peer.request_dynamic(conn_id, envelope, request_type));
1520        async move {
1521            let response = response?.await;
1522            log::debug!(
1523                "rpc request finish. client_id:{}. name:{}",
1524                client_id,
1525                request_type
1526            );
1527            Ok(response?.0)
1528        }
1529    }
1530
    /// Sends `response` to the peer as the reply to the request identified by `receipt`.
    fn respond<T: RequestMessage>(&self, receipt: Receipt<T>, response: T::Response) -> Result<()> {
        log::debug!("rpc respond. client_id:{}. name:{}", self.id(), T::NAME);
        self.peer.respond(receipt, response)
    }
1535
    /// Sends `error` to the peer as the reply to the request identified by `receipt`.
    fn respond_with_error<T: RequestMessage>(
        &self,
        receipt: Receipt<T>,
        error: proto::Error,
    ) -> Result<()> {
        log::debug!("rpc respond. client_id:{}. name:{}", self.id(), T::NAME);
        self.peer.respond_with_error(receipt, error)
    }
1544
1545    fn handle_message(
1546        self: &Arc<Client>,
1547        message: Box<dyn AnyTypedEnvelope>,
1548        cx: &AsyncAppContext,
1549    ) {
1550        let mut state = self.state.write();
1551        let type_name = message.payload_type_name();
1552        let payload_type_id = message.payload_type_id();
1553        let sender_id = message.original_sender_id();
1554
1555        let mut subscriber = None;
1556
1557        if let Some(handle) = state
1558            .models_by_message_type
1559            .get(&payload_type_id)
1560            .and_then(|handle| handle.upgrade())
1561        {
1562            subscriber = Some(handle);
1563        } else if let Some((extract_entity_id, entity_type_id)) =
1564            state.entity_id_extractors.get(&payload_type_id).zip(
1565                state
1566                    .entity_types_by_message_type
1567                    .get(&payload_type_id)
1568                    .copied(),
1569            )
1570        {
1571            let entity_id = (extract_entity_id)(message.as_ref());
1572
1573            match state
1574                .entities_by_type_and_remote_id
1575                .get_mut(&(entity_type_id, entity_id))
1576            {
1577                Some(WeakSubscriber::Pending(pending)) => {
1578                    pending.push(message);
1579                    return;
1580                }
1581                Some(weak_subscriber) => match weak_subscriber {
1582                    WeakSubscriber::Entity { handle } => {
1583                        subscriber = handle.upgrade();
1584                    }
1585
1586                    WeakSubscriber::Pending(_) => {}
1587                },
1588                _ => {}
1589            }
1590        }
1591
1592        let subscriber = if let Some(subscriber) = subscriber {
1593            subscriber
1594        } else {
1595            log::info!("unhandled message {}", type_name);
1596            self.peer.respond_with_unhandled_message(message).log_err();
1597            return;
1598        };
1599
1600        let handler = state.message_handlers.get(&payload_type_id).cloned();
1601        // Dropping the state prevents deadlocks if the handler interacts with rpc::Client.
1602        // It also ensures we don't hold the lock while yielding back to the executor, as
1603        // that might cause the executor thread driving this future to block indefinitely.
1604        drop(state);
1605
1606        if let Some(handler) = handler {
1607            let future = handler(subscriber, message, self, cx.clone());
1608            let client_id = self.id();
1609            log::debug!(
1610                "rpc message received. client_id:{}, sender_id:{:?}, type:{}",
1611                client_id,
1612                sender_id,
1613                type_name
1614            );
1615            cx.spawn(move |_| async move {
1616                    match future.await {
1617                        Ok(()) => {
1618                            log::debug!(
1619                                "rpc message handled. client_id:{}, sender_id:{:?}, type:{}",
1620                                client_id,
1621                                sender_id,
1622                                type_name
1623                            );
1624                        }
1625                        Err(error) => {
1626                            log::error!(
1627                                "error handling message. client_id:{}, sender_id:{:?}, type:{}, error:{:?}",
1628                                client_id,
1629                                sender_id,
1630                                type_name,
1631                                error
1632                            );
1633                        }
1634                    }
1635                })
1636                .detach();
1637        } else {
1638            log::info!("unhandled message {}", type_name);
1639            self.peer.respond_with_unhandled_message(message).log_err();
1640        }
1641    }
1642
    /// Returns a reference to the client's shared `Telemetry` handle.
    pub fn telemetry(&self) -> &Arc<Telemetry> {
        &self.telemetry
    }
1646}
1647
1648impl ProtoClient for Client {
1649    fn request(
1650        &self,
1651        envelope: proto::Envelope,
1652        request_type: &'static str,
1653    ) -> BoxFuture<'static, Result<proto::Envelope>> {
1654        self.request_dynamic(envelope, request_type).boxed()
1655    }
1656
1657    fn send(&self, envelope: proto::Envelope) -> Result<()> {
1658        self.send_dynamic(envelope)
1659    }
1660}
1661
/// JSON representation of user credentials as read and written by
/// `DevelopmentCredentialsProvider`.
#[derive(Serialize, Deserialize)]
struct DevelopmentCredentials {
    // Id of the signed-in user.
    user_id: u64,
    // Access token for that user, stored as-is.
    access_token: String,
}
1667
/// A credentials provider that stores credentials in a local file.
///
/// This MUST only be used in development, as this is not a secure way of storing
/// credentials on user machines.
///
/// Its existence is purely to work around the annoyance of having to constantly
/// re-allow access to the system keychain when developing Zed.
struct DevelopmentCredentialsProvider {
    // Location of the file the credentials are read from, written to and deleted at.
    path: PathBuf,
}
1678
1679impl CredentialsProvider for DevelopmentCredentialsProvider {
1680    fn read_credentials<'a>(
1681        &'a self,
1682        _cx: &'a AsyncAppContext,
1683    ) -> Pin<Box<dyn Future<Output = Option<Credentials>> + 'a>> {
1684        async move {
1685            if IMPERSONATE_LOGIN.is_some() {
1686                return None;
1687            }
1688
1689            let json = std::fs::read(&self.path).log_err()?;
1690
1691            let credentials: DevelopmentCredentials = serde_json::from_slice(&json).log_err()?;
1692
1693            Some(Credentials::User {
1694                user_id: credentials.user_id,
1695                access_token: credentials.access_token,
1696            })
1697        }
1698        .boxed_local()
1699    }
1700
1701    fn write_credentials<'a>(
1702        &'a self,
1703        user_id: u64,
1704        access_token: String,
1705        _cx: &'a AsyncAppContext,
1706    ) -> Pin<Box<dyn Future<Output = Result<()>> + 'a>> {
1707        async move {
1708            let json = serde_json::to_string(&DevelopmentCredentials {
1709                user_id,
1710                access_token,
1711            })?;
1712
1713            std::fs::write(&self.path, json)?;
1714
1715            Ok(())
1716        }
1717        .boxed_local()
1718    }
1719
1720    fn delete_credentials<'a>(
1721        &'a self,
1722        _cx: &'a AsyncAppContext,
1723    ) -> Pin<Box<dyn Future<Output = Result<()>> + 'a>> {
1724        async move { Ok(std::fs::remove_file(&self.path)?) }.boxed_local()
1725    }
1726}
1727
/// A credentials provider that stores credentials in the system keychain.
///
/// Entries are read, written and deleted through the app context, keyed by the
/// `server_url` from the global `ClientSettings`.
struct KeychainCredentialsProvider;
1730
1731impl CredentialsProvider for KeychainCredentialsProvider {
1732    fn read_credentials<'a>(
1733        &'a self,
1734        cx: &'a AsyncAppContext,
1735    ) -> Pin<Box<dyn Future<Output = Option<Credentials>> + 'a>> {
1736        async move {
1737            if IMPERSONATE_LOGIN.is_some() {
1738                return None;
1739            }
1740
1741            let (user_id, access_token) = cx
1742                .update(|cx| cx.read_credentials(&ClientSettings::get_global(cx).server_url))
1743                .log_err()?
1744                .await
1745                .log_err()??;
1746
1747            Some(Credentials::User {
1748                user_id: user_id.parse().ok()?,
1749                access_token: String::from_utf8(access_token).ok()?,
1750            })
1751        }
1752        .boxed_local()
1753    }
1754
1755    fn write_credentials<'a>(
1756        &'a self,
1757        user_id: u64,
1758        access_token: String,
1759        cx: &'a AsyncAppContext,
1760    ) -> Pin<Box<dyn Future<Output = Result<()>> + 'a>> {
1761        async move {
1762            cx.update(move |cx| {
1763                cx.write_credentials(
1764                    &ClientSettings::get_global(cx).server_url,
1765                    &user_id.to_string(),
1766                    access_token.as_bytes(),
1767                )
1768            })?
1769            .await
1770        }
1771        .boxed_local()
1772    }
1773
1774    fn delete_credentials<'a>(
1775        &'a self,
1776        cx: &'a AsyncAppContext,
1777    ) -> Pin<Box<dyn Future<Output = Result<()>> + 'a>> {
1778        async move {
1779            cx.update(move |cx| cx.delete_credentials(&ClientSettings::get_global(cx).server_url))?
1780                .await
1781        }
1782        .boxed_local()
1783    }
1784}
1785
/// The scheme of `zed://` URLs, without the `://` separator.
pub static ZED_URL_SCHEME: &str = "zed";
1788
1789/// Parses the given link into a Zed link.
1790///
1791/// Returns a [`Some`] containing the unprefixed link if the link is a Zed link.
1792/// Returns [`None`] otherwise.
1793pub fn parse_zed_link<'a>(link: &'a str, cx: &AppContext) -> Option<&'a str> {
1794    let server_url = &ClientSettings::get_global(cx).server_url;
1795    if let Some(stripped) = link
1796        .strip_prefix(server_url)
1797        .and_then(|result| result.strip_prefix('/'))
1798    {
1799        return Some(stripped);
1800    }
1801    if let Some(stripped) = link
1802        .strip_prefix(ZED_URL_SCHEME)
1803        .and_then(|result| result.strip_prefix("://"))
1804    {
1805        return Some(stripped);
1806    }
1807
1808    None
1809}
1810
// Tests for `Client` connection handling and message-handler subscriptions,
// run against a `FakeServer` with a fake clock and a fake HTTP client.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test::FakeServer;

    use clock::FakeSystemClock;
    use gpui::{BackgroundExecutor, Context, TestAppContext};
    use http_client::FakeHttpClient;
    use parking_lot::Mutex;
    use proto::TypedEnvelope;
    use settings::SettingsStore;
    use std::future;

    // After a disconnect the client reports `ReconnectionError` while the server
    // forbids connections, then reaches `Connected` again once they are allowed.
    // The server's auth count shows whether the client re-authenticated.
    #[gpui::test(iterations = 10)]
    async fn test_reconnection(cx: &mut TestAppContext) {
        init_test(cx);
        let user_id = 5;
        let client = cx.update(|cx| {
            Client::new(
                Arc::new(FakeSystemClock::default()),
                FakeHttpClient::with_404_response(),
                cx,
            )
        });
        let server = FakeServer::for_client(user_id, &client, cx).await;
        let mut status = client.status();
        assert!(matches!(
            status.next().await,
            Some(Status::Connected { .. })
        ));
        assert_eq!(server.auth_count(), 1);

        // Drop the connection while new connections are refused.
        server.forbid_connections();
        server.disconnect();
        while !matches!(status.next().await, Some(Status::ReconnectionError { .. })) {}

        // Once connections are allowed again, advancing the clock lets the client reconnect.
        server.allow_connections();
        cx.executor().advance_clock(Duration::from_secs(10));
        while !matches!(status.next().await, Some(Status::Connected { .. })) {}
        assert_eq!(server.auth_count(), 1); // Client reused the cached credentials when reconnecting

        server.forbid_connections();
        server.disconnect();
        while !matches!(status.next().await, Some(Status::ReconnectionError { .. })) {}

        // Clear cached credentials after authentication fails
        server.roll_access_token();
        server.allow_connections();
        cx.executor().run_until_parked();
        cx.executor().advance_clock(Duration::from_secs(10));
        while !matches!(status.next().await, Some(Status::Connected { .. })) {}
        assert_eq!(server.auth_count(), 2); // Client re-authenticated due to an invalid token
    }

    // A connection attempt that never completes is reported as an error once
    // `CONNECTION_TIMEOUT` elapses, both for the initial connection
    // (`ConnectionError`) and for a reconnection (`ReconnectionError`).
    #[gpui::test(iterations = 10)]
    async fn test_connection_timeout(executor: BackgroundExecutor, cx: &mut TestAppContext) {
        init_test(cx);
        let user_id = 5;
        let client = cx.update(|cx| {
            Client::new(
                Arc::new(FakeSystemClock::default()),
                FakeHttpClient::with_404_response(),
                cx,
            )
        });
        let mut status = client.status();

        // Time out when client tries to connect.
        client.override_authenticate(move |cx| {
            cx.background_executor().spawn(async move {
                Ok(Credentials::User {
                    user_id,
                    access_token: "token".into(),
                })
            })
        });
        // Establishing the connection never finishes, so only the timeout can end the attempt.
        client.override_establish_connection(|_, cx| {
            cx.background_executor().spawn(async move {
                future::pending::<()>().await;
                unreachable!()
            })
        });
        let auth_and_connect = cx.spawn({
            let client = client.clone();
            |cx| async move { client.authenticate_and_connect(false, &cx).await }
        });
        executor.run_until_parked();
        assert!(matches!(status.next().await, Some(Status::Connecting)));

        executor.advance_clock(CONNECTION_TIMEOUT);
        assert!(matches!(
            status.next().await,
            Some(Status::ConnectionError { .. })
        ));
        auth_and_connect.await.unwrap_err();

        // Allow the connection to be established.
        let server = FakeServer::for_client(user_id, &client, cx).await;
        assert!(matches!(
            status.next().await,
            Some(Status::Connected { .. })
        ));

        // Disconnect client.
        server.forbid_connections();
        server.disconnect();
        while !matches!(status.next().await, Some(Status::ReconnectionError { .. })) {}

        // Time out when re-establishing the connection.
        server.allow_connections();
        client.override_establish_connection(|_, cx| {
            cx.background_executor().spawn(async move {
                future::pending::<()>().await;
                unreachable!()
            })
        });
        executor.advance_clock(2 * INITIAL_RECONNECTION_DELAY);
        assert!(matches!(
            status.next().await,
            Some(Status::Reconnecting { .. })
        ));

        executor.advance_clock(CONNECTION_TIMEOUT);
        assert!(matches!(
            status.next().await,
            Some(Status::ReconnectionError { .. })
        ));
    }

    // Starting a second `authenticate_and_connect` while the first is still
    // pending starts a new authentication and drops the earlier one.
    #[gpui::test(iterations = 10)]
    async fn test_authenticating_more_than_once(
        cx: &mut TestAppContext,
        executor: BackgroundExecutor,
    ) {
        init_test(cx);
        // Number of authentication futures that started running.
        let auth_count = Arc::new(Mutex::new(0));
        // Number of authentication futures that were dropped.
        let dropped_auth_count = Arc::new(Mutex::new(0));
        let client = cx.update(|cx| {
            Client::new(
                Arc::new(FakeSystemClock::default()),
                FakeHttpClient::with_404_response(),
                cx,
            )
        });
        client.override_authenticate({
            let auth_count = auth_count.clone();
            let dropped_auth_count = dropped_auth_count.clone();
            move |cx| {
                let auth_count = auth_count.clone();
                let dropped_auth_count = dropped_auth_count.clone();
                cx.background_executor().spawn(async move {
                    *auth_count.lock() += 1;
                    // The deferred closure runs when this never-completing future is dropped.
                    let _drop = util::defer(move || *dropped_auth_count.lock() += 1);
                    future::pending::<()>().await;
                    unreachable!()
                })
            }
        });

        let _authenticate = cx.spawn({
            let client = client.clone();
            move |cx| async move { client.authenticate_and_connect(false, &cx).await }
        });
        executor.run_until_parked();
        assert_eq!(*auth_count.lock(), 1);
        assert_eq!(*dropped_auth_count.lock(), 0);

        let _authenticate = cx.spawn({
            let client = client.clone();
            |cx| async move { client.authenticate_and_connect(false, &cx).await }
        });
        executor.run_until_parked();
        assert_eq!(*auth_count.lock(), 2);
        assert_eq!(*dropped_auth_count.lock(), 1);
    }

    // Entity messages are routed to the model subscribed under the matching
    // entity ID, even after another subscription of the same type was dropped.
    #[gpui::test]
    async fn test_subscribing_to_entity(cx: &mut TestAppContext) {
        init_test(cx);
        let user_id = 5;
        let client = cx.update(|cx| {
            Client::new(
                Arc::new(FakeSystemClock::default()),
                FakeHttpClient::with_404_response(),
                cx,
            )
        });
        let server = FakeServer::for_client(user_id, &client, cx).await;

        let (done_tx1, mut done_rx1) = smol::channel::unbounded();
        let (done_tx2, mut done_rx2) = smol::channel::unbounded();
        // The handler signals the channel that corresponds to the receiving model's `id`.
        client.add_model_message_handler(
            move |model: Model<TestModel>, _: TypedEnvelope<proto::JoinProject>, mut cx| {
                match model.update(&mut cx, |model, _| model.id).unwrap() {
                    1 => done_tx1.try_send(()).unwrap(),
                    2 => done_tx2.try_send(()).unwrap(),
                    _ => unreachable!(),
                }
                async { Ok(()) }
            },
        );
        let model1 = cx.new_model(|_| TestModel {
            id: 1,
            subscription: None,
        });
        let model2 = cx.new_model(|_| TestModel {
            id: 2,
            subscription: None,
        });
        let model3 = cx.new_model(|_| TestModel {
            id: 3,
            subscription: None,
        });

        let _subscription1 = client
            .subscribe_to_entity(1)
            .unwrap()
            .set_model(&model1, &mut cx.to_async());
        let _subscription2 = client
            .subscribe_to_entity(2)
            .unwrap()
            .set_model(&model2, &mut cx.to_async());
        // Ensure dropping a subscription for the same entity type still allows receiving of
        // messages for other entity IDs of the same type.
        let subscription3 = client
            .subscribe_to_entity(3)
            .unwrap()
            .set_model(&model3, &mut cx.to_async());
        drop(subscription3);

        server.send(proto::JoinProject { project_id: 1 });
        server.send(proto::JoinProject { project_id: 2 });
        done_rx1.next().await.unwrap();
        done_rx2.next().await.unwrap();
    }

    // A handler for a message type can be registered again after the previous
    // subscription for that type was dropped; the new handler receives the message.
    #[gpui::test]
    async fn test_subscribing_after_dropping_subscription(cx: &mut TestAppContext) {
        init_test(cx);
        let user_id = 5;
        let client = cx.update(|cx| {
            Client::new(
                Arc::new(FakeSystemClock::default()),
                FakeHttpClient::with_404_response(),
                cx,
            )
        });
        let server = FakeServer::for_client(user_id, &client, cx).await;

        let model = cx.new_model(|_| TestModel::default());
        let (done_tx1, _done_rx1) = smol::channel::unbounded();
        let (done_tx2, mut done_rx2) = smol::channel::unbounded();
        let subscription1 = client.add_message_handler(
            model.downgrade(),
            move |_, _: TypedEnvelope<proto::Ping>, _| {
                done_tx1.try_send(()).unwrap();
                async { Ok(()) }
            },
        );
        drop(subscription1);
        let _subscription2 = client.add_message_handler(
            model.downgrade(),
            move |_, _: TypedEnvelope<proto::Ping>, _| {
                done_tx2.try_send(()).unwrap();
                async { Ok(()) }
            },
        );
        server.send(proto::Ping {});
        done_rx2.next().await.unwrap();
    }

    // A handler may drop its own subscription while it is running: here the
    // handler takes the `Subscription` out of the model during the callback.
    #[gpui::test]
    async fn test_dropping_subscription_in_handler(cx: &mut TestAppContext) {
        init_test(cx);
        let user_id = 5;
        let client = cx.update(|cx| {
            Client::new(
                Arc::new(FakeSystemClock::default()),
                FakeHttpClient::with_404_response(),
                cx,
            )
        });
        let server = FakeServer::for_client(user_id, &client, cx).await;

        let model = cx.new_model(|_| TestModel::default());
        let (done_tx, mut done_rx) = smol::channel::unbounded();
        let subscription = client.add_message_handler(
            model.clone().downgrade(),
            move |model: Model<TestModel>, _: TypedEnvelope<proto::Ping>, mut cx| {
                model
                    .update(&mut cx, |model, _| model.subscription.take())
                    .unwrap();
                done_tx.try_send(()).unwrap();
                async { Ok(()) }
            },
        );
        // Store the subscription inside the model so the handler above can take it.
        model.update(cx, |model, _| {
            model.subscription = Some(subscription);
        });
        server.send(proto::Ping {});
        done_rx.next().await.unwrap();
    }

    // Minimal model used as a message-handler target in the tests above.
    #[derive(Default)]
    struct TestModel {
        // Identifies which model received a message in `test_subscribing_to_entity`.
        id: usize,
        // Holds a handler subscription so that a handler can drop it from within itself.
        subscription: Option<Subscription>,
    }

    // Installs a test `SettingsStore` as a global and then calls `init_settings`.
    fn init_test(cx: &mut TestAppContext) {
        cx.update(|cx| {
            let settings_store = SettingsStore::test(cx);
            cx.set_global(settings_store);
            init_settings(cx);
        });
    }
}