remote_client.rs

   1use crate::{
   2    SshConnectionOptions,
   3    protocol::MessageId,
   4    proxy::ProxyLaunchError,
   5    transport::{
   6        ssh::SshRemoteConnection,
   7        wsl::{WslConnectionOptions, WslRemoteConnection},
   8    },
   9};
  10use anyhow::{Context as _, Result, anyhow};
  11use askpass::EncryptedPassword;
  12use async_trait::async_trait;
  13use collections::HashMap;
  14use futures::{
  15    Future, FutureExt as _, StreamExt as _,
  16    channel::{
  17        mpsc::{self, Sender, UnboundedReceiver, UnboundedSender},
  18        oneshot,
  19    },
  20    future::{BoxFuture, Shared},
  21    select, select_biased,
  22};
  23use gpui::{
  24    App, AppContext as _, AsyncApp, BackgroundExecutor, BorrowAppContext, Context, Entity,
  25    EventEmitter, Global, SemanticVersion, Task, WeakEntity,
  26};
  27use parking_lot::Mutex;
  28
  29use release_channel::ReleaseChannel;
  30use rpc::{
  31    AnyProtoClient, ErrorExt, ProtoClient, ProtoMessageHandlerSet, RpcError,
  32    proto::{self, Envelope, EnvelopedMessage, PeerId, RequestMessage, build_typed_envelope},
  33};
  34use std::{
  35    collections::VecDeque,
  36    fmt,
  37    ops::ControlFlow,
  38    path::PathBuf,
  39    sync::{
  40        Arc, Weak,
  41        atomic::{AtomicU32, AtomicU64, Ordering::SeqCst},
  42    },
  43    time::{Duration, Instant},
  44};
  45use util::{
  46    ResultExt,
  47    paths::{PathStyle, RemotePathBuf},
  48};
  49
  50#[derive(Copy, Clone, Debug)]
  51pub struct RemotePlatform {
  52    pub os: &'static str,
  53    pub arch: &'static str,
  54}
  55
  56#[derive(Clone, Debug)]
  57pub struct CommandTemplate {
  58    pub program: String,
  59    pub args: Vec<String>,
  60    pub env: HashMap<String, String>,
  61}
  62
  63pub trait RemoteClientDelegate: Send + Sync {
  64    fn ask_password(
  65        &self,
  66        prompt: String,
  67        tx: oneshot::Sender<EncryptedPassword>,
  68        cx: &mut AsyncApp,
  69    );
  70    fn get_download_params(
  71        &self,
  72        platform: RemotePlatform,
  73        release_channel: ReleaseChannel,
  74        version: Option<SemanticVersion>,
  75        cx: &mut AsyncApp,
  76    ) -> Task<Result<Option<(String, String)>>>;
  77    fn download_server_binary_locally(
  78        &self,
  79        platform: RemotePlatform,
  80        release_channel: ReleaseChannel,
  81        version: Option<SemanticVersion>,
  82        cx: &mut AsyncApp,
  83    ) -> Task<Result<PathBuf>>;
  84    fn set_status(&self, status: Option<&str>, cx: &mut AsyncApp);
  85}
  86
  87const MAX_MISSED_HEARTBEATS: usize = 5;
  88const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
  89const HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(5);
  90
  91const MAX_RECONNECT_ATTEMPTS: usize = 3;
  92
  93enum State {
  94    Connecting,
  95    Connected {
  96        ssh_connection: Arc<dyn RemoteConnection>,
  97        delegate: Arc<dyn RemoteClientDelegate>,
  98
  99        multiplex_task: Task<Result<()>>,
 100        heartbeat_task: Task<Result<()>>,
 101    },
 102    HeartbeatMissed {
 103        missed_heartbeats: usize,
 104
 105        ssh_connection: Arc<dyn RemoteConnection>,
 106        delegate: Arc<dyn RemoteClientDelegate>,
 107
 108        multiplex_task: Task<Result<()>>,
 109        heartbeat_task: Task<Result<()>>,
 110    },
 111    Reconnecting,
 112    ReconnectFailed {
 113        ssh_connection: Arc<dyn RemoteConnection>,
 114        delegate: Arc<dyn RemoteClientDelegate>,
 115
 116        error: anyhow::Error,
 117        attempts: usize,
 118    },
 119    ReconnectExhausted,
 120    ServerNotRunning,
 121}
 122
 123impl fmt::Display for State {
 124    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
 125        match self {
 126            Self::Connecting => write!(f, "connecting"),
 127            Self::Connected { .. } => write!(f, "connected"),
 128            Self::Reconnecting => write!(f, "reconnecting"),
 129            Self::ReconnectFailed { .. } => write!(f, "reconnect failed"),
 130            Self::ReconnectExhausted => write!(f, "reconnect exhausted"),
 131            Self::HeartbeatMissed { .. } => write!(f, "heartbeat missed"),
 132            Self::ServerNotRunning { .. } => write!(f, "server not running"),
 133        }
 134    }
 135}
 136
 137impl State {
 138    fn remote_connection(&self) -> Option<Arc<dyn RemoteConnection>> {
 139        match self {
 140            Self::Connected { ssh_connection, .. } => Some(ssh_connection.clone()),
 141            Self::HeartbeatMissed { ssh_connection, .. } => Some(ssh_connection.clone()),
 142            Self::ReconnectFailed { ssh_connection, .. } => Some(ssh_connection.clone()),
 143            _ => None,
 144        }
 145    }
 146
 147    fn can_reconnect(&self) -> bool {
 148        match self {
 149            Self::Connected { .. }
 150            | Self::HeartbeatMissed { .. }
 151            | Self::ReconnectFailed { .. } => true,
 152            State::Connecting
 153            | State::Reconnecting
 154            | State::ReconnectExhausted
 155            | State::ServerNotRunning => false,
 156        }
 157    }
 158
 159    fn is_reconnect_failed(&self) -> bool {
 160        matches!(self, Self::ReconnectFailed { .. })
 161    }
 162
 163    fn is_reconnect_exhausted(&self) -> bool {
 164        matches!(self, Self::ReconnectExhausted { .. })
 165    }
 166
 167    fn is_server_not_running(&self) -> bool {
 168        matches!(self, Self::ServerNotRunning)
 169    }
 170
 171    fn is_reconnecting(&self) -> bool {
 172        matches!(self, Self::Reconnecting { .. })
 173    }
 174
 175    fn heartbeat_recovered(self) -> Self {
 176        match self {
 177            Self::HeartbeatMissed {
 178                ssh_connection,
 179                delegate,
 180                multiplex_task,
 181                heartbeat_task,
 182                ..
 183            } => Self::Connected {
 184                ssh_connection,
 185                delegate,
 186                multiplex_task,
 187                heartbeat_task,
 188            },
 189            _ => self,
 190        }
 191    }
 192
 193    fn heartbeat_missed(self) -> Self {
 194        match self {
 195            Self::Connected {
 196                ssh_connection,
 197                delegate,
 198                multiplex_task,
 199                heartbeat_task,
 200            } => Self::HeartbeatMissed {
 201                missed_heartbeats: 1,
 202                ssh_connection,
 203                delegate,
 204                multiplex_task,
 205                heartbeat_task,
 206            },
 207            Self::HeartbeatMissed {
 208                missed_heartbeats,
 209                ssh_connection,
 210                delegate,
 211                multiplex_task,
 212                heartbeat_task,
 213            } => Self::HeartbeatMissed {
 214                missed_heartbeats: missed_heartbeats + 1,
 215                ssh_connection,
 216                delegate,
 217                multiplex_task,
 218                heartbeat_task,
 219            },
 220            _ => self,
 221        }
 222    }
 223}
 224
 225/// The state of the ssh connection.
 226#[derive(Clone, Copy, Debug, PartialEq, Eq)]
 227pub enum ConnectionState {
 228    Connecting,
 229    Connected,
 230    HeartbeatMissed,
 231    Reconnecting,
 232    Disconnected,
 233}
 234
 235impl From<&State> for ConnectionState {
 236    fn from(value: &State) -> Self {
 237        match value {
 238            State::Connecting => Self::Connecting,
 239            State::Connected { .. } => Self::Connected,
 240            State::Reconnecting | State::ReconnectFailed { .. } => Self::Reconnecting,
 241            State::HeartbeatMissed { .. } => Self::HeartbeatMissed,
 242            State::ReconnectExhausted => Self::Disconnected,
 243            State::ServerNotRunning => Self::Disconnected,
 244        }
 245    }
 246}
 247
 248pub struct RemoteClient {
 249    client: Arc<ChannelClient>,
 250    unique_identifier: String,
 251    connection_options: RemoteConnectionOptions,
 252    path_style: PathStyle,
 253    state: Option<State>,
 254}
 255
 256#[derive(Debug)]
 257pub enum RemoteClientEvent {
 258    Disconnected,
 259}
 260
 261impl EventEmitter<RemoteClientEvent> for RemoteClient {}
 262
 263// Identifies the socket on the remote server so that reconnects
 264// can re-join the same project.
 265pub enum ConnectionIdentifier {
 266    Setup(u64),
 267    Workspace(i64),
 268}
 269
 270static NEXT_ID: AtomicU64 = AtomicU64::new(1);
 271
 272impl ConnectionIdentifier {
 273    pub fn setup() -> Self {
 274        Self::Setup(NEXT_ID.fetch_add(1, SeqCst))
 275    }
 276
 277    // This string gets used in a socket name, and so must be relatively short.
 278    // The total length of:
 279    //   /home/{username}/.local/share/zed/server_state/{name}/stdout.sock
 280    // Must be less than about 100 characters
 281    //   https://unix.stackexchange.com/questions/367008/why-is-socket-path-length-limited-to-a-hundred-chars
 282    // So our strings should be at most 20 characters or so.
 283    fn to_string(&self, cx: &App) -> String {
 284        let identifier_prefix = match ReleaseChannel::global(cx) {
 285            ReleaseChannel::Stable => "".to_string(),
 286            release_channel => format!("{}-", release_channel.dev_name()),
 287        };
 288        match self {
 289            Self::Setup(setup_id) => format!("{identifier_prefix}setup-{setup_id}"),
 290            Self::Workspace(workspace_id) => {
 291                format!("{identifier_prefix}workspace-{workspace_id}",)
 292            }
 293        }
 294    }
 295}
 296
 297impl RemoteClient {
 298    pub fn ssh(
 299        unique_identifier: ConnectionIdentifier,
 300        connection_options: SshConnectionOptions,
 301        cancellation: oneshot::Receiver<()>,
 302        delegate: Arc<dyn RemoteClientDelegate>,
 303        cx: &mut App,
 304    ) -> Task<Result<Option<Entity<Self>>>> {
 305        Self::new(
 306            unique_identifier,
 307            RemoteConnectionOptions::Ssh(connection_options),
 308            cancellation,
 309            delegate,
 310            cx,
 311        )
 312    }
 313
 314    pub fn new(
 315        unique_identifier: ConnectionIdentifier,
 316        connection_options: RemoteConnectionOptions,
 317        cancellation: oneshot::Receiver<()>,
 318        delegate: Arc<dyn RemoteClientDelegate>,
 319        cx: &mut App,
 320    ) -> Task<Result<Option<Entity<Self>>>> {
 321        let unique_identifier = unique_identifier.to_string(cx);
 322        cx.spawn(async move |cx| {
 323            let success = Box::pin(async move {
 324                let (outgoing_tx, outgoing_rx) = mpsc::unbounded::<Envelope>();
 325                let (incoming_tx, incoming_rx) = mpsc::unbounded::<Envelope>();
 326                let (connection_activity_tx, connection_activity_rx) = mpsc::channel::<()>(1);
 327
 328                let client =
 329                    cx.update(|cx| ChannelClient::new(incoming_rx, outgoing_tx, cx, "client"))?;
 330
 331                let ssh_connection = cx
 332                    .update(|cx| {
 333                        cx.update_default_global(|pool: &mut ConnectionPool, cx| {
 334                            pool.connect(connection_options.clone(), &delegate, cx)
 335                        })
 336                    })?
 337                    .await
 338                    .map_err(|e| e.cloned())?;
 339
 340                let path_style = ssh_connection.path_style();
 341                let this = cx.new(|_| Self {
 342                    client: client.clone(),
 343                    unique_identifier: unique_identifier.clone(),
 344                    connection_options,
 345                    path_style,
 346                    state: Some(State::Connecting),
 347                })?;
 348
 349                let io_task = ssh_connection.start_proxy(
 350                    unique_identifier,
 351                    false,
 352                    incoming_tx,
 353                    outgoing_rx,
 354                    connection_activity_tx,
 355                    delegate.clone(),
 356                    cx,
 357                );
 358
 359                let multiplex_task = Self::monitor(this.downgrade(), io_task, cx);
 360
 361                if let Err(error) = client.ping(HEARTBEAT_TIMEOUT).await {
 362                    log::error!("failed to establish connection: {}", error);
 363                    return Err(error);
 364                }
 365
 366                let heartbeat_task = Self::heartbeat(this.downgrade(), connection_activity_rx, cx);
 367
 368                this.update(cx, |this, _| {
 369                    this.state = Some(State::Connected {
 370                        ssh_connection,
 371                        delegate,
 372                        multiplex_task,
 373                        heartbeat_task,
 374                    });
 375                })?;
 376
 377                Ok(Some(this))
 378            });
 379
 380            select! {
 381                _ = cancellation.fuse() => {
 382                    Ok(None)
 383                }
 384                result = success.fuse() =>  result
 385            }
 386        })
 387    }
 388
 389    pub fn proto_client_from_channels(
 390        incoming_rx: mpsc::UnboundedReceiver<Envelope>,
 391        outgoing_tx: mpsc::UnboundedSender<Envelope>,
 392        cx: &App,
 393        name: &'static str,
 394    ) -> AnyProtoClient {
 395        ChannelClient::new(incoming_rx, outgoing_tx, cx, name).into()
 396    }
 397
 398    pub fn shutdown_processes<T: RequestMessage>(
 399        &mut self,
 400        shutdown_request: Option<T>,
 401        executor: BackgroundExecutor,
 402    ) -> Option<impl Future<Output = ()> + use<T>> {
 403        let state = self.state.take()?;
 404        log::info!("shutting down ssh processes");
 405
 406        let State::Connected {
 407            multiplex_task,
 408            heartbeat_task,
 409            ssh_connection,
 410            delegate,
 411        } = state
 412        else {
 413            return None;
 414        };
 415
 416        let client = self.client.clone();
 417
 418        Some(async move {
 419            if let Some(shutdown_request) = shutdown_request {
 420                client.send(shutdown_request).log_err();
 421                // We wait 50ms instead of waiting for a response, because
 422                // waiting for a response would require us to wait on the main thread
 423                // which we want to avoid in an `on_app_quit` callback.
 424                executor.timer(Duration::from_millis(50)).await;
 425            }
 426
 427            // Drop `multiplex_task` because it owns our ssh_proxy_process, which is a
 428            // child of master_process.
 429            drop(multiplex_task);
 430            // Now drop the rest of state, which kills master process.
 431            drop(heartbeat_task);
 432            drop(ssh_connection);
 433            drop(delegate);
 434        })
 435    }
 436
 437    fn reconnect(&mut self, cx: &mut Context<Self>) -> Result<()> {
 438        let can_reconnect = self
 439            .state
 440            .as_ref()
 441            .map(|state| state.can_reconnect())
 442            .unwrap_or(false);
 443        if !can_reconnect {
 444            log::info!("aborting reconnect, because not in state that allows reconnecting");
 445            let error = if let Some(state) = self.state.as_ref() {
 446                format!("invalid state, cannot reconnect while in state {state}")
 447            } else {
 448                "no state set".to_string()
 449            };
 450            anyhow::bail!(error);
 451        }
 452
 453        let state = self.state.take().unwrap();
 454        let (attempts, remote_connection, delegate) = match state {
 455            State::Connected {
 456                ssh_connection,
 457                delegate,
 458                multiplex_task,
 459                heartbeat_task,
 460            }
 461            | State::HeartbeatMissed {
 462                ssh_connection,
 463                delegate,
 464                multiplex_task,
 465                heartbeat_task,
 466                ..
 467            } => {
 468                drop(multiplex_task);
 469                drop(heartbeat_task);
 470                (0, ssh_connection, delegate)
 471            }
 472            State::ReconnectFailed {
 473                attempts,
 474                ssh_connection,
 475                delegate,
 476                ..
 477            } => (attempts, ssh_connection, delegate),
 478            State::Connecting
 479            | State::Reconnecting
 480            | State::ReconnectExhausted
 481            | State::ServerNotRunning => unreachable!(),
 482        };
 483
 484        let attempts = attempts + 1;
 485        if attempts > MAX_RECONNECT_ATTEMPTS {
 486            log::error!(
 487                "Failed to reconnect to after {} attempts, giving up",
 488                MAX_RECONNECT_ATTEMPTS
 489            );
 490            self.set_state(State::ReconnectExhausted, cx);
 491            return Ok(());
 492        }
 493
 494        self.set_state(State::Reconnecting, cx);
 495
 496        log::info!("Trying to reconnect to ssh server... Attempt {}", attempts);
 497
 498        let unique_identifier = self.unique_identifier.clone();
 499        let client = self.client.clone();
 500        let reconnect_task = cx.spawn(async move |this, cx| {
 501            macro_rules! failed {
 502                ($error:expr, $attempts:expr, $ssh_connection:expr, $delegate:expr) => {
 503                    return State::ReconnectFailed {
 504                        error: anyhow!($error),
 505                        attempts: $attempts,
 506                        ssh_connection: $ssh_connection,
 507                        delegate: $delegate,
 508                    };
 509                };
 510            }
 511
 512            if let Err(error) = remote_connection
 513                .kill()
 514                .await
 515                .context("Failed to kill ssh process")
 516            {
 517                failed!(error, attempts, remote_connection, delegate);
 518            };
 519
 520            let connection_options = remote_connection.connection_options();
 521
 522            let (outgoing_tx, outgoing_rx) = mpsc::unbounded::<Envelope>();
 523            let (incoming_tx, incoming_rx) = mpsc::unbounded::<Envelope>();
 524            let (connection_activity_tx, connection_activity_rx) = mpsc::channel::<()>(1);
 525
 526            let (ssh_connection, io_task) = match async {
 527                let ssh_connection = cx
 528                    .update_global(|pool: &mut ConnectionPool, cx| {
 529                        pool.connect(connection_options, &delegate, cx)
 530                    })?
 531                    .await
 532                    .map_err(|error| error.cloned())?;
 533
 534                let io_task = ssh_connection.start_proxy(
 535                    unique_identifier,
 536                    true,
 537                    incoming_tx,
 538                    outgoing_rx,
 539                    connection_activity_tx,
 540                    delegate.clone(),
 541                    cx,
 542                );
 543                anyhow::Ok((ssh_connection, io_task))
 544            }
 545            .await
 546            {
 547                Ok((ssh_connection, io_task)) => (ssh_connection, io_task),
 548                Err(error) => {
 549                    failed!(error, attempts, remote_connection, delegate);
 550                }
 551            };
 552
 553            let multiplex_task = Self::monitor(this.clone(), io_task, cx);
 554            client.reconnect(incoming_rx, outgoing_tx, cx);
 555
 556            if let Err(error) = client.resync(HEARTBEAT_TIMEOUT).await {
 557                failed!(error, attempts, ssh_connection, delegate);
 558            };
 559
 560            State::Connected {
 561                ssh_connection,
 562                delegate,
 563                multiplex_task,
 564                heartbeat_task: Self::heartbeat(this.clone(), connection_activity_rx, cx),
 565            }
 566        });
 567
 568        cx.spawn(async move |this, cx| {
 569            let new_state = reconnect_task.await;
 570            this.update(cx, |this, cx| {
 571                this.try_set_state(cx, |old_state| {
 572                    if old_state.is_reconnecting() {
 573                        match &new_state {
 574                            State::Connecting
 575                            | State::Reconnecting
 576                            | State::HeartbeatMissed { .. }
 577                            | State::ServerNotRunning => {}
 578                            State::Connected { .. } => {
 579                                log::info!("Successfully reconnected");
 580                            }
 581                            State::ReconnectFailed {
 582                                error, attempts, ..
 583                            } => {
 584                                log::error!(
 585                                    "Reconnect attempt {} failed: {:?}. Starting new attempt...",
 586                                    attempts,
 587                                    error
 588                                );
 589                            }
 590                            State::ReconnectExhausted => {
 591                                log::error!("Reconnect attempt failed and all attempts exhausted");
 592                            }
 593                        }
 594                        Some(new_state)
 595                    } else {
 596                        None
 597                    }
 598                });
 599
 600                if this.state_is(State::is_reconnect_failed) {
 601                    this.reconnect(cx)
 602                } else if this.state_is(State::is_reconnect_exhausted) {
 603                    Ok(())
 604                } else {
 605                    log::debug!("State has transition from Reconnecting into new state while attempting reconnect.");
 606                    Ok(())
 607                }
 608            })
 609        })
 610        .detach_and_log_err(cx);
 611
 612        Ok(())
 613    }
 614
 615    fn heartbeat(
 616        this: WeakEntity<Self>,
 617        mut connection_activity_rx: mpsc::Receiver<()>,
 618        cx: &mut AsyncApp,
 619    ) -> Task<Result<()>> {
 620        let Ok(client) = this.read_with(cx, |this, _| this.client.clone()) else {
 621            return Task::ready(Err(anyhow!("SshRemoteClient lost")));
 622        };
 623
 624        cx.spawn(async move |cx| {
 625            let mut missed_heartbeats = 0;
 626
 627            let keepalive_timer = cx.background_executor().timer(HEARTBEAT_INTERVAL).fuse();
 628            futures::pin_mut!(keepalive_timer);
 629
 630            loop {
 631                select_biased! {
 632                    result = connection_activity_rx.next().fuse() => {
 633                        if result.is_none() {
 634                            log::warn!("ssh heartbeat: connection activity channel has been dropped. stopping.");
 635                            return Ok(());
 636                        }
 637
 638                        if missed_heartbeats != 0 {
 639                            missed_heartbeats = 0;
 640                            let _ =this.update(cx, |this, cx| {
 641                                this.handle_heartbeat_result(missed_heartbeats, cx)
 642                            })?;
 643                        }
 644                    }
 645                    _ = keepalive_timer => {
 646                        log::debug!("Sending heartbeat to server...");
 647
 648                        let result = select_biased! {
 649                            _ = connection_activity_rx.next().fuse() => {
 650                                Ok(())
 651                            }
 652                            ping_result = client.ping(HEARTBEAT_TIMEOUT).fuse() => {
 653                                ping_result
 654                            }
 655                        };
 656
 657                        if result.is_err() {
 658                            missed_heartbeats += 1;
 659                            log::warn!(
 660                                "No heartbeat from server after {:?}. Missed heartbeat {} out of {}.",
 661                                HEARTBEAT_TIMEOUT,
 662                                missed_heartbeats,
 663                                MAX_MISSED_HEARTBEATS
 664                            );
 665                        } else if missed_heartbeats != 0 {
 666                            missed_heartbeats = 0;
 667                        } else {
 668                            continue;
 669                        }
 670
 671                        let result = this.update(cx, |this, cx| {
 672                            this.handle_heartbeat_result(missed_heartbeats, cx)
 673                        })?;
 674                        if result.is_break() {
 675                            return Ok(());
 676                        }
 677                    }
 678                }
 679
 680                keepalive_timer.set(cx.background_executor().timer(HEARTBEAT_INTERVAL).fuse());
 681            }
 682        })
 683    }
 684
 685    fn handle_heartbeat_result(
 686        &mut self,
 687        missed_heartbeats: usize,
 688        cx: &mut Context<Self>,
 689    ) -> ControlFlow<()> {
 690        let state = self.state.take().unwrap();
 691        let next_state = if missed_heartbeats > 0 {
 692            state.heartbeat_missed()
 693        } else {
 694            state.heartbeat_recovered()
 695        };
 696
 697        self.set_state(next_state, cx);
 698
 699        if missed_heartbeats >= MAX_MISSED_HEARTBEATS {
 700            log::error!(
 701                "Missed last {} heartbeats. Reconnecting...",
 702                missed_heartbeats
 703            );
 704
 705            self.reconnect(cx)
 706                .context("failed to start reconnect process after missing heartbeats")
 707                .log_err();
 708            ControlFlow::Break(())
 709        } else {
 710            ControlFlow::Continue(())
 711        }
 712    }
 713
 714    fn monitor(
 715        this: WeakEntity<Self>,
 716        io_task: Task<Result<i32>>,
 717        cx: &AsyncApp,
 718    ) -> Task<Result<()>> {
 719        cx.spawn(async move |cx| {
 720            let result = io_task.await;
 721
 722            match result {
 723                Ok(exit_code) => {
 724                    if let Some(error) = ProxyLaunchError::from_exit_code(exit_code) {
 725                        match error {
 726                            ProxyLaunchError::ServerNotRunning => {
 727                                log::error!("failed to reconnect because server is not running");
 728                                this.update(cx, |this, cx| {
 729                                    this.set_state(State::ServerNotRunning, cx);
 730                                })?;
 731                            }
 732                        }
 733                    } else if exit_code > 0 {
 734                        log::error!("proxy process terminated unexpectedly");
 735                        this.update(cx, |this, cx| {
 736                            this.reconnect(cx).ok();
 737                        })?;
 738                    }
 739                }
 740                Err(error) => {
 741                    log::warn!("ssh io task died with error: {:?}. reconnecting...", error);
 742                    this.update(cx, |this, cx| {
 743                        this.reconnect(cx).ok();
 744                    })?;
 745                }
 746            }
 747
 748            Ok(())
 749        })
 750    }
 751
 752    fn state_is(&self, check: impl FnOnce(&State) -> bool) -> bool {
 753        self.state.as_ref().is_some_and(check)
 754    }
 755
 756    fn try_set_state(&mut self, cx: &mut Context<Self>, map: impl FnOnce(&State) -> Option<State>) {
 757        let new_state = self.state.as_ref().and_then(map);
 758        if let Some(new_state) = new_state {
 759            self.state.replace(new_state);
 760            cx.notify();
 761        }
 762    }
 763
 764    fn set_state(&mut self, state: State, cx: &mut Context<Self>) {
 765        log::info!("setting state to '{}'", &state);
 766
 767        let is_reconnect_exhausted = state.is_reconnect_exhausted();
 768        let is_server_not_running = state.is_server_not_running();
 769        self.state.replace(state);
 770
 771        if is_reconnect_exhausted || is_server_not_running {
 772            cx.emit(RemoteClientEvent::Disconnected);
 773        }
 774        cx.notify();
 775    }
 776
 777    pub fn shell(&self) -> Option<String> {
 778        Some(self.remote_connection()?.shell())
 779    }
 780
 781    pub fn default_system_shell(&self) -> Option<String> {
 782        Some(self.remote_connection()?.default_system_shell())
 783    }
 784
 785    pub fn shares_network_interface(&self) -> bool {
 786        self.remote_connection()
 787            .map_or(false, |connection| connection.shares_network_interface())
 788    }
 789
 790    pub fn build_command(
 791        &self,
 792        program: Option<String>,
 793        args: &[String],
 794        env: &HashMap<String, String>,
 795        working_dir: Option<String>,
 796        port_forward: Option<(u16, String, u16)>,
 797    ) -> Result<CommandTemplate> {
 798        let Some(connection) = self.remote_connection() else {
 799            return Err(anyhow!("no ssh connection"));
 800        };
 801        connection.build_command(program, args, env, working_dir, port_forward)
 802    }
 803
 804    pub fn upload_directory(
 805        &self,
 806        src_path: PathBuf,
 807        dest_path: RemotePathBuf,
 808        cx: &App,
 809    ) -> Task<Result<()>> {
 810        let Some(connection) = self.remote_connection() else {
 811            return Task::ready(Err(anyhow!("no ssh connection")));
 812        };
 813        connection.upload_directory(src_path, dest_path, cx)
 814    }
 815
 816    pub fn proto_client(&self) -> AnyProtoClient {
 817        self.client.clone().into()
 818    }
 819
 820    pub fn connection_options(&self) -> RemoteConnectionOptions {
 821        self.connection_options.clone()
 822    }
 823
 824    pub fn connection_state(&self) -> ConnectionState {
 825        self.state
 826            .as_ref()
 827            .map(ConnectionState::from)
 828            .unwrap_or(ConnectionState::Disconnected)
 829    }
 830
 831    pub fn is_disconnected(&self) -> bool {
 832        self.connection_state() == ConnectionState::Disconnected
 833    }
 834
 835    pub fn path_style(&self) -> PathStyle {
 836        self.path_style
 837    }
 838
 839    #[cfg(any(test, feature = "test-support"))]
 840    pub fn simulate_disconnect(&self, client_cx: &mut App) -> Task<()> {
 841        let opts = self.connection_options();
 842        client_cx.spawn(async move |cx| {
 843            let connection = cx
 844                .update_global(|c: &mut ConnectionPool, _| {
 845                    if let Some(ConnectionPoolEntry::Connecting(c)) = c.connections.get(&opts) {
 846                        c.clone()
 847                    } else {
 848                        panic!("missing test connection")
 849                    }
 850                })
 851                .unwrap()
 852                .await
 853                .unwrap();
 854
 855            connection.simulate_disconnect(cx);
 856        })
 857    }
 858
 859    #[cfg(any(test, feature = "test-support"))]
 860    pub fn fake_server(
 861        client_cx: &mut gpui::TestAppContext,
 862        server_cx: &mut gpui::TestAppContext,
 863    ) -> (RemoteConnectionOptions, AnyProtoClient) {
 864        let port = client_cx
 865            .update(|cx| cx.default_global::<ConnectionPool>().connections.len() as u16 + 1);
 866        let opts = RemoteConnectionOptions::Ssh(SshConnectionOptions {
 867            host: "<fake>".to_string(),
 868            port: Some(port),
 869            ..Default::default()
 870        });
 871        let (outgoing_tx, _) = mpsc::unbounded::<Envelope>();
 872        let (_, incoming_rx) = mpsc::unbounded::<Envelope>();
 873        let server_client =
 874            server_cx.update(|cx| ChannelClient::new(incoming_rx, outgoing_tx, cx, "fake-server"));
 875        let connection: Arc<dyn RemoteConnection> = Arc::new(fake::FakeRemoteConnection {
 876            connection_options: opts.clone(),
 877            server_cx: fake::SendableCx::new(server_cx),
 878            server_channel: server_client.clone(),
 879        });
 880
 881        client_cx.update(|cx| {
 882            cx.update_default_global(|c: &mut ConnectionPool, cx| {
 883                c.connections.insert(
 884                    opts.clone(),
 885                    ConnectionPoolEntry::Connecting(
 886                        cx.background_spawn({
 887                            let connection = connection.clone();
 888                            async move { Ok(connection.clone()) }
 889                        })
 890                        .shared(),
 891                    ),
 892                );
 893            })
 894        });
 895
 896        (opts, server_client.into())
 897    }
 898
 899    #[cfg(any(test, feature = "test-support"))]
 900    pub async fn fake_client(
 901        opts: RemoteConnectionOptions,
 902        client_cx: &mut gpui::TestAppContext,
 903    ) -> Entity<Self> {
 904        let (_tx, rx) = oneshot::channel();
 905        client_cx
 906            .update(|cx| {
 907                Self::new(
 908                    ConnectionIdentifier::setup(),
 909                    opts,
 910                    rx,
 911                    Arc::new(fake::Delegate),
 912                    cx,
 913                )
 914            })
 915            .await
 916            .unwrap()
 917            .unwrap()
 918    }
 919
 920    fn remote_connection(&self) -> Option<Arc<dyn RemoteConnection>> {
 921        self.state
 922            .as_ref()
 923            .and_then(|state| state.remote_connection())
 924    }
 925}
 926
/// One slot in the [`ConnectionPool`], keyed by connection options.
enum ConnectionPoolEntry {
    /// A connection attempt is in flight; the shared task can be awaited by
    /// every caller interested in the same options.
    Connecting(Shared<Task<Result<Arc<dyn RemoteConnection>, Arc<anyhow::Error>>>>),
    /// A connection was established. Only a weak reference is stored, so the
    /// pool entry by itself does not keep the connection alive.
    Connected(Weak<dyn RemoteConnection>),
}
 931
/// Registry of in-flight and established remote connections, keyed by the
/// options used to create them.
#[derive(Default)]
struct ConnectionPool {
    /// Pool entries indexed by their connection options.
    connections: HashMap<RemoteConnectionOptions, ConnectionPoolEntry>,
}
 936
// Marks `ConnectionPool` as a GPUI global so it can be reached through
// `update_global` / `default_global` on the app context.
impl Global for ConnectionPool {}
 938
 939impl ConnectionPool {
 940    pub fn connect(
 941        &mut self,
 942        opts: RemoteConnectionOptions,
 943        delegate: &Arc<dyn RemoteClientDelegate>,
 944        cx: &mut App,
 945    ) -> Shared<Task<Result<Arc<dyn RemoteConnection>, Arc<anyhow::Error>>>> {
 946        let connection = self.connections.get(&opts);
 947        match connection {
 948            Some(ConnectionPoolEntry::Connecting(task)) => {
 949                let delegate = delegate.clone();
 950                cx.spawn(async move |cx| {
 951                    delegate.set_status(Some("Waiting for existing connection attempt"), cx);
 952                })
 953                .detach();
 954                return task.clone();
 955            }
 956            Some(ConnectionPoolEntry::Connected(ssh)) => {
 957                if let Some(ssh) = ssh.upgrade()
 958                    && !ssh.has_been_killed()
 959                {
 960                    return Task::ready(Ok(ssh)).shared();
 961                }
 962                self.connections.remove(&opts);
 963            }
 964            None => {}
 965        }
 966
 967        let task = cx
 968            .spawn({
 969                let opts = opts.clone();
 970                let delegate = delegate.clone();
 971                async move |cx| {
 972                    let connection = match opts.clone() {
 973                        RemoteConnectionOptions::Ssh(opts) => {
 974                            SshRemoteConnection::new(opts, delegate, cx)
 975                                .await
 976                                .map(|connection| Arc::new(connection) as Arc<dyn RemoteConnection>)
 977                        }
 978                        RemoteConnectionOptions::Wsl(opts) => {
 979                            WslRemoteConnection::new(opts, delegate, cx)
 980                                .await
 981                                .map(|connection| Arc::new(connection) as Arc<dyn RemoteConnection>)
 982                        }
 983                    };
 984
 985                    cx.update_global(|pool: &mut Self, _| {
 986                        debug_assert!(matches!(
 987                            pool.connections.get(&opts),
 988                            Some(ConnectionPoolEntry::Connecting(_))
 989                        ));
 990                        match connection {
 991                            Ok(connection) => {
 992                                pool.connections.insert(
 993                                    opts.clone(),
 994                                    ConnectionPoolEntry::Connected(Arc::downgrade(&connection)),
 995                                );
 996                                Ok(connection)
 997                            }
 998                            Err(error) => {
 999                                pool.connections.remove(&opts);
1000                                Err(Arc::new(error))
1001                            }
1002                        }
1003                    })?
1004                }
1005            })
1006            .shared();
1007
1008        self.connections
1009            .insert(opts.clone(), ConnectionPoolEntry::Connecting(task.clone()));
1010        task
1011    }
1012}
1013
/// Options identifying a remote to connect to; also used as the key of the
/// connection pool, hence the `Eq` and `Hash` derives.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum RemoteConnectionOptions {
    /// Connect over SSH.
    Ssh(SshConnectionOptions),
    /// Connect to a WSL distribution.
    Wsl(WslConnectionOptions),
}
1019
1020impl RemoteConnectionOptions {
1021    pub fn display_name(&self) -> String {
1022        match self {
1023            RemoteConnectionOptions::Ssh(opts) => opts.host.clone(),
1024            RemoteConnectionOptions::Wsl(opts) => opts.distro_name.clone(),
1025        }
1026    }
1027}
1028
1029impl From<SshConnectionOptions> for RemoteConnectionOptions {
1030    fn from(opts: SshConnectionOptions) -> Self {
1031        RemoteConnectionOptions::Ssh(opts)
1032    }
1033}
1034
1035impl From<WslConnectionOptions> for RemoteConnectionOptions {
1036    fn from(opts: WslConnectionOptions) -> Self {
1037        RemoteConnectionOptions::Wsl(opts)
1038    }
1039}
1040
/// Transport-level operations a remote connection must provide. In this file
/// it is implemented by the SSH and WSL connections and by the test fake.
#[async_trait(?Send)]
pub(crate) trait RemoteConnection: Send + Sync {
    /// Starts the proxy that moves envelopes between the client and the remote.
    ///
    /// Envelopes arriving from the remote are delivered on `incoming_tx` and
    /// envelopes to send are read from `outgoing_rx`. `connection_activity_tx`
    /// is signalled when traffic arrives from the remote (as the test fake
    /// does). The task's `i32` result is presumably the proxy's exit status —
    /// confirm against the concrete transports.
    fn start_proxy(
        &self,
        unique_identifier: String,
        reconnect: bool,
        incoming_tx: UnboundedSender<Envelope>,
        outgoing_rx: UnboundedReceiver<Envelope>,
        connection_activity_tx: Sender<()>,
        delegate: Arc<dyn RemoteClientDelegate>,
        cx: &mut AsyncApp,
    ) -> Task<Result<i32>>;
    /// Uploads the local directory `src_path` to `dest_path` on the remote.
    fn upload_directory(
        &self,
        src_path: PathBuf,
        dest_path: RemotePathBuf,
        cx: &App,
    ) -> Task<Result<()>>;
    /// Terminates the connection.
    async fn kill(&self) -> Result<()>;
    /// Reports whether the connection has been killed; the connection pool
    /// refuses to reuse connections for which this returns `true`.
    fn has_been_killed(&self) -> bool;
    /// Whether the remote shares a network interface with the local machine.
    /// Defaults to `false`.
    fn shares_network_interface(&self) -> bool {
        false
    }
    /// Builds the command template used to run `program` with `args` and `env`
    /// through this connection, optionally in `working_dir` and with a port
    /// forward. NOTE(review): the meaning of the `port_forward` tuple fields
    /// is not visible here — confirm against the implementations.
    fn build_command(
        &self,
        program: Option<String>,
        args: &[String],
        env: &HashMap<String, String>,
        working_dir: Option<String>,
        port_forward: Option<(u16, String, u16)>,
    ) -> Result<CommandTemplate>;
    /// Returns the options this connection was created with.
    fn connection_options(&self) -> RemoteConnectionOptions;
    /// Returns the path style used on the remote.
    fn path_style(&self) -> PathStyle;
    /// Returns the shell to use on the remote.
    fn shell(&self) -> String;
    /// Returns the remote's default system shell.
    fn default_system_shell(&self) -> String;

    /// Test hook for simulating a dropped connection; a no-op by default.
    #[cfg(any(test, feature = "test-support"))]
    fn simulate_disconnect(&self, _: &AsyncApp) {}
}
1080
/// Pending requests keyed by message id. Each value delivers the response
/// envelope together with a second oneshot sender; the receive loop waits on
/// that second channel before it handles the next incoming message.
type ResponseChannels = Mutex<HashMap<MessageId, oneshot::Sender<(Envelope, oneshot::Sender<()>)>>>;
1082
/// Proto client that exchanges envelopes over a pair of unbounded channels
/// and can be re-attached to fresh channels via `reconnect`.
struct ChannelClient {
    /// Source of ids assigned to outgoing envelopes.
    next_message_id: AtomicU32,
    /// Sender for outgoing envelopes; replaced on reconnect.
    outgoing_tx: Mutex<mpsc::UnboundedSender<Envelope>>,
    /// Envelopes sent through the buffered path, kept until the peer
    /// acknowledges them so they can be re-sent after a reconnect.
    buffer: Mutex<VecDeque<Envelope>>,
    /// Requests that are still waiting for their response.
    response_channels: ResponseChannels,
    /// Handlers for incoming messages that are not responses.
    message_handlers: Mutex<ProtoMessageHandlerSet>,
    /// Id of the most recently processed incoming envelope; attached to
    /// outgoing envelopes as `ack_id`.
    max_received: AtomicU32,
    /// Label used as a prefix in log output.
    name: &'static str,
    /// The task running the receive loop; replaced on reconnect.
    task: Mutex<Task<Result<()>>>,
}
1093
1094impl ChannelClient {
1095    fn new(
1096        incoming_rx: mpsc::UnboundedReceiver<Envelope>,
1097        outgoing_tx: mpsc::UnboundedSender<Envelope>,
1098        cx: &App,
1099        name: &'static str,
1100    ) -> Arc<Self> {
1101        Arc::new_cyclic(|this| Self {
1102            outgoing_tx: Mutex::new(outgoing_tx),
1103            next_message_id: AtomicU32::new(0),
1104            max_received: AtomicU32::new(0),
1105            response_channels: ResponseChannels::default(),
1106            message_handlers: Default::default(),
1107            buffer: Mutex::new(VecDeque::new()),
1108            name,
1109            task: Mutex::new(Self::start_handling_messages(
1110                this.clone(),
1111                incoming_rx,
1112                &cx.to_async(),
1113            )),
1114        })
1115    }
1116
1117    fn start_handling_messages(
1118        this: Weak<Self>,
1119        mut incoming_rx: mpsc::UnboundedReceiver<Envelope>,
1120        cx: &AsyncApp,
1121    ) -> Task<Result<()>> {
1122        cx.spawn(async move |cx| {
1123            let peer_id = PeerId { owner_id: 0, id: 0 };
1124            while let Some(incoming) = incoming_rx.next().await {
1125                let Some(this) = this.upgrade() else {
1126                    return anyhow::Ok(());
1127                };
1128                if let Some(ack_id) = incoming.ack_id {
1129                    let mut buffer = this.buffer.lock();
1130                    while buffer.front().is_some_and(|msg| msg.id <= ack_id) {
1131                        buffer.pop_front();
1132                    }
1133                }
1134                if let Some(proto::envelope::Payload::FlushBufferedMessages(_)) = &incoming.payload
1135                {
1136                    log::debug!(
1137                        "{}:ssh message received. name:FlushBufferedMessages",
1138                        this.name
1139                    );
1140                    {
1141                        let buffer = this.buffer.lock();
1142                        for envelope in buffer.iter() {
1143                            this.outgoing_tx
1144                                .lock()
1145                                .unbounded_send(envelope.clone())
1146                                .ok();
1147                        }
1148                    }
1149                    let mut envelope = proto::Ack {}.into_envelope(0, Some(incoming.id), None);
1150                    envelope.id = this.next_message_id.fetch_add(1, SeqCst);
1151                    this.outgoing_tx.lock().unbounded_send(envelope).ok();
1152                    continue;
1153                }
1154
1155                this.max_received.store(incoming.id, SeqCst);
1156
1157                if let Some(request_id) = incoming.responding_to {
1158                    let request_id = MessageId(request_id);
1159                    let sender = this.response_channels.lock().remove(&request_id);
1160                    if let Some(sender) = sender {
1161                        let (tx, rx) = oneshot::channel();
1162                        if incoming.payload.is_some() {
1163                            sender.send((incoming, tx)).ok();
1164                        }
1165                        rx.await.ok();
1166                    }
1167                } else if let Some(envelope) =
1168                    build_typed_envelope(peer_id, Instant::now(), incoming)
1169                {
1170                    let type_name = envelope.payload_type_name();
1171                    let message_id = envelope.message_id();
1172                    if let Some(future) = ProtoMessageHandlerSet::handle_message(
1173                        &this.message_handlers,
1174                        envelope,
1175                        this.clone().into(),
1176                        cx.clone(),
1177                    ) {
1178                        log::debug!("{}:ssh message received. name:{type_name}", this.name);
1179                        cx.foreground_executor()
1180                            .spawn(async move {
1181                                match future.await {
1182                                    Ok(_) => {
1183                                        log::debug!(
1184                                            "{}:ssh message handled. name:{type_name}",
1185                                            this.name
1186                                        );
1187                                    }
1188                                    Err(error) => {
1189                                        log::error!(
1190                                            "{}:error handling message. type:{}, error:{:#}",
1191                                            this.name,
1192                                            type_name,
1193                                            format!("{error:#}").lines().fold(
1194                                                String::new(),
1195                                                |mut message, line| {
1196                                                    if !message.is_empty() {
1197                                                        message.push(' ');
1198                                                    }
1199                                                    message.push_str(line);
1200                                                    message
1201                                                }
1202                                            )
1203                                        );
1204                                    }
1205                                }
1206                            })
1207                            .detach()
1208                    } else {
1209                        log::error!("{}:unhandled ssh message name:{type_name}", this.name);
1210                        if let Err(e) = AnyProtoClient::from(this.clone()).send_response(
1211                            message_id,
1212                            anyhow::anyhow!("no handler registered for {type_name}").to_proto(),
1213                        ) {
1214                            log::error!(
1215                                "{}:error sending error response for {type_name}:{e:#}",
1216                                this.name
1217                            );
1218                        }
1219                    }
1220                }
1221            }
1222            anyhow::Ok(())
1223        })
1224    }
1225
    /// Re-attaches the client to a fresh channel pair.
    ///
    /// The outgoing sender is swapped first, then a new receive loop is
    /// started on `incoming_rx`; storing it replaces (and thereby drops) the
    /// previous loop's task.
    fn reconnect(
        self: &Arc<Self>,
        incoming_rx: UnboundedReceiver<Envelope>,
        outgoing_tx: UnboundedSender<Envelope>,
        cx: &AsyncApp,
    ) {
        *self.outgoing_tx.lock() = outgoing_tx;
        *self.task.lock() = Self::start_handling_messages(Arc::downgrade(self), incoming_rx, cx);
    }
1235
    /// Sends `payload` as a request through the buffered path and returns a
    /// future resolving to the typed response.
    fn request<T: RequestMessage>(
        &self,
        payload: T,
    ) -> impl 'static + Future<Output = Result<T::Response>> {
        self.request_internal(payload, true)
    }
1242
1243    fn request_internal<T: RequestMessage>(
1244        &self,
1245        payload: T,
1246        use_buffer: bool,
1247    ) -> impl 'static + Future<Output = Result<T::Response>> {
1248        log::debug!("ssh request start. name:{}", T::NAME);
1249        let response =
1250            self.request_dynamic(payload.into_envelope(0, None, None), T::NAME, use_buffer);
1251        async move {
1252            let response = response.await?;
1253            log::debug!("ssh request finish. name:{}", T::NAME);
1254            T::Response::from_envelope(response).context("received a response of the wrong type")
1255        }
1256    }
1257
1258    async fn resync(&self, timeout: Duration) -> Result<()> {
1259        smol::future::or(
1260            async {
1261                self.request_internal(proto::FlushBufferedMessages {}, false)
1262                    .await?;
1263
1264                for envelope in self.buffer.lock().iter() {
1265                    self.outgoing_tx
1266                        .lock()
1267                        .unbounded_send(envelope.clone())
1268                        .ok();
1269                }
1270                Ok(())
1271            },
1272            async {
1273                smol::Timer::after(timeout).await;
1274                anyhow::bail!("Timed out resyncing remote client")
1275            },
1276        )
1277        .await
1278    }
1279
1280    async fn ping(&self, timeout: Duration) -> Result<()> {
1281        smol::future::or(
1282            async {
1283                self.request(proto::Ping {}).await?;
1284                Ok(())
1285            },
1286            async {
1287                smol::Timer::after(timeout).await;
1288                anyhow::bail!("Timed out pinging remote client")
1289            },
1290        )
1291        .await
1292    }
1293
1294    fn send<T: EnvelopedMessage>(&self, payload: T) -> Result<()> {
1295        log::debug!("ssh send name:{}", T::NAME);
1296        self.send_dynamic(payload.into_envelope(0, None, None))
1297    }
1298
1299    fn request_dynamic(
1300        &self,
1301        mut envelope: proto::Envelope,
1302        type_name: &'static str,
1303        use_buffer: bool,
1304    ) -> impl 'static + Future<Output = Result<proto::Envelope>> {
1305        envelope.id = self.next_message_id.fetch_add(1, SeqCst);
1306        let (tx, rx) = oneshot::channel();
1307        let mut response_channels_lock = self.response_channels.lock();
1308        response_channels_lock.insert(MessageId(envelope.id), tx);
1309        drop(response_channels_lock);
1310
1311        let result = if use_buffer {
1312            self.send_buffered(envelope)
1313        } else {
1314            self.send_unbuffered(envelope)
1315        };
1316        async move {
1317            if let Err(error) = &result {
1318                log::error!("failed to send message: {error}");
1319                anyhow::bail!("failed to send message: {error}");
1320            }
1321
1322            let response = rx.await.context("connection lost")?.0;
1323            if let Some(proto::envelope::Payload::Error(error)) = &response.payload {
1324                return Err(RpcError::from_proto(error, type_name));
1325            }
1326            Ok(response)
1327        }
1328    }
1329
    /// Assigns the next message id to `envelope` and sends it through the
    /// buffered path.
    pub fn send_dynamic(&self, mut envelope: proto::Envelope) -> Result<()> {
        envelope.id = self.next_message_id.fetch_add(1, SeqCst);
        self.send_buffered(envelope)
    }
1334
1335    fn send_buffered(&self, mut envelope: proto::Envelope) -> Result<()> {
1336        envelope.ack_id = Some(self.max_received.load(SeqCst));
1337        self.buffer.lock().push_back(envelope.clone());
1338        // ignore errors on send (happen while we're reconnecting)
1339        // assume that the global "disconnected" overlay is sufficient.
1340        self.outgoing_tx.lock().unbounded_send(envelope).ok();
1341        Ok(())
1342    }
1343
1344    fn send_unbuffered(&self, mut envelope: proto::Envelope) -> Result<()> {
1345        envelope.ack_id = Some(self.max_received.load(SeqCst));
1346        self.outgoing_tx.lock().unbounded_send(envelope).ok();
1347        Ok(())
1348    }
1349}
1350
/// `ProtoClient` implementation that forwards to the inherent buffered
/// request/send paths of `ChannelClient`.
impl ProtoClient for ChannelClient {
    /// Sends `envelope` as a buffered request and returns the boxed response future.
    fn request(
        &self,
        envelope: proto::Envelope,
        request_type: &'static str,
    ) -> BoxFuture<'static, Result<proto::Envelope>> {
        self.request_dynamic(envelope, request_type, true).boxed()
    }

    /// Sends `envelope` as a one-way message; the message type is not used.
    fn send(&self, envelope: proto::Envelope, _message_type: &'static str) -> Result<()> {
        self.send_dynamic(envelope)
    }

    /// Sends a response envelope the same way as any other message; the
    /// message type is not used.
    fn send_response(&self, envelope: Envelope, _message_type: &'static str) -> anyhow::Result<()> {
        self.send_dynamic(envelope)
    }

    /// Exposes the handler set used to dispatch incoming messages.
    fn message_handler_set(&self) -> &Mutex<ProtoMessageHandlerSet> {
        &self.message_handlers
    }

    /// This client never goes through collab.
    fn is_via_collab(&self) -> bool {
        false
    }
}
1376
1377#[cfg(any(test, feature = "test-support"))]
1378mod fake {
1379    use super::{ChannelClient, RemoteClientDelegate, RemoteConnection, RemotePlatform};
1380    use crate::remote_client::{CommandTemplate, RemoteConnectionOptions};
1381    use anyhow::Result;
1382    use askpass::EncryptedPassword;
1383    use async_trait::async_trait;
1384    use collections::HashMap;
1385    use futures::{
1386        FutureExt, SinkExt, StreamExt,
1387        channel::{
1388            mpsc::{self, Sender},
1389            oneshot,
1390        },
1391        select_biased,
1392    };
1393    use gpui::{App, AppContext as _, AsyncApp, SemanticVersion, Task, TestAppContext};
1394    use release_channel::ReleaseChannel;
1395    use rpc::proto::Envelope;
1396    use std::{path::PathBuf, sync::Arc};
1397    use util::paths::{PathStyle, RemotePathBuf};
1398
    /// In-process stand-in for a remote connection, used by tests.
    pub(super) struct FakeRemoteConnection {
        /// Options reported by `connection_options`.
        pub(super) connection_options: RemoteConnectionOptions,
        /// Server-side channel client that gets re-attached whenever a proxy
        /// is started or a disconnect is simulated.
        pub(super) server_channel: Arc<ChannelClient>,
        /// Async app context of the fake server.
        pub(super) server_cx: SendableCx,
    }
1404
    /// Wrapper that lets an `AsyncApp` live inside `FakeRemoteConnection`,
    /// which must be `Send + Sync` to implement `RemoteConnection`.
    pub(super) struct SendableCx(AsyncApp);
    impl SendableCx {
        /// Captures the async context of a test app.
        // SAFETY: When run in test mode, GPUI is always single threaded.
        pub(super) fn new(cx: &TestAppContext) -> Self {
            Self(cx.to_async())
        }

        /// Returns a clone of the wrapped context. The otherwise unused
        /// `AsyncApp` argument acts as proof that the caller already has one.
        // SAFETY: Enforce that we're on the main thread by requiring a valid AsyncApp
        fn get(&self, _: &AsyncApp) -> AsyncApp {
            self.0.clone()
        }
    }
1417
    // SAFETY: There is no way to access a SendableCx from a different thread, see [`SendableCx::new`] and [`SendableCx::get`]
    // The same argument is relied upon for both `Send` and `Sync`.
    unsafe impl Send for SendableCx {}
    unsafe impl Sync for SendableCx {}
1421
1422    #[async_trait(?Send)]
1423    impl RemoteConnection for FakeRemoteConnection {
1424        async fn kill(&self) -> Result<()> {
1425            Ok(())
1426        }
1427
1428        fn has_been_killed(&self) -> bool {
1429            false
1430        }
1431
1432        fn build_command(
1433            &self,
1434            program: Option<String>,
1435            args: &[String],
1436            env: &HashMap<String, String>,
1437            _: Option<String>,
1438            _: Option<(u16, String, u16)>,
1439        ) -> Result<CommandTemplate> {
1440            let ssh_program = program.unwrap_or_else(|| "sh".to_string());
1441            let mut ssh_args = Vec::new();
1442            ssh_args.push(ssh_program);
1443            ssh_args.extend(args.iter().cloned());
1444            Ok(CommandTemplate {
1445                program: "ssh".into(),
1446                args: ssh_args,
1447                env: env.clone(),
1448            })
1449        }
1450
1451        fn upload_directory(
1452            &self,
1453            _src_path: PathBuf,
1454            _dest_path: RemotePathBuf,
1455            _cx: &App,
1456        ) -> Task<Result<()>> {
1457            unreachable!()
1458        }
1459
1460        fn connection_options(&self) -> RemoteConnectionOptions {
1461            self.connection_options.clone()
1462        }
1463
1464        fn simulate_disconnect(&self, cx: &AsyncApp) {
1465            let (outgoing_tx, _) = mpsc::unbounded::<Envelope>();
1466            let (_, incoming_rx) = mpsc::unbounded::<Envelope>();
1467            self.server_channel
1468                .reconnect(incoming_rx, outgoing_tx, &self.server_cx.get(cx));
1469        }
1470
1471        fn start_proxy(
1472            &self,
1473            _unique_identifier: String,
1474            _reconnect: bool,
1475            mut client_incoming_tx: mpsc::UnboundedSender<Envelope>,
1476            mut client_outgoing_rx: mpsc::UnboundedReceiver<Envelope>,
1477            mut connection_activity_tx: Sender<()>,
1478            _delegate: Arc<dyn RemoteClientDelegate>,
1479            cx: &mut AsyncApp,
1480        ) -> Task<Result<i32>> {
1481            let (mut server_incoming_tx, server_incoming_rx) = mpsc::unbounded::<Envelope>();
1482            let (server_outgoing_tx, mut server_outgoing_rx) = mpsc::unbounded::<Envelope>();
1483
1484            self.server_channel.reconnect(
1485                server_incoming_rx,
1486                server_outgoing_tx,
1487                &self.server_cx.get(cx),
1488            );
1489
1490            cx.background_spawn(async move {
1491                loop {
1492                    select_biased! {
1493                        server_to_client = server_outgoing_rx.next().fuse() => {
1494                            let Some(server_to_client) = server_to_client else {
1495                                return Ok(1)
1496                            };
1497                            connection_activity_tx.try_send(()).ok();
1498                            client_incoming_tx.send(server_to_client).await.ok();
1499                        }
1500                        client_to_server = client_outgoing_rx.next().fuse() => {
1501                            let Some(client_to_server) = client_to_server else {
1502                                return Ok(1)
1503                            };
1504                            server_incoming_tx.send(client_to_server).await.ok();
1505                        }
1506                    }
1507                }
1508            })
1509        }
1510
1511        fn path_style(&self) -> PathStyle {
1512            PathStyle::local()
1513        }
1514
1515        fn shell(&self) -> String {
1516            "sh".to_owned()
1517        }
1518
1519        fn default_system_shell(&self) -> String {
1520            "sh".to_owned()
1521        }
1522    }
1523
    /// Delegate used with the fake connection in tests.
    pub(super) struct Delegate;
1525
    /// Test delegate: status updates are ignored and every other callback
    /// panics, since the fake connection is not expected to invoke them.
    impl RemoteClientDelegate for Delegate {
        /// Never expected to be called with the fake connection; panics.
        fn ask_password(&self, _: String, _: oneshot::Sender<EncryptedPassword>, _: &mut AsyncApp) {
            unreachable!()
        }

        /// Never expected to be called with the fake connection; panics.
        fn download_server_binary_locally(
            &self,
            _: RemotePlatform,
            _: ReleaseChannel,
            _: Option<SemanticVersion>,
            _: &mut AsyncApp,
        ) -> Task<Result<PathBuf>> {
            unreachable!()
        }

        /// Never expected to be called with the fake connection; panics.
        fn get_download_params(
            &self,
            _platform: RemotePlatform,
            _release_channel: ReleaseChannel,
            _version: Option<SemanticVersion>,
            _cx: &mut AsyncApp,
        ) -> Task<Result<Option<(String, String)>>> {
            unreachable!()
        }

        /// Status updates are silently discarded.
        fn set_status(&self, _: Option<&str>, _: &mut AsyncApp) {}
    }
1553}