remote_client.rs

   1use crate::{
   2    SshConnectionOptions,
   3    protocol::MessageId,
   4    proxy::ProxyLaunchError,
   5    transport::{
   6        ssh::SshRemoteConnection,
   7        wsl::{WslConnectionOptions, WslRemoteConnection},
   8    },
   9};
  10use anyhow::{Context as _, Result, anyhow};
  11use async_trait::async_trait;
  12use collections::HashMap;
  13use futures::{
  14    Future, FutureExt as _, StreamExt as _,
  15    channel::{
  16        mpsc::{self, Sender, UnboundedReceiver, UnboundedSender},
  17        oneshot,
  18    },
  19    future::{BoxFuture, Shared},
  20    select, select_biased,
  21};
  22use gpui::{
  23    App, AppContext as _, AsyncApp, BackgroundExecutor, BorrowAppContext, Context, Entity,
  24    EventEmitter, Global, SemanticVersion, Task, WeakEntity,
  25};
  26use parking_lot::Mutex;
  27
  28use release_channel::ReleaseChannel;
  29use rpc::{
  30    AnyProtoClient, ErrorExt, ProtoClient, ProtoMessageHandlerSet, RpcError,
  31    proto::{self, Envelope, EnvelopedMessage, PeerId, RequestMessage, build_typed_envelope},
  32};
  33use std::{
  34    collections::VecDeque,
  35    fmt,
  36    ops::ControlFlow,
  37    path::PathBuf,
  38    sync::{
  39        Arc, Weak,
  40        atomic::{AtomicU32, AtomicU64, Ordering::SeqCst},
  41    },
  42    time::{Duration, Instant},
  43};
  44use util::{
  45    ResultExt,
  46    paths::{PathStyle, RemotePathBuf},
  47};
  48
/// Identifies a remote platform as an operating system / architecture pair.
///
/// Passed to [`RemoteClientDelegate`] when asking for server download information.
#[derive(Copy, Clone, Debug)]
pub struct RemotePlatform {
    /// Operating-system name (exact vocabulary is defined by the producers — TODO confirm).
    pub os: &'static str,
    /// CPU architecture name (exact vocabulary is defined by the producers — TODO confirm).
    pub arch: &'static str,
}
  54
/// A command described by its program, arguments, and environment variables.
///
/// This is the value produced by [`RemoteClient::build_command`].
#[derive(Clone, Debug)]
pub struct CommandTemplate {
    /// The program to run.
    pub program: String,
    /// Arguments passed to `program`.
    pub args: Vec<String>,
    /// Environment variables, keyed by variable name.
    pub env: HashMap<String, String>,
}
  61
/// Callbacks a [`RemoteClient`] uses while establishing and maintaining a connection.
///
/// Implementations must be `Send + Sync` because the delegate is shared as
/// `Arc<dyn RemoteClientDelegate>`.
pub trait RemoteClientDelegate: Send + Sync {
    /// Requests a password for `prompt`; the answer is delivered through `tx`.
    fn ask_password(&self, prompt: String, tx: oneshot::Sender<String>, cx: &mut AsyncApp);
    /// Looks up download parameters for the server binary matching `platform`,
    /// `release_channel`, and (optionally) `version`.
    ///
    /// NOTE(review): the meaning of the two strings in the returned tuple is not
    /// visible from this file — confirm against the implementations.
    fn get_download_params(
        &self,
        platform: RemotePlatform,
        release_channel: ReleaseChannel,
        version: Option<SemanticVersion>,
        cx: &mut AsyncApp,
    ) -> Task<Result<Option<(String, String)>>>;
    /// Obtains the server binary for `platform` locally and resolves to its path.
    fn download_server_binary_locally(
        &self,
        platform: RemotePlatform,
        release_channel: ReleaseChannel,
        version: Option<SemanticVersion>,
        cx: &mut AsyncApp,
    ) -> Task<Result<PathBuf>>;
    /// Reports a status message; `None` presumably clears it (confirm with implementations).
    fn set_status(&self, status: Option<&str>, cx: &mut AsyncApp);
}
  80
/// Number of consecutive missed heartbeats after which a reconnect is started.
const MAX_MISSED_HEARTBEATS: usize = 5;
/// Delay between heartbeat checks.
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
/// Timeout passed to the client when pinging (and resyncing with) the server.
const HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(5);

/// Number of reconnect attempts made before the state becomes `ReconnectExhausted`.
const MAX_RECONNECT_ATTEMPTS: usize = 3;
  86
/// Internal connection state machine of a [`RemoteClient`].
///
/// Variants that represent a live (or recently live) connection own the
/// connection handle, the delegate, and the background tasks serving it.
enum State {
    /// Initial state while the first connection is being established.
    Connecting,
    /// The connection is up; the tasks below keep it running.
    Connected {
        ssh_connection: Arc<dyn RemoteConnection>,
        delegate: Arc<dyn RemoteClientDelegate>,

        // Task returned by `RemoteClient::monitor`, watching the proxy io task.
        multiplex_task: Task<Result<()>>,
        // Task returned by `RemoteClient::heartbeat`.
        heartbeat_task: Task<Result<()>>,
    },
    /// Like `Connected`, but one or more heartbeats have been missed.
    HeartbeatMissed {
        // How many heartbeats have been missed so far.
        missed_heartbeats: usize,

        ssh_connection: Arc<dyn RemoteConnection>,
        delegate: Arc<dyn RemoteClientDelegate>,

        multiplex_task: Task<Result<()>>,
        heartbeat_task: Task<Result<()>>,
    },
    /// A reconnect attempt is currently in flight.
    Reconnecting,
    /// The most recent reconnect attempt failed; another attempt may follow.
    ReconnectFailed {
        ssh_connection: Arc<dyn RemoteConnection>,
        delegate: Arc<dyn RemoteClientDelegate>,

        // Why the attempt failed.
        error: anyhow::Error,
        // Number of attempts made so far.
        attempts: usize,
    },
    /// More than `MAX_RECONNECT_ATTEMPTS` attempts were needed; no further retries.
    ReconnectExhausted,
    /// The proxy exited with the code that maps to `ProxyLaunchError::ServerNotRunning`.
    ServerNotRunning,
}
 116
 117impl fmt::Display for State {
 118    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
 119        match self {
 120            Self::Connecting => write!(f, "connecting"),
 121            Self::Connected { .. } => write!(f, "connected"),
 122            Self::Reconnecting => write!(f, "reconnecting"),
 123            Self::ReconnectFailed { .. } => write!(f, "reconnect failed"),
 124            Self::ReconnectExhausted => write!(f, "reconnect exhausted"),
 125            Self::HeartbeatMissed { .. } => write!(f, "heartbeat missed"),
 126            Self::ServerNotRunning { .. } => write!(f, "server not running"),
 127        }
 128    }
 129}
 130
 131impl State {
 132    fn remote_connection(&self) -> Option<Arc<dyn RemoteConnection>> {
 133        match self {
 134            Self::Connected { ssh_connection, .. } => Some(ssh_connection.clone()),
 135            Self::HeartbeatMissed { ssh_connection, .. } => Some(ssh_connection.clone()),
 136            Self::ReconnectFailed { ssh_connection, .. } => Some(ssh_connection.clone()),
 137            _ => None,
 138        }
 139    }
 140
 141    fn can_reconnect(&self) -> bool {
 142        match self {
 143            Self::Connected { .. }
 144            | Self::HeartbeatMissed { .. }
 145            | Self::ReconnectFailed { .. } => true,
 146            State::Connecting
 147            | State::Reconnecting
 148            | State::ReconnectExhausted
 149            | State::ServerNotRunning => false,
 150        }
 151    }
 152
 153    fn is_reconnect_failed(&self) -> bool {
 154        matches!(self, Self::ReconnectFailed { .. })
 155    }
 156
 157    fn is_reconnect_exhausted(&self) -> bool {
 158        matches!(self, Self::ReconnectExhausted { .. })
 159    }
 160
 161    fn is_server_not_running(&self) -> bool {
 162        matches!(self, Self::ServerNotRunning)
 163    }
 164
 165    fn is_reconnecting(&self) -> bool {
 166        matches!(self, Self::Reconnecting { .. })
 167    }
 168
 169    fn heartbeat_recovered(self) -> Self {
 170        match self {
 171            Self::HeartbeatMissed {
 172                ssh_connection,
 173                delegate,
 174                multiplex_task,
 175                heartbeat_task,
 176                ..
 177            } => Self::Connected {
 178                ssh_connection,
 179                delegate,
 180                multiplex_task,
 181                heartbeat_task,
 182            },
 183            _ => self,
 184        }
 185    }
 186
 187    fn heartbeat_missed(self) -> Self {
 188        match self {
 189            Self::Connected {
 190                ssh_connection,
 191                delegate,
 192                multiplex_task,
 193                heartbeat_task,
 194            } => Self::HeartbeatMissed {
 195                missed_heartbeats: 1,
 196                ssh_connection,
 197                delegate,
 198                multiplex_task,
 199                heartbeat_task,
 200            },
 201            Self::HeartbeatMissed {
 202                missed_heartbeats,
 203                ssh_connection,
 204                delegate,
 205                multiplex_task,
 206                heartbeat_task,
 207            } => Self::HeartbeatMissed {
 208                missed_heartbeats: missed_heartbeats + 1,
 209                ssh_connection,
 210                delegate,
 211                multiplex_task,
 212                heartbeat_task,
 213            },
 214            _ => self,
 215        }
 216    }
 217}
 218
/// The publicly visible state of the remote connection.
///
/// This is a coarser view of the internal `State`: both `Reconnecting` and
/// `ReconnectFailed` are reported as [`ConnectionState::Reconnecting`], and both
/// `ReconnectExhausted` and `ServerNotRunning` as [`ConnectionState::Disconnected`].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ConnectionState {
    /// The initial connection is being established.
    Connecting,
    /// The connection is up.
    Connected,
    /// The connection is up, but at least one heartbeat was missed.
    HeartbeatMissed,
    /// A reconnect is in progress (or about to be retried).
    Reconnecting,
    /// The connection is gone and will not be retried.
    Disconnected,
}
 228
 229impl From<&State> for ConnectionState {
 230    fn from(value: &State) -> Self {
 231        match value {
 232            State::Connecting => Self::Connecting,
 233            State::Connected { .. } => Self::Connected,
 234            State::Reconnecting | State::ReconnectFailed { .. } => Self::Reconnecting,
 235            State::HeartbeatMissed { .. } => Self::HeartbeatMissed,
 236            State::ReconnectExhausted => Self::Disconnected,
 237            State::ServerNotRunning => Self::Disconnected,
 238        }
 239    }
 240}
 241
/// Client side of a connection to a remote server.
pub struct RemoteClient {
    // Protocol client used to talk to the server (ping, resync, requests).
    client: Arc<ChannelClient>,
    // String form of the `ConnectionIdentifier`; passed to `start_proxy` on
    // connect and again on reconnect.
    unique_identifier: String,
    // Options this client was created with.
    connection_options: RemoteConnectionOptions,
    // Path style reported by the connection when it was first established.
    path_style: PathStyle,
    // Current state; `None` once `shutdown_processes` has taken it.
    state: Option<State>,
}
 249
/// Events emitted by a [`RemoteClient`] entity.
#[derive(Debug)]
pub enum RemoteClientEvent {
    /// Emitted when the state becomes `ReconnectExhausted` or `ServerNotRunning`.
    Disconnected,
}

impl EventEmitter<RemoteClientEvent> for RemoteClient {}
 256
/// Identifies the socket on the remote server so that reconnects
/// can re-join the same project.
pub enum ConnectionIdentifier {
    /// Identifier drawn from a process-wide counter (see `ConnectionIdentifier::setup`).
    Setup(u64),
    /// Identifier derived from a workspace id supplied by the caller.
    Workspace(i64),
}
 263
 264static NEXT_ID: AtomicU64 = AtomicU64::new(1);
 265
 266impl ConnectionIdentifier {
 267    pub fn setup() -> Self {
 268        Self::Setup(NEXT_ID.fetch_add(1, SeqCst))
 269    }
 270
 271    // This string gets used in a socket name, and so must be relatively short.
 272    // The total length of:
 273    //   /home/{username}/.local/share/zed/server_state/{name}/stdout.sock
 274    // Must be less than about 100 characters
 275    //   https://unix.stackexchange.com/questions/367008/why-is-socket-path-length-limited-to-a-hundred-chars
 276    // So our strings should be at most 20 characters or so.
 277    fn to_string(&self, cx: &App) -> String {
 278        let identifier_prefix = match ReleaseChannel::global(cx) {
 279            ReleaseChannel::Stable => "".to_string(),
 280            release_channel => format!("{}-", release_channel.dev_name()),
 281        };
 282        match self {
 283            Self::Setup(setup_id) => format!("{identifier_prefix}setup-{setup_id}"),
 284            Self::Workspace(workspace_id) => {
 285                format!("{identifier_prefix}workspace-{workspace_id}",)
 286            }
 287        }
 288    }
 289}
 290
 291impl RemoteClient {
 292    pub fn ssh(
 293        unique_identifier: ConnectionIdentifier,
 294        connection_options: SshConnectionOptions,
 295        cancellation: oneshot::Receiver<()>,
 296        delegate: Arc<dyn RemoteClientDelegate>,
 297        cx: &mut App,
 298    ) -> Task<Result<Option<Entity<Self>>>> {
 299        Self::new(
 300            unique_identifier,
 301            RemoteConnectionOptions::Ssh(connection_options),
 302            cancellation,
 303            delegate,
 304            cx,
 305        )
 306    }
 307
    /// Connects to a remote server and resolves to the new client entity.
    ///
    /// The returned task resolves to `Ok(None)` if `cancellation` completes before
    /// the connection is established, and to an error if obtaining the connection
    /// or the initial ping fails.
    pub fn new(
        unique_identifier: ConnectionIdentifier,
        connection_options: RemoteConnectionOptions,
        cancellation: oneshot::Receiver<()>,
        delegate: Arc<dyn RemoteClientDelegate>,
        cx: &mut App,
    ) -> Task<Result<Option<Entity<Self>>>> {
        let unique_identifier = unique_identifier.to_string(cx);
        cx.spawn(async move |cx| {
            let success = Box::pin(async move {
                // Envelope channels between the `ChannelClient` and the proxy, plus a
                // capacity-1 channel shared by the proxy and the heartbeat task.
                let (outgoing_tx, outgoing_rx) = mpsc::unbounded::<Envelope>();
                let (incoming_tx, incoming_rx) = mpsc::unbounded::<Envelope>();
                let (connection_activity_tx, connection_activity_rx) = mpsc::channel::<()>(1);

                let client =
                    cx.update(|cx| ChannelClient::new(incoming_rx, outgoing_tx, cx, "client"))?;

                // Obtain the connection through the global `ConnectionPool`.
                let ssh_connection = cx
                    .update(|cx| {
                        cx.update_default_global(|pool: &mut ConnectionPool, cx| {
                            pool.connect(connection_options.clone(), &delegate, cx)
                        })
                    })?
                    .await
                    .map_err(|e| e.cloned())?;

                let path_style = ssh_connection.path_style();
                let this = cx.new(|_| Self {
                    client: client.clone(),
                    unique_identifier: unique_identifier.clone(),
                    connection_options,
                    path_style,
                    state: Some(State::Connecting),
                })?;

                // NOTE(review): the `false` argument is `true` in `reconnect`, so it
                // presumably marks whether this is a reconnect — confirm in `start_proxy`.
                let io_task = ssh_connection.start_proxy(
                    unique_identifier,
                    false,
                    incoming_tx,
                    outgoing_rx,
                    connection_activity_tx,
                    delegate.clone(),
                    cx,
                );

                let multiplex_task = Self::monitor(this.downgrade(), io_task, cx);

                // Verify the server answers before declaring the connection established.
                if let Err(error) = client.ping(HEARTBEAT_TIMEOUT).await {
                    log::error!("failed to establish connection: {}", error);
                    return Err(error);
                }

                let heartbeat_task = Self::heartbeat(this.downgrade(), connection_activity_rx, cx);

                this.update(cx, |this, _| {
                    this.state = Some(State::Connected {
                        ssh_connection,
                        delegate,
                        multiplex_task,
                        heartbeat_task,
                    });
                })?;

                Ok(Some(this))
            });

            // Whichever finishes first wins: the cancellation signal or the connection setup.
            select! {
                _ = cancellation.fuse() => {
                    Ok(None)
                }
                result = success.fuse() =>  result
            }
        })
    }
 382
 383    pub fn proto_client_from_channels(
 384        incoming_rx: mpsc::UnboundedReceiver<Envelope>,
 385        outgoing_tx: mpsc::UnboundedSender<Envelope>,
 386        cx: &App,
 387        name: &'static str,
 388    ) -> AnyProtoClient {
 389        ChannelClient::new(incoming_rx, outgoing_tx, cx, name).into()
 390    }
 391
    /// Takes the current state and, if it is `Connected`, returns a future that
    /// shuts the connection down.
    ///
    /// The future optionally sends `shutdown_request` (waiting 50ms afterwards
    /// instead of awaiting a response) and then drops the tasks and handles that
    /// made up the connection.
    ///
    /// Returns `None` when no state is set or the state is not `Connected`. Note
    /// that the state is taken — and therefore dropped — in that case as well.
    pub fn shutdown_processes<T: RequestMessage>(
        &mut self,
        shutdown_request: Option<T>,
        executor: BackgroundExecutor,
    ) -> Option<impl Future<Output = ()> + use<T>> {
        let state = self.state.take()?;
        log::info!("shutting down ssh processes");

        let State::Connected {
            multiplex_task,
            heartbeat_task,
            ssh_connection,
            delegate,
        } = state
        else {
            return None;
        };

        let client = self.client.clone();

        Some(async move {
            if let Some(shutdown_request) = shutdown_request {
                client.send(shutdown_request).log_err();
                // We wait 50ms instead of waiting for a response, because
                // waiting for a response would require us to wait on the main thread
                // which we want to avoid in an `on_app_quit` callback.
                executor.timer(Duration::from_millis(50)).await;
            }

            // Drop `multiplex_task` because it owns our ssh_proxy_process, which is a
            // child of master_process.
            drop(multiplex_task);
            // Now drop the rest of state, which kills master process.
            drop(heartbeat_task);
            drop(ssh_connection);
            drop(delegate);
        })
    }
 430
 431    fn reconnect(&mut self, cx: &mut Context<Self>) -> Result<()> {
 432        let can_reconnect = self
 433            .state
 434            .as_ref()
 435            .map(|state| state.can_reconnect())
 436            .unwrap_or(false);
 437        if !can_reconnect {
 438            log::info!("aborting reconnect, because not in state that allows reconnecting");
 439            let error = if let Some(state) = self.state.as_ref() {
 440                format!("invalid state, cannot reconnect while in state {state}")
 441            } else {
 442                "no state set".to_string()
 443            };
 444            anyhow::bail!(error);
 445        }
 446
 447        let state = self.state.take().unwrap();
 448        let (attempts, remote_connection, delegate) = match state {
 449            State::Connected {
 450                ssh_connection,
 451                delegate,
 452                multiplex_task,
 453                heartbeat_task,
 454            }
 455            | State::HeartbeatMissed {
 456                ssh_connection,
 457                delegate,
 458                multiplex_task,
 459                heartbeat_task,
 460                ..
 461            } => {
 462                drop(multiplex_task);
 463                drop(heartbeat_task);
 464                (0, ssh_connection, delegate)
 465            }
 466            State::ReconnectFailed {
 467                attempts,
 468                ssh_connection,
 469                delegate,
 470                ..
 471            } => (attempts, ssh_connection, delegate),
 472            State::Connecting
 473            | State::Reconnecting
 474            | State::ReconnectExhausted
 475            | State::ServerNotRunning => unreachable!(),
 476        };
 477
 478        let attempts = attempts + 1;
 479        if attempts > MAX_RECONNECT_ATTEMPTS {
 480            log::error!(
 481                "Failed to reconnect to after {} attempts, giving up",
 482                MAX_RECONNECT_ATTEMPTS
 483            );
 484            self.set_state(State::ReconnectExhausted, cx);
 485            return Ok(());
 486        }
 487
 488        self.set_state(State::Reconnecting, cx);
 489
 490        log::info!("Trying to reconnect to ssh server... Attempt {}", attempts);
 491
 492        let unique_identifier = self.unique_identifier.clone();
 493        let client = self.client.clone();
 494        let reconnect_task = cx.spawn(async move |this, cx| {
 495            macro_rules! failed {
 496                ($error:expr, $attempts:expr, $ssh_connection:expr, $delegate:expr) => {
 497                    return State::ReconnectFailed {
 498                        error: anyhow!($error),
 499                        attempts: $attempts,
 500                        ssh_connection: $ssh_connection,
 501                        delegate: $delegate,
 502                    };
 503                };
 504            }
 505
 506            if let Err(error) = remote_connection
 507                .kill()
 508                .await
 509                .context("Failed to kill ssh process")
 510            {
 511                failed!(error, attempts, remote_connection, delegate);
 512            };
 513
 514            let connection_options = remote_connection.connection_options();
 515
 516            let (outgoing_tx, outgoing_rx) = mpsc::unbounded::<Envelope>();
 517            let (incoming_tx, incoming_rx) = mpsc::unbounded::<Envelope>();
 518            let (connection_activity_tx, connection_activity_rx) = mpsc::channel::<()>(1);
 519
 520            let (ssh_connection, io_task) = match async {
 521                let ssh_connection = cx
 522                    .update_global(|pool: &mut ConnectionPool, cx| {
 523                        pool.connect(connection_options, &delegate, cx)
 524                    })?
 525                    .await
 526                    .map_err(|error| error.cloned())?;
 527
 528                let io_task = ssh_connection.start_proxy(
 529                    unique_identifier,
 530                    true,
 531                    incoming_tx,
 532                    outgoing_rx,
 533                    connection_activity_tx,
 534                    delegate.clone(),
 535                    cx,
 536                );
 537                anyhow::Ok((ssh_connection, io_task))
 538            }
 539            .await
 540            {
 541                Ok((ssh_connection, io_task)) => (ssh_connection, io_task),
 542                Err(error) => {
 543                    failed!(error, attempts, remote_connection, delegate);
 544                }
 545            };
 546
 547            let multiplex_task = Self::monitor(this.clone(), io_task, cx);
 548            client.reconnect(incoming_rx, outgoing_tx, cx);
 549
 550            if let Err(error) = client.resync(HEARTBEAT_TIMEOUT).await {
 551                failed!(error, attempts, ssh_connection, delegate);
 552            };
 553
 554            State::Connected {
 555                ssh_connection,
 556                delegate,
 557                multiplex_task,
 558                heartbeat_task: Self::heartbeat(this.clone(), connection_activity_rx, cx),
 559            }
 560        });
 561
 562        cx.spawn(async move |this, cx| {
 563            let new_state = reconnect_task.await;
 564            this.update(cx, |this, cx| {
 565                this.try_set_state(cx, |old_state| {
 566                    if old_state.is_reconnecting() {
 567                        match &new_state {
 568                            State::Connecting
 569                            | State::Reconnecting
 570                            | State::HeartbeatMissed { .. }
 571                            | State::ServerNotRunning => {}
 572                            State::Connected { .. } => {
 573                                log::info!("Successfully reconnected");
 574                            }
 575                            State::ReconnectFailed {
 576                                error, attempts, ..
 577                            } => {
 578                                log::error!(
 579                                    "Reconnect attempt {} failed: {:?}. Starting new attempt...",
 580                                    attempts,
 581                                    error
 582                                );
 583                            }
 584                            State::ReconnectExhausted => {
 585                                log::error!("Reconnect attempt failed and all attempts exhausted");
 586                            }
 587                        }
 588                        Some(new_state)
 589                    } else {
 590                        None
 591                    }
 592                });
 593
 594                if this.state_is(State::is_reconnect_failed) {
 595                    this.reconnect(cx)
 596                } else if this.state_is(State::is_reconnect_exhausted) {
 597                    Ok(())
 598                } else {
 599                    log::debug!("State has transition from Reconnecting into new state while attempting reconnect.");
 600                    Ok(())
 601                }
 602            })
 603        })
 604        .detach_and_log_err(cx);
 605
 606        Ok(())
 607    }
 608
    /// Spawns the heartbeat task for the client behind `this`.
    ///
    /// The task resets its timer whenever something arrives on
    /// `connection_activity_rx`. When `HEARTBEAT_INTERVAL` elapses it pings the
    /// server, treating activity that arrives during the ping as success. Changes
    /// in the number of missed heartbeats are reported through
    /// `handle_heartbeat_result`. The task ends when the activity channel closes,
    /// when `handle_heartbeat_result` asks it to stop, or (with an error) when the
    /// entity can no longer be updated.
    fn heartbeat(
        this: WeakEntity<Self>,
        mut connection_activity_rx: mpsc::Receiver<()>,
        cx: &mut AsyncApp,
    ) -> Task<Result<()>> {
        let Ok(client) = this.read_with(cx, |this, _| this.client.clone()) else {
            return Task::ready(Err(anyhow!("SshRemoteClient lost")));
        };

        cx.spawn(async move |cx| {
            let mut missed_heartbeats = 0;

            let keepalive_timer = cx.background_executor().timer(HEARTBEAT_INTERVAL).fuse();
            futures::pin_mut!(keepalive_timer);

            loop {
                // Biased: connection activity is checked before the timer.
                select_biased! {
                    result = connection_activity_rx.next().fuse() => {
                        if result.is_none() {
                            log::warn!("ssh heartbeat: connection activity channel has been dropped. stopping.");
                            return Ok(());
                        }

                        // Activity counts as a recovered heartbeat.
                        if missed_heartbeats != 0 {
                            missed_heartbeats = 0;
                            let _ =this.update(cx, |this, cx| {
                                this.handle_heartbeat_result(missed_heartbeats, cx)
                            })?;
                        }
                    }
                    _ = keepalive_timer => {
                        log::debug!("Sending heartbeat to server...");

                        // Any activity arriving while the ping is in flight also counts as success.
                        let result = select_biased! {
                            _ = connection_activity_rx.next().fuse() => {
                                Ok(())
                            }
                            ping_result = client.ping(HEARTBEAT_TIMEOUT).fuse() => {
                                ping_result
                            }
                        };

                        if result.is_err() {
                            missed_heartbeats += 1;
                            log::warn!(
                                "No heartbeat from server after {:?}. Missed heartbeat {} out of {}.",
                                HEARTBEAT_TIMEOUT,
                                missed_heartbeats,
                                MAX_MISSED_HEARTBEATS
                            );
                        } else if missed_heartbeats != 0 {
                            missed_heartbeats = 0;
                        } else {
                            // Healthy and nothing changed. Note: `continue` skips the
                            // timer reset at the bottom of the loop.
                            continue;
                        }

                        let result = this.update(cx, |this, cx| {
                            this.handle_heartbeat_result(missed_heartbeats, cx)
                        })?;
                        if result.is_break() {
                            return Ok(());
                        }
                    }
                }

                // Restart the interval for the next round.
                keepalive_timer.set(cx.background_executor().timer(HEARTBEAT_INTERVAL).fuse());
            }
        })
    }
 678
 679    fn handle_heartbeat_result(
 680        &mut self,
 681        missed_heartbeats: usize,
 682        cx: &mut Context<Self>,
 683    ) -> ControlFlow<()> {
 684        let state = self.state.take().unwrap();
 685        let next_state = if missed_heartbeats > 0 {
 686            state.heartbeat_missed()
 687        } else {
 688            state.heartbeat_recovered()
 689        };
 690
 691        self.set_state(next_state, cx);
 692
 693        if missed_heartbeats >= MAX_MISSED_HEARTBEATS {
 694            log::error!(
 695                "Missed last {} heartbeats. Reconnecting...",
 696                missed_heartbeats
 697            );
 698
 699            self.reconnect(cx)
 700                .context("failed to start reconnect process after missing heartbeats")
 701                .log_err();
 702            ControlFlow::Break(())
 703        } else {
 704            ControlFlow::Continue(())
 705        }
 706    }
 707
 708    fn monitor(
 709        this: WeakEntity<Self>,
 710        io_task: Task<Result<i32>>,
 711        cx: &AsyncApp,
 712    ) -> Task<Result<()>> {
 713        cx.spawn(async move |cx| {
 714            let result = io_task.await;
 715
 716            match result {
 717                Ok(exit_code) => {
 718                    if let Some(error) = ProxyLaunchError::from_exit_code(exit_code) {
 719                        match error {
 720                            ProxyLaunchError::ServerNotRunning => {
 721                                log::error!("failed to reconnect because server is not running");
 722                                this.update(cx, |this, cx| {
 723                                    this.set_state(State::ServerNotRunning, cx);
 724                                })?;
 725                            }
 726                        }
 727                    } else if exit_code > 0 {
 728                        log::error!("proxy process terminated unexpectedly");
 729                        this.update(cx, |this, cx| {
 730                            this.reconnect(cx).ok();
 731                        })?;
 732                    }
 733                }
 734                Err(error) => {
 735                    log::warn!("ssh io task died with error: {:?}. reconnecting...", error);
 736                    this.update(cx, |this, cx| {
 737                        this.reconnect(cx).ok();
 738                    })?;
 739                }
 740            }
 741
 742            Ok(())
 743        })
 744    }
 745
 746    fn state_is(&self, check: impl FnOnce(&State) -> bool) -> bool {
 747        self.state.as_ref().is_some_and(check)
 748    }
 749
 750    fn try_set_state(&mut self, cx: &mut Context<Self>, map: impl FnOnce(&State) -> Option<State>) {
 751        let new_state = self.state.as_ref().and_then(map);
 752        if let Some(new_state) = new_state {
 753            self.state.replace(new_state);
 754            cx.notify();
 755        }
 756    }
 757
 758    fn set_state(&mut self, state: State, cx: &mut Context<Self>) {
 759        log::info!("setting state to '{}'", &state);
 760
 761        let is_reconnect_exhausted = state.is_reconnect_exhausted();
 762        let is_server_not_running = state.is_server_not_running();
 763        self.state.replace(state);
 764
 765        if is_reconnect_exhausted || is_server_not_running {
 766            cx.emit(RemoteClientEvent::Disconnected);
 767        }
 768        cx.notify();
 769    }
 770
 771    pub fn shell(&self) -> Option<String> {
 772        Some(self.remote_connection()?.shell())
 773    }
 774
 775    pub fn default_system_shell(&self) -> Option<String> {
 776        Some(self.remote_connection()?.default_system_shell())
 777    }
 778
 779    pub fn shares_network_interface(&self) -> bool {
 780        self.remote_connection()
 781            .map_or(false, |connection| connection.shares_network_interface())
 782    }
 783
 784    pub fn build_command(
 785        &self,
 786        program: Option<String>,
 787        args: &[String],
 788        env: &HashMap<String, String>,
 789        working_dir: Option<String>,
 790        port_forward: Option<(u16, String, u16)>,
 791    ) -> Result<CommandTemplate> {
 792        let Some(connection) = self.remote_connection() else {
 793            return Err(anyhow!("no ssh connection"));
 794        };
 795        connection.build_command(program, args, env, working_dir, port_forward)
 796    }
 797
 798    pub fn upload_directory(
 799        &self,
 800        src_path: PathBuf,
 801        dest_path: RemotePathBuf,
 802        cx: &App,
 803    ) -> Task<Result<()>> {
 804        let Some(connection) = self.remote_connection() else {
 805            return Task::ready(Err(anyhow!("no ssh connection")));
 806        };
 807        connection.upload_directory(src_path, dest_path, cx)
 808    }
 809
 810    pub fn proto_client(&self) -> AnyProtoClient {
 811        self.client.clone().into()
 812    }
 813
    /// Returns a clone of the options this client was created with.
    pub fn connection_options(&self) -> RemoteConnectionOptions {
        self.connection_options.clone()
    }
 817
 818    pub fn connection_state(&self) -> ConnectionState {
 819        self.state
 820            .as_ref()
 821            .map(ConnectionState::from)
 822            .unwrap_or(ConnectionState::Disconnected)
 823    }
 824
 825    pub fn is_disconnected(&self) -> bool {
 826        self.connection_state() == ConnectionState::Disconnected
 827    }
 828
    /// Returns the path style recorded when the connection was first established.
    pub fn path_style(&self) -> PathStyle {
        self.path_style
    }
 832
    /// Test helper: looks up this client's connection in the global
    /// `ConnectionPool` and calls `simulate_disconnect` on it.
    ///
    /// # Panics
    ///
    /// Panics if the pool has no `Connecting` entry for this client's options,
    /// or if the pool or the pending connection cannot be resolved.
    #[cfg(any(test, feature = "test-support"))]
    pub fn simulate_disconnect(&self, client_cx: &mut App) -> Task<()> {
        let opts = self.connection_options();
        client_cx.spawn(async move |cx| {
            let connection = cx
                .update_global(|c: &mut ConnectionPool, _| {
                    // Only `Connecting` entries are accepted here; `fake_server`
                    // registers its connection as such an entry.
                    if let Some(ConnectionPoolEntry::Connecting(c)) = c.connections.get(&opts) {
                        c.clone()
                    } else {
                        panic!("missing test connection")
                    }
                })
                .unwrap()
                .await
                .unwrap();

            connection.simulate_disconnect(cx);
        })
    }
 852
 853    #[cfg(any(test, feature = "test-support"))]
 854    pub fn fake_server(
 855        client_cx: &mut gpui::TestAppContext,
 856        server_cx: &mut gpui::TestAppContext,
 857    ) -> (RemoteConnectionOptions, AnyProtoClient) {
 858        let port = client_cx
 859            .update(|cx| cx.default_global::<ConnectionPool>().connections.len() as u16 + 1);
 860        let opts = RemoteConnectionOptions::Ssh(SshConnectionOptions {
 861            host: "<fake>".to_string(),
 862            port: Some(port),
 863            ..Default::default()
 864        });
 865        let (outgoing_tx, _) = mpsc::unbounded::<Envelope>();
 866        let (_, incoming_rx) = mpsc::unbounded::<Envelope>();
 867        let server_client =
 868            server_cx.update(|cx| ChannelClient::new(incoming_rx, outgoing_tx, cx, "fake-server"));
 869        let connection: Arc<dyn RemoteConnection> = Arc::new(fake::FakeRemoteConnection {
 870            connection_options: opts.clone(),
 871            server_cx: fake::SendableCx::new(server_cx),
 872            server_channel: server_client.clone(),
 873        });
 874
 875        client_cx.update(|cx| {
 876            cx.update_default_global(|c: &mut ConnectionPool, cx| {
 877                c.connections.insert(
 878                    opts.clone(),
 879                    ConnectionPoolEntry::Connecting(
 880                        cx.background_spawn({
 881                            let connection = connection.clone();
 882                            async move { Ok(connection.clone()) }
 883                        })
 884                        .shared(),
 885                    ),
 886                );
 887            })
 888        });
 889
 890        (opts, server_client.into())
 891    }
 892
 893    #[cfg(any(test, feature = "test-support"))]
 894    pub async fn fake_client(
 895        opts: RemoteConnectionOptions,
 896        client_cx: &mut gpui::TestAppContext,
 897    ) -> Entity<Self> {
 898        let (_tx, rx) = oneshot::channel();
 899        client_cx
 900            .update(|cx| {
 901                Self::new(
 902                    ConnectionIdentifier::setup(),
 903                    opts,
 904                    rx,
 905                    Arc::new(fake::Delegate),
 906                    cx,
 907                )
 908            })
 909            .await
 910            .unwrap()
 911            .unwrap()
 912    }
 913
 914    fn remote_connection(&self) -> Option<Arc<dyn RemoteConnection>> {
 915        self.state
 916            .as_ref()
 917            .and_then(|state| state.remote_connection())
 918    }
 919}
 920
 921enum ConnectionPoolEntry {
 922    Connecting(Shared<Task<Result<Arc<dyn RemoteConnection>, Arc<anyhow::Error>>>>),
 923    Connected(Weak<dyn RemoteConnection>),
 924}
 925
 926#[derive(Default)]
 927struct ConnectionPool {
 928    connections: HashMap<RemoteConnectionOptions, ConnectionPoolEntry>,
 929}
 930
 931impl Global for ConnectionPool {}
 932
 933impl ConnectionPool {
 934    pub fn connect(
 935        &mut self,
 936        opts: RemoteConnectionOptions,
 937        delegate: &Arc<dyn RemoteClientDelegate>,
 938        cx: &mut App,
 939    ) -> Shared<Task<Result<Arc<dyn RemoteConnection>, Arc<anyhow::Error>>>> {
 940        let connection = self.connections.get(&opts);
 941        match connection {
 942            Some(ConnectionPoolEntry::Connecting(task)) => {
 943                let delegate = delegate.clone();
 944                cx.spawn(async move |cx| {
 945                    delegate.set_status(Some("Waiting for existing connection attempt"), cx);
 946                })
 947                .detach();
 948                return task.clone();
 949            }
 950            Some(ConnectionPoolEntry::Connected(ssh)) => {
 951                if let Some(ssh) = ssh.upgrade()
 952                    && !ssh.has_been_killed()
 953                {
 954                    return Task::ready(Ok(ssh)).shared();
 955                }
 956                self.connections.remove(&opts);
 957            }
 958            None => {}
 959        }
 960
 961        let task = cx
 962            .spawn({
 963                let opts = opts.clone();
 964                let delegate = delegate.clone();
 965                async move |cx| {
 966                    let connection = match opts.clone() {
 967                        RemoteConnectionOptions::Ssh(opts) => {
 968                            SshRemoteConnection::new(opts, delegate, cx)
 969                                .await
 970                                .map(|connection| Arc::new(connection) as Arc<dyn RemoteConnection>)
 971                        }
 972                        RemoteConnectionOptions::Wsl(opts) => {
 973                            WslRemoteConnection::new(opts, delegate, cx)
 974                                .await
 975                                .map(|connection| Arc::new(connection) as Arc<dyn RemoteConnection>)
 976                        }
 977                    };
 978
 979                    cx.update_global(|pool: &mut Self, _| {
 980                        debug_assert!(matches!(
 981                            pool.connections.get(&opts),
 982                            Some(ConnectionPoolEntry::Connecting(_))
 983                        ));
 984                        match connection {
 985                            Ok(connection) => {
 986                                pool.connections.insert(
 987                                    opts.clone(),
 988                                    ConnectionPoolEntry::Connected(Arc::downgrade(&connection)),
 989                                );
 990                                Ok(connection)
 991                            }
 992                            Err(error) => {
 993                                pool.connections.remove(&opts);
 994                                Err(Arc::new(error))
 995                            }
 996                        }
 997                    })?
 998                }
 999            })
1000            .shared();
1001
1002        self.connections
1003            .insert(opts.clone(), ConnectionPoolEntry::Connecting(task.clone()));
1004        task
1005    }
1006}
1007
1008#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1009pub enum RemoteConnectionOptions {
1010    Ssh(SshConnectionOptions),
1011    Wsl(WslConnectionOptions),
1012}
1013
1014impl RemoteConnectionOptions {
1015    pub fn display_name(&self) -> String {
1016        match self {
1017            RemoteConnectionOptions::Ssh(opts) => opts.host.clone(),
1018            RemoteConnectionOptions::Wsl(opts) => opts.distro_name.clone(),
1019        }
1020    }
1021}
1022
1023impl From<SshConnectionOptions> for RemoteConnectionOptions {
1024    fn from(opts: SshConnectionOptions) -> Self {
1025        RemoteConnectionOptions::Ssh(opts)
1026    }
1027}
1028
1029impl From<WslConnectionOptions> for RemoteConnectionOptions {
1030    fn from(opts: WslConnectionOptions) -> Self {
1031        RemoteConnectionOptions::Wsl(opts)
1032    }
1033}
1034
1035#[async_trait(?Send)]
1036pub(crate) trait RemoteConnection: Send + Sync {
1037    fn start_proxy(
1038        &self,
1039        unique_identifier: String,
1040        reconnect: bool,
1041        incoming_tx: UnboundedSender<Envelope>,
1042        outgoing_rx: UnboundedReceiver<Envelope>,
1043        connection_activity_tx: Sender<()>,
1044        delegate: Arc<dyn RemoteClientDelegate>,
1045        cx: &mut AsyncApp,
1046    ) -> Task<Result<i32>>;
1047    fn upload_directory(
1048        &self,
1049        src_path: PathBuf,
1050        dest_path: RemotePathBuf,
1051        cx: &App,
1052    ) -> Task<Result<()>>;
1053    async fn kill(&self) -> Result<()>;
1054    fn has_been_killed(&self) -> bool;
1055    fn shares_network_interface(&self) -> bool {
1056        false
1057    }
1058    fn build_command(
1059        &self,
1060        program: Option<String>,
1061        args: &[String],
1062        env: &HashMap<String, String>,
1063        working_dir: Option<String>,
1064        port_forward: Option<(u16, String, u16)>,
1065    ) -> Result<CommandTemplate>;
1066    fn connection_options(&self) -> RemoteConnectionOptions;
1067    fn path_style(&self) -> PathStyle;
1068    fn shell(&self) -> String;
1069    fn default_system_shell(&self) -> String;
1070
1071    #[cfg(any(test, feature = "test-support"))]
1072    fn simulate_disconnect(&self, _: &AsyncApp) {}
1073}
1074
1075type ResponseChannels = Mutex<HashMap<MessageId, oneshot::Sender<(Envelope, oneshot::Sender<()>)>>>;
1076
1077struct ChannelClient {
1078    next_message_id: AtomicU32,
1079    outgoing_tx: Mutex<mpsc::UnboundedSender<Envelope>>,
1080    buffer: Mutex<VecDeque<Envelope>>,
1081    response_channels: ResponseChannels,
1082    message_handlers: Mutex<ProtoMessageHandlerSet>,
1083    max_received: AtomicU32,
1084    name: &'static str,
1085    task: Mutex<Task<Result<()>>>,
1086}
1087
1088impl ChannelClient {
1089    fn new(
1090        incoming_rx: mpsc::UnboundedReceiver<Envelope>,
1091        outgoing_tx: mpsc::UnboundedSender<Envelope>,
1092        cx: &App,
1093        name: &'static str,
1094    ) -> Arc<Self> {
1095        Arc::new_cyclic(|this| Self {
1096            outgoing_tx: Mutex::new(outgoing_tx),
1097            next_message_id: AtomicU32::new(0),
1098            max_received: AtomicU32::new(0),
1099            response_channels: ResponseChannels::default(),
1100            message_handlers: Default::default(),
1101            buffer: Mutex::new(VecDeque::new()),
1102            name,
1103            task: Mutex::new(Self::start_handling_messages(
1104                this.clone(),
1105                incoming_rx,
1106                &cx.to_async(),
1107            )),
1108        })
1109    }
1110
1111    fn start_handling_messages(
1112        this: Weak<Self>,
1113        mut incoming_rx: mpsc::UnboundedReceiver<Envelope>,
1114        cx: &AsyncApp,
1115    ) -> Task<Result<()>> {
1116        cx.spawn(async move |cx| {
1117            let peer_id = PeerId { owner_id: 0, id: 0 };
1118            while let Some(incoming) = incoming_rx.next().await {
1119                let Some(this) = this.upgrade() else {
1120                    return anyhow::Ok(());
1121                };
1122                if let Some(ack_id) = incoming.ack_id {
1123                    let mut buffer = this.buffer.lock();
1124                    while buffer.front().is_some_and(|msg| msg.id <= ack_id) {
1125                        buffer.pop_front();
1126                    }
1127                }
1128                if let Some(proto::envelope::Payload::FlushBufferedMessages(_)) = &incoming.payload
1129                {
1130                    log::debug!(
1131                        "{}:ssh message received. name:FlushBufferedMessages",
1132                        this.name
1133                    );
1134                    {
1135                        let buffer = this.buffer.lock();
1136                        for envelope in buffer.iter() {
1137                            this.outgoing_tx
1138                                .lock()
1139                                .unbounded_send(envelope.clone())
1140                                .ok();
1141                        }
1142                    }
1143                    let mut envelope = proto::Ack {}.into_envelope(0, Some(incoming.id), None);
1144                    envelope.id = this.next_message_id.fetch_add(1, SeqCst);
1145                    this.outgoing_tx.lock().unbounded_send(envelope).ok();
1146                    continue;
1147                }
1148
1149                this.max_received.store(incoming.id, SeqCst);
1150
1151                if let Some(request_id) = incoming.responding_to {
1152                    let request_id = MessageId(request_id);
1153                    let sender = this.response_channels.lock().remove(&request_id);
1154                    if let Some(sender) = sender {
1155                        let (tx, rx) = oneshot::channel();
1156                        if incoming.payload.is_some() {
1157                            sender.send((incoming, tx)).ok();
1158                        }
1159                        rx.await.ok();
1160                    }
1161                } else if let Some(envelope) =
1162                    build_typed_envelope(peer_id, Instant::now(), incoming)
1163                {
1164                    let type_name = envelope.payload_type_name();
1165                    let message_id = envelope.message_id();
1166                    if let Some(future) = ProtoMessageHandlerSet::handle_message(
1167                        &this.message_handlers,
1168                        envelope,
1169                        this.clone().into(),
1170                        cx.clone(),
1171                    ) {
1172                        log::debug!("{}:ssh message received. name:{type_name}", this.name);
1173                        cx.foreground_executor()
1174                            .spawn(async move {
1175                                match future.await {
1176                                    Ok(_) => {
1177                                        log::debug!(
1178                                            "{}:ssh message handled. name:{type_name}",
1179                                            this.name
1180                                        );
1181                                    }
1182                                    Err(error) => {
1183                                        log::error!(
1184                                            "{}:error handling message. type:{}, error:{:#}",
1185                                            this.name,
1186                                            type_name,
1187                                            format!("{error:#}").lines().fold(
1188                                                String::new(),
1189                                                |mut message, line| {
1190                                                    if !message.is_empty() {
1191                                                        message.push(' ');
1192                                                    }
1193                                                    message.push_str(line);
1194                                                    message
1195                                                }
1196                                            )
1197                                        );
1198                                    }
1199                                }
1200                            })
1201                            .detach()
1202                    } else {
1203                        log::error!("{}:unhandled ssh message name:{type_name}", this.name);
1204                        if let Err(e) = AnyProtoClient::from(this.clone()).send_response(
1205                            message_id,
1206                            anyhow::anyhow!("no handler registered for {type_name}").to_proto(),
1207                        ) {
1208                            log::error!(
1209                                "{}:error sending error response for {type_name}:{e:#}",
1210                                this.name
1211                            );
1212                        }
1213                    }
1214                }
1215            }
1216            anyhow::Ok(())
1217        })
1218    }
1219
1220    fn reconnect(
1221        self: &Arc<Self>,
1222        incoming_rx: UnboundedReceiver<Envelope>,
1223        outgoing_tx: UnboundedSender<Envelope>,
1224        cx: &AsyncApp,
1225    ) {
1226        *self.outgoing_tx.lock() = outgoing_tx;
1227        *self.task.lock() = Self::start_handling_messages(Arc::downgrade(self), incoming_rx, cx);
1228    }
1229
1230    fn request<T: RequestMessage>(
1231        &self,
1232        payload: T,
1233    ) -> impl 'static + Future<Output = Result<T::Response>> {
1234        self.request_internal(payload, true)
1235    }
1236
1237    fn request_internal<T: RequestMessage>(
1238        &self,
1239        payload: T,
1240        use_buffer: bool,
1241    ) -> impl 'static + Future<Output = Result<T::Response>> {
1242        log::debug!("ssh request start. name:{}", T::NAME);
1243        let response =
1244            self.request_dynamic(payload.into_envelope(0, None, None), T::NAME, use_buffer);
1245        async move {
1246            let response = response.await?;
1247            log::debug!("ssh request finish. name:{}", T::NAME);
1248            T::Response::from_envelope(response).context("received a response of the wrong type")
1249        }
1250    }
1251
1252    async fn resync(&self, timeout: Duration) -> Result<()> {
1253        smol::future::or(
1254            async {
1255                self.request_internal(proto::FlushBufferedMessages {}, false)
1256                    .await?;
1257
1258                for envelope in self.buffer.lock().iter() {
1259                    self.outgoing_tx
1260                        .lock()
1261                        .unbounded_send(envelope.clone())
1262                        .ok();
1263                }
1264                Ok(())
1265            },
1266            async {
1267                smol::Timer::after(timeout).await;
1268                anyhow::bail!("Timed out resyncing remote client")
1269            },
1270        )
1271        .await
1272    }
1273
1274    async fn ping(&self, timeout: Duration) -> Result<()> {
1275        smol::future::or(
1276            async {
1277                self.request(proto::Ping {}).await?;
1278                Ok(())
1279            },
1280            async {
1281                smol::Timer::after(timeout).await;
1282                anyhow::bail!("Timed out pinging remote client")
1283            },
1284        )
1285        .await
1286    }
1287
1288    fn send<T: EnvelopedMessage>(&self, payload: T) -> Result<()> {
1289        log::debug!("ssh send name:{}", T::NAME);
1290        self.send_dynamic(payload.into_envelope(0, None, None))
1291    }
1292
1293    fn request_dynamic(
1294        &self,
1295        mut envelope: proto::Envelope,
1296        type_name: &'static str,
1297        use_buffer: bool,
1298    ) -> impl 'static + Future<Output = Result<proto::Envelope>> {
1299        envelope.id = self.next_message_id.fetch_add(1, SeqCst);
1300        let (tx, rx) = oneshot::channel();
1301        let mut response_channels_lock = self.response_channels.lock();
1302        response_channels_lock.insert(MessageId(envelope.id), tx);
1303        drop(response_channels_lock);
1304
1305        let result = if use_buffer {
1306            self.send_buffered(envelope)
1307        } else {
1308            self.send_unbuffered(envelope)
1309        };
1310        async move {
1311            if let Err(error) = &result {
1312                log::error!("failed to send message: {error}");
1313                anyhow::bail!("failed to send message: {error}");
1314            }
1315
1316            let response = rx.await.context("connection lost")?.0;
1317            if let Some(proto::envelope::Payload::Error(error)) = &response.payload {
1318                return Err(RpcError::from_proto(error, type_name));
1319            }
1320            Ok(response)
1321        }
1322    }
1323
1324    pub fn send_dynamic(&self, mut envelope: proto::Envelope) -> Result<()> {
1325        envelope.id = self.next_message_id.fetch_add(1, SeqCst);
1326        self.send_buffered(envelope)
1327    }
1328
1329    fn send_buffered(&self, mut envelope: proto::Envelope) -> Result<()> {
1330        envelope.ack_id = Some(self.max_received.load(SeqCst));
1331        self.buffer.lock().push_back(envelope.clone());
1332        // ignore errors on send (happen while we're reconnecting)
1333        // assume that the global "disconnected" overlay is sufficient.
1334        self.outgoing_tx.lock().unbounded_send(envelope).ok();
1335        Ok(())
1336    }
1337
1338    fn send_unbuffered(&self, mut envelope: proto::Envelope) -> Result<()> {
1339        envelope.ack_id = Some(self.max_received.load(SeqCst));
1340        self.outgoing_tx.lock().unbounded_send(envelope).ok();
1341        Ok(())
1342    }
1343}
1344
1345impl ProtoClient for ChannelClient {
1346    fn request(
1347        &self,
1348        envelope: proto::Envelope,
1349        request_type: &'static str,
1350    ) -> BoxFuture<'static, Result<proto::Envelope>> {
1351        self.request_dynamic(envelope, request_type, true).boxed()
1352    }
1353
1354    fn send(&self, envelope: proto::Envelope, _message_type: &'static str) -> Result<()> {
1355        self.send_dynamic(envelope)
1356    }
1357
1358    fn send_response(&self, envelope: Envelope, _message_type: &'static str) -> anyhow::Result<()> {
1359        self.send_dynamic(envelope)
1360    }
1361
1362    fn message_handler_set(&self) -> &Mutex<ProtoMessageHandlerSet> {
1363        &self.message_handlers
1364    }
1365
1366    fn is_via_collab(&self) -> bool {
1367        false
1368    }
1369}
1370
1371#[cfg(any(test, feature = "test-support"))]
1372mod fake {
1373    use super::{ChannelClient, RemoteClientDelegate, RemoteConnection, RemotePlatform};
1374    use crate::remote_client::{CommandTemplate, RemoteConnectionOptions};
1375    use anyhow::Result;
1376    use async_trait::async_trait;
1377    use collections::HashMap;
1378    use futures::{
1379        FutureExt, SinkExt, StreamExt,
1380        channel::{
1381            mpsc::{self, Sender},
1382            oneshot,
1383        },
1384        select_biased,
1385    };
1386    use gpui::{App, AppContext as _, AsyncApp, SemanticVersion, Task, TestAppContext};
1387    use release_channel::ReleaseChannel;
1388    use rpc::proto::Envelope;
1389    use std::{path::PathBuf, sync::Arc};
1390    use util::paths::{PathStyle, RemotePathBuf};
1391
1392    pub(super) struct FakeRemoteConnection {
1393        pub(super) connection_options: RemoteConnectionOptions,
1394        pub(super) server_channel: Arc<ChannelClient>,
1395        pub(super) server_cx: SendableCx,
1396    }
1397
1398    pub(super) struct SendableCx(AsyncApp);
1399    impl SendableCx {
1400        // SAFETY: When run in test mode, GPUI is always single threaded.
1401        pub(super) fn new(cx: &TestAppContext) -> Self {
1402            Self(cx.to_async())
1403        }
1404
1405        // SAFETY: Enforce that we're on the main thread by requiring a valid AsyncApp
1406        fn get(&self, _: &AsyncApp) -> AsyncApp {
1407            self.0.clone()
1408        }
1409    }
1410
1411    // SAFETY: There is no way to access a SendableCx from a different thread, see [`SendableCx::new`] and [`SendableCx::get`]
1412    unsafe impl Send for SendableCx {}
1413    unsafe impl Sync for SendableCx {}
1414
1415    #[async_trait(?Send)]
1416    impl RemoteConnection for FakeRemoteConnection {
1417        async fn kill(&self) -> Result<()> {
1418            Ok(())
1419        }
1420
1421        fn has_been_killed(&self) -> bool {
1422            false
1423        }
1424
1425        fn build_command(
1426            &self,
1427            program: Option<String>,
1428            args: &[String],
1429            env: &HashMap<String, String>,
1430            _: Option<String>,
1431            _: Option<(u16, String, u16)>,
1432        ) -> Result<CommandTemplate> {
1433            let ssh_program = program.unwrap_or_else(|| "sh".to_string());
1434            let mut ssh_args = Vec::new();
1435            ssh_args.push(ssh_program);
1436            ssh_args.extend(args.iter().cloned());
1437            Ok(CommandTemplate {
1438                program: "ssh".into(),
1439                args: ssh_args,
1440                env: env.clone(),
1441            })
1442        }
1443
1444        fn upload_directory(
1445            &self,
1446            _src_path: PathBuf,
1447            _dest_path: RemotePathBuf,
1448            _cx: &App,
1449        ) -> Task<Result<()>> {
1450            unreachable!()
1451        }
1452
1453        fn connection_options(&self) -> RemoteConnectionOptions {
1454            self.connection_options.clone()
1455        }
1456
1457        fn simulate_disconnect(&self, cx: &AsyncApp) {
1458            let (outgoing_tx, _) = mpsc::unbounded::<Envelope>();
1459            let (_, incoming_rx) = mpsc::unbounded::<Envelope>();
1460            self.server_channel
1461                .reconnect(incoming_rx, outgoing_tx, &self.server_cx.get(cx));
1462        }
1463
1464        fn start_proxy(
1465            &self,
1466            _unique_identifier: String,
1467            _reconnect: bool,
1468            mut client_incoming_tx: mpsc::UnboundedSender<Envelope>,
1469            mut client_outgoing_rx: mpsc::UnboundedReceiver<Envelope>,
1470            mut connection_activity_tx: Sender<()>,
1471            _delegate: Arc<dyn RemoteClientDelegate>,
1472            cx: &mut AsyncApp,
1473        ) -> Task<Result<i32>> {
1474            let (mut server_incoming_tx, server_incoming_rx) = mpsc::unbounded::<Envelope>();
1475            let (server_outgoing_tx, mut server_outgoing_rx) = mpsc::unbounded::<Envelope>();
1476
1477            self.server_channel.reconnect(
1478                server_incoming_rx,
1479                server_outgoing_tx,
1480                &self.server_cx.get(cx),
1481            );
1482
1483            cx.background_spawn(async move {
1484                loop {
1485                    select_biased! {
1486                        server_to_client = server_outgoing_rx.next().fuse() => {
1487                            let Some(server_to_client) = server_to_client else {
1488                                return Ok(1)
1489                            };
1490                            connection_activity_tx.try_send(()).ok();
1491                            client_incoming_tx.send(server_to_client).await.ok();
1492                        }
1493                        client_to_server = client_outgoing_rx.next().fuse() => {
1494                            let Some(client_to_server) = client_to_server else {
1495                                return Ok(1)
1496                            };
1497                            server_incoming_tx.send(client_to_server).await.ok();
1498                        }
1499                    }
1500                }
1501            })
1502        }
1503
1504        fn path_style(&self) -> PathStyle {
1505            PathStyle::current()
1506        }
1507
1508        fn shell(&self) -> String {
1509            "sh".to_owned()
1510        }
1511
1512        fn default_system_shell(&self) -> String {
1513            "sh".to_owned()
1514        }
1515    }
1516
1517    pub(super) struct Delegate;
1518
1519    impl RemoteClientDelegate for Delegate {
1520        fn ask_password(&self, _: String, _: oneshot::Sender<String>, _: &mut AsyncApp) {
1521            unreachable!()
1522        }
1523
1524        fn download_server_binary_locally(
1525            &self,
1526            _: RemotePlatform,
1527            _: ReleaseChannel,
1528            _: Option<SemanticVersion>,
1529            _: &mut AsyncApp,
1530        ) -> Task<Result<PathBuf>> {
1531            unreachable!()
1532        }
1533
1534        fn get_download_params(
1535            &self,
1536            _platform: RemotePlatform,
1537            _release_channel: ReleaseChannel,
1538            _version: Option<SemanticVersion>,
1539            _cx: &mut AsyncApp,
1540        ) -> Task<Result<Option<(String, String)>>> {
1541            unreachable!()
1542        }
1543
1544        fn set_status(&self, _: Option<&str>, _: &mut AsyncApp) {}
1545    }
1546}