remote_client.rs

   1use crate::{
   2    SshConnectionOptions,
   3    protocol::MessageId,
   4    proxy::ProxyLaunchError,
   5    transport::{
   6        docker::{DockerConnectionOptions, DockerExecConnection},
   7        ssh::SshRemoteConnection,
   8        wsl::{WslConnectionOptions, WslRemoteConnection},
   9    },
  10};
  11use anyhow::{Context as _, Result, anyhow};
  12use askpass::EncryptedPassword;
  13use async_trait::async_trait;
  14use collections::HashMap;
  15use futures::{
  16    Future, FutureExt as _, StreamExt as _,
  17    channel::{
  18        mpsc::{self, Sender, UnboundedReceiver, UnboundedSender},
  19        oneshot,
  20    },
  21    future::{BoxFuture, Shared},
  22    select, select_biased,
  23};
  24use gpui::{
  25    App, AppContext as _, AsyncApp, BackgroundExecutor, BorrowAppContext, Context, Entity,
  26    EventEmitter, FutureExt, Global, Task, WeakEntity,
  27};
  28use parking_lot::Mutex;
  29
  30use release_channel::ReleaseChannel;
  31use rpc::{
  32    AnyProtoClient, ErrorExt, ProtoClient, ProtoMessageHandlerSet, RpcError,
  33    proto::{self, Envelope, EnvelopedMessage, PeerId, RequestMessage, build_typed_envelope},
  34};
  35use semver::Version;
  36use std::{
  37    collections::VecDeque,
  38    fmt,
  39    ops::ControlFlow,
  40    path::PathBuf,
  41    sync::{
  42        Arc, Weak,
  43        atomic::{AtomicU32, AtomicU64, Ordering::SeqCst},
  44    },
  45    time::{Duration, Instant},
  46};
  47use util::{
  48    ResultExt,
  49    paths::{PathStyle, RemotePathBuf},
  50};
  51
  52#[derive(Copy, Clone, Debug)]
  53pub struct RemotePlatform {
  54    pub os: &'static str,
  55    pub arch: &'static str,
  56}
  57
  58#[derive(Clone, Debug)]
  59pub struct CommandTemplate {
  60    pub program: String,
  61    pub args: Vec<String>,
  62    pub env: HashMap<String, String>,
  63}
  64
  65pub trait RemoteClientDelegate: Send + Sync {
  66    fn ask_password(
  67        &self,
  68        prompt: String,
  69        tx: oneshot::Sender<EncryptedPassword>,
  70        cx: &mut AsyncApp,
  71    );
  72    fn get_download_url(
  73        &self,
  74        platform: RemotePlatform,
  75        release_channel: ReleaseChannel,
  76        version: Option<Version>,
  77        cx: &mut AsyncApp,
  78    ) -> Task<Result<Option<String>>>;
  79    fn download_server_binary_locally(
  80        &self,
  81        platform: RemotePlatform,
  82        release_channel: ReleaseChannel,
  83        version: Option<Version>,
  84        cx: &mut AsyncApp,
  85    ) -> Task<Result<PathBuf>>;
  86    fn set_status(&self, status: Option<&str>, cx: &mut AsyncApp);
  87}
  88
  89const MAX_MISSED_HEARTBEATS: usize = 5;
  90const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
  91const HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(5);
  92const INITIAL_CONNECTION_TIMEOUT: Duration = Duration::from_secs(60);
  93
  94const MAX_RECONNECT_ATTEMPTS: usize = 3;
  95
  96enum State {
  97    Connecting,
  98    Connected {
  99        remote_connection: Arc<dyn RemoteConnection>,
 100        delegate: Arc<dyn RemoteClientDelegate>,
 101
 102        multiplex_task: Task<Result<()>>,
 103        heartbeat_task: Task<Result<()>>,
 104    },
 105    HeartbeatMissed {
 106        missed_heartbeats: usize,
 107
 108        ssh_connection: Arc<dyn RemoteConnection>,
 109        delegate: Arc<dyn RemoteClientDelegate>,
 110
 111        multiplex_task: Task<Result<()>>,
 112        heartbeat_task: Task<Result<()>>,
 113    },
 114    Reconnecting,
 115    ReconnectFailed {
 116        ssh_connection: Arc<dyn RemoteConnection>,
 117        delegate: Arc<dyn RemoteClientDelegate>,
 118
 119        error: anyhow::Error,
 120        attempts: usize,
 121    },
 122    ReconnectExhausted,
 123    ServerNotRunning,
 124}
 125
 126impl fmt::Display for State {
 127    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
 128        match self {
 129            Self::Connecting => write!(f, "connecting"),
 130            Self::Connected { .. } => write!(f, "connected"),
 131            Self::Reconnecting => write!(f, "reconnecting"),
 132            Self::ReconnectFailed { .. } => write!(f, "reconnect failed"),
 133            Self::ReconnectExhausted => write!(f, "reconnect exhausted"),
 134            Self::HeartbeatMissed { .. } => write!(f, "heartbeat missed"),
 135            Self::ServerNotRunning { .. } => write!(f, "server not running"),
 136        }
 137    }
 138}
 139
 140impl State {
 141    fn remote_connection(&self) -> Option<Arc<dyn RemoteConnection>> {
 142        match self {
 143            Self::Connected {
 144                remote_connection: ssh_connection,
 145                ..
 146            } => Some(ssh_connection.clone()),
 147            Self::HeartbeatMissed { ssh_connection, .. } => Some(ssh_connection.clone()),
 148            Self::ReconnectFailed { ssh_connection, .. } => Some(ssh_connection.clone()),
 149            _ => None,
 150        }
 151    }
 152
 153    fn can_reconnect(&self) -> bool {
 154        match self {
 155            Self::Connected { .. }
 156            | Self::HeartbeatMissed { .. }
 157            | Self::ReconnectFailed { .. } => true,
 158            State::Connecting
 159            | State::Reconnecting
 160            | State::ReconnectExhausted
 161            | State::ServerNotRunning => false,
 162        }
 163    }
 164
 165    fn is_reconnect_failed(&self) -> bool {
 166        matches!(self, Self::ReconnectFailed { .. })
 167    }
 168
 169    fn is_reconnect_exhausted(&self) -> bool {
 170        matches!(self, Self::ReconnectExhausted { .. })
 171    }
 172
 173    fn is_server_not_running(&self) -> bool {
 174        matches!(self, Self::ServerNotRunning)
 175    }
 176
 177    fn is_reconnecting(&self) -> bool {
 178        matches!(self, Self::Reconnecting { .. })
 179    }
 180
 181    fn heartbeat_recovered(self) -> Self {
 182        match self {
 183            Self::HeartbeatMissed {
 184                ssh_connection,
 185                delegate,
 186                multiplex_task,
 187                heartbeat_task,
 188                ..
 189            } => Self::Connected {
 190                remote_connection: ssh_connection,
 191                delegate,
 192                multiplex_task,
 193                heartbeat_task,
 194            },
 195            _ => self,
 196        }
 197    }
 198
 199    fn heartbeat_missed(self) -> Self {
 200        match self {
 201            Self::Connected {
 202                remote_connection: ssh_connection,
 203                delegate,
 204                multiplex_task,
 205                heartbeat_task,
 206            } => Self::HeartbeatMissed {
 207                missed_heartbeats: 1,
 208                ssh_connection,
 209                delegate,
 210                multiplex_task,
 211                heartbeat_task,
 212            },
 213            Self::HeartbeatMissed {
 214                missed_heartbeats,
 215                ssh_connection,
 216                delegate,
 217                multiplex_task,
 218                heartbeat_task,
 219            } => Self::HeartbeatMissed {
 220                missed_heartbeats: missed_heartbeats + 1,
 221                ssh_connection,
 222                delegate,
 223                multiplex_task,
 224                heartbeat_task,
 225            },
 226            _ => self,
 227        }
 228    }
 229}
 230
 231/// The state of the ssh connection.
 232#[derive(Clone, Copy, Debug, PartialEq, Eq)]
 233pub enum ConnectionState {
 234    Connecting,
 235    Connected,
 236    HeartbeatMissed,
 237    Reconnecting,
 238    Disconnected,
 239}
 240
 241impl From<&State> for ConnectionState {
 242    fn from(value: &State) -> Self {
 243        match value {
 244            State::Connecting => Self::Connecting,
 245            State::Connected { .. } => Self::Connected,
 246            State::Reconnecting | State::ReconnectFailed { .. } => Self::Reconnecting,
 247            State::HeartbeatMissed { .. } => Self::HeartbeatMissed,
 248            State::ReconnectExhausted => Self::Disconnected,
 249            State::ServerNotRunning => Self::Disconnected,
 250        }
 251    }
 252}
 253
 254pub struct RemoteClient {
 255    client: Arc<ChannelClient>,
 256    unique_identifier: String,
 257    connection_options: RemoteConnectionOptions,
 258    path_style: PathStyle,
 259    state: Option<State>,
 260}
 261
 262#[derive(Debug)]
 263pub enum RemoteClientEvent {
 264    Disconnected,
 265}
 266
 267impl EventEmitter<RemoteClientEvent> for RemoteClient {}
 268
 269/// Identifies the socket on the remote server so that reconnects
 270/// can re-join the same project.
 271pub enum ConnectionIdentifier {
 272    Setup(u64),
 273    Workspace(i64),
 274}
 275
 276static NEXT_ID: AtomicU64 = AtomicU64::new(1);
 277
 278impl ConnectionIdentifier {
 279    pub fn setup() -> Self {
 280        Self::Setup(NEXT_ID.fetch_add(1, SeqCst))
 281    }
 282
 283    // This string gets used in a socket name, and so must be relatively short.
 284    // The total length of:
 285    //   /home/{username}/.local/share/zed/server_state/{name}/stdout.sock
 286    // Must be less than about 100 characters
 287    //   https://unix.stackexchange.com/questions/367008/why-is-socket-path-length-limited-to-a-hundred-chars
 288    // So our strings should be at most 20 characters or so.
 289    fn to_string(&self, cx: &App) -> String {
 290        let identifier_prefix = match ReleaseChannel::global(cx) {
 291            ReleaseChannel::Stable => "".to_string(),
 292            release_channel => format!("{}-", release_channel.dev_name()),
 293        };
 294        match self {
 295            Self::Setup(setup_id) => format!("{identifier_prefix}setup-{setup_id}"),
 296            Self::Workspace(workspace_id) => {
 297                format!("{identifier_prefix}workspace-{workspace_id}",)
 298            }
 299        }
 300    }
 301}
 302
 303pub async fn connect(
 304    connection_options: RemoteConnectionOptions,
 305    delegate: Arc<dyn RemoteClientDelegate>,
 306    cx: &mut AsyncApp,
 307) -> Result<Arc<dyn RemoteConnection>> {
 308    cx.update(|cx| {
 309        cx.update_default_global(|pool: &mut ConnectionPool, cx| {
 310            pool.connect(connection_options.clone(), delegate.clone(), cx)
 311        })
 312    })?
 313    .await
 314    .map_err(|e| e.cloned())
 315}
 316
 317impl RemoteClient {
 318    pub fn new(
 319        unique_identifier: ConnectionIdentifier,
 320        remote_connection: Arc<dyn RemoteConnection>,
 321        cancellation: oneshot::Receiver<()>,
 322        delegate: Arc<dyn RemoteClientDelegate>,
 323        cx: &mut App,
 324    ) -> Task<Result<Option<Entity<Self>>>> {
 325        let unique_identifier = unique_identifier.to_string(cx);
 326        cx.spawn(async move |cx| {
 327            let success = Box::pin(async move {
 328                let (outgoing_tx, outgoing_rx) = mpsc::unbounded::<Envelope>();
 329                let (incoming_tx, incoming_rx) = mpsc::unbounded::<Envelope>();
 330                let (connection_activity_tx, connection_activity_rx) = mpsc::channel::<()>(1);
 331
 332                let client = cx.update(|cx| {
 333                    ChannelClient::new(
 334                        incoming_rx,
 335                        outgoing_tx,
 336                        cx,
 337                        "client",
 338                        remote_connection.has_wsl_interop(),
 339                    )
 340                })?;
 341
 342                let path_style = remote_connection.path_style();
 343                let this = cx.new(|_| Self {
 344                    client: client.clone(),
 345                    unique_identifier: unique_identifier.clone(),
 346                    connection_options: remote_connection.connection_options(),
 347                    path_style,
 348                    state: Some(State::Connecting),
 349                })?;
 350
 351                let io_task = remote_connection.start_proxy(
 352                    unique_identifier,
 353                    false,
 354                    incoming_tx,
 355                    outgoing_rx,
 356                    connection_activity_tx,
 357                    delegate.clone(),
 358                    cx,
 359                );
 360
 361                let ready = client
 362                    .wait_for_remote_started()
 363                    .with_timeout(INITIAL_CONNECTION_TIMEOUT, cx.background_executor())
 364                    .await;
 365                match ready {
 366                    Ok(Some(_)) => {}
 367                    Ok(None) => {
 368                        let mut error = "remote client exited before becoming ready".to_owned();
 369                        if let Some(status) = io_task.now_or_never() {
 370                            match status {
 371                                Ok(exit_code) => {
 372                                    error.push_str(&format!(", exit_code={exit_code:?}"))
 373                                }
 374                                Err(e) => error.push_str(&format!(", error={e:?}")),
 375                            }
 376                        }
 377                        let error = anyhow::anyhow!("{error}");
 378                        log::error!("failed to establish connection: {}", error);
 379                        return Err(error);
 380                    }
 381                    Err(_) => {
 382                        let mut error =
 383                            "remote client did not become ready within the timeout".to_owned();
 384                        if let Some(status) = io_task.now_or_never() {
 385                            match status {
 386                                Ok(exit_code) => {
 387                                    error.push_str(&format!(", exit_code={exit_code:?}"))
 388                                }
 389                                Err(e) => error.push_str(&format!(", error={e:?}")),
 390                            }
 391                        }
 392                        let error = anyhow::anyhow!("{error}");
 393                        log::error!("failed to establish connection: {}", error);
 394                        return Err(error);
 395                    }
 396                }
 397                let multiplex_task = Self::monitor(this.downgrade(), io_task, cx);
 398                if let Err(error) = client.ping(HEARTBEAT_TIMEOUT).await {
 399                    log::error!("failed to establish connection: {}", error);
 400                    return Err(error);
 401                }
 402
 403                let heartbeat_task = Self::heartbeat(this.downgrade(), connection_activity_rx, cx);
 404
 405                this.update(cx, |this, _| {
 406                    this.state = Some(State::Connected {
 407                        remote_connection,
 408                        delegate,
 409                        multiplex_task,
 410                        heartbeat_task,
 411                    });
 412                })?;
 413
 414                Ok(Some(this))
 415            });
 416
 417            select! {
 418                _ = cancellation.fuse() => {
 419                    Ok(None)
 420                }
 421                result = success.fuse() =>  result
 422            }
 423        })
 424    }
 425
 426    pub fn proto_client_from_channels(
 427        incoming_rx: mpsc::UnboundedReceiver<Envelope>,
 428        outgoing_tx: mpsc::UnboundedSender<Envelope>,
 429        cx: &App,
 430        name: &'static str,
 431        has_wsl_interop: bool,
 432    ) -> AnyProtoClient {
 433        ChannelClient::new(incoming_rx, outgoing_tx, cx, name, has_wsl_interop).into()
 434    }
 435
 436    pub fn shutdown_processes<T: RequestMessage>(
 437        &mut self,
 438        shutdown_request: Option<T>,
 439        executor: BackgroundExecutor,
 440    ) -> Option<impl Future<Output = ()> + use<T>> {
 441        let state = self.state.take()?;
 442        log::info!("shutting down ssh processes");
 443
 444        let State::Connected {
 445            multiplex_task,
 446            heartbeat_task,
 447            remote_connection: ssh_connection,
 448            delegate,
 449        } = state
 450        else {
 451            return None;
 452        };
 453
 454        let client = self.client.clone();
 455
 456        Some(async move {
 457            if let Some(shutdown_request) = shutdown_request {
 458                client.send(shutdown_request).log_err();
 459                // We wait 50ms instead of waiting for a response, because
 460                // waiting for a response would require us to wait on the main thread
 461                // which we want to avoid in an `on_app_quit` callback.
 462                executor.timer(Duration::from_millis(50)).await;
 463            }
 464
 465            // Drop `multiplex_task` because it owns our ssh_proxy_process, which is a
 466            // child of master_process.
 467            drop(multiplex_task);
 468            // Now drop the rest of state, which kills master process.
 469            drop(heartbeat_task);
 470            drop(ssh_connection);
 471            drop(delegate);
 472        })
 473    }
 474
 475    fn reconnect(&mut self, cx: &mut Context<Self>) -> Result<()> {
 476        let can_reconnect = self
 477            .state
 478            .as_ref()
 479            .map(|state| state.can_reconnect())
 480            .unwrap_or(false);
 481        if !can_reconnect {
 482            log::info!("aborting reconnect, because not in state that allows reconnecting");
 483            let error = if let Some(state) = self.state.as_ref() {
 484                format!("invalid state, cannot reconnect while in state {state}")
 485            } else {
 486                "no state set".to_string()
 487            };
 488            anyhow::bail!(error);
 489        }
 490
 491        let state = self.state.take().unwrap();
 492        let (attempts, remote_connection, delegate) = match state {
 493            State::Connected {
 494                remote_connection: ssh_connection,
 495                delegate,
 496                multiplex_task,
 497                heartbeat_task,
 498            }
 499            | State::HeartbeatMissed {
 500                ssh_connection,
 501                delegate,
 502                multiplex_task,
 503                heartbeat_task,
 504                ..
 505            } => {
 506                drop(multiplex_task);
 507                drop(heartbeat_task);
 508                (0, ssh_connection, delegate)
 509            }
 510            State::ReconnectFailed {
 511                attempts,
 512                ssh_connection,
 513                delegate,
 514                ..
 515            } => (attempts, ssh_connection, delegate),
 516            State::Connecting
 517            | State::Reconnecting
 518            | State::ReconnectExhausted
 519            | State::ServerNotRunning => unreachable!(),
 520        };
 521
 522        let attempts = attempts + 1;
 523        if attempts > MAX_RECONNECT_ATTEMPTS {
 524            log::error!(
 525                "Failed to reconnect to after {} attempts, giving up",
 526                MAX_RECONNECT_ATTEMPTS
 527            );
 528            self.set_state(State::ReconnectExhausted, cx);
 529            return Ok(());
 530        }
 531
 532        self.set_state(State::Reconnecting, cx);
 533
 534        log::info!("Trying to reconnect to ssh server... Attempt {}", attempts);
 535
 536        let unique_identifier = self.unique_identifier.clone();
 537        let client = self.client.clone();
 538        let reconnect_task = cx.spawn(async move |this, cx| {
 539            macro_rules! failed {
 540                ($error:expr, $attempts:expr, $ssh_connection:expr, $delegate:expr) => {
 541                    delegate.set_status(Some(&format!("{error:#}", error = $error)), cx);
 542                    return State::ReconnectFailed {
 543                        error: anyhow!($error),
 544                        attempts: $attempts,
 545                        ssh_connection: $ssh_connection,
 546                        delegate: $delegate,
 547                    };
 548                };
 549            }
 550
 551            if let Err(error) = remote_connection
 552                .kill()
 553                .await
 554                .context("Failed to kill ssh process")
 555            {
 556                failed!(error, attempts, remote_connection, delegate);
 557            };
 558
 559            let connection_options = remote_connection.connection_options();
 560
 561            let (outgoing_tx, outgoing_rx) = mpsc::unbounded::<Envelope>();
 562            let (incoming_tx, incoming_rx) = mpsc::unbounded::<Envelope>();
 563            let (connection_activity_tx, connection_activity_rx) = mpsc::channel::<()>(1);
 564
 565            let (ssh_connection, io_task) = match async {
 566                let ssh_connection = cx
 567                    .update_global(|pool: &mut ConnectionPool, cx| {
 568                        pool.connect(connection_options, delegate.clone(), cx)
 569                    })?
 570                    .await
 571                    .map_err(|error| error.cloned())?;
 572
 573                let io_task = ssh_connection.start_proxy(
 574                    unique_identifier,
 575                    true,
 576                    incoming_tx,
 577                    outgoing_rx,
 578                    connection_activity_tx,
 579                    delegate.clone(),
 580                    cx,
 581                );
 582                anyhow::Ok((ssh_connection, io_task))
 583            }
 584            .await
 585            {
 586                Ok((ssh_connection, io_task)) => (ssh_connection, io_task),
 587                Err(error) => {
 588                    failed!(error, attempts, remote_connection, delegate);
 589                }
 590            };
 591
 592            let multiplex_task = Self::monitor(this.clone(), io_task, cx);
 593            client.reconnect(incoming_rx, outgoing_tx, cx);
 594
 595            if let Err(error) = client.resync(HEARTBEAT_TIMEOUT).await {
 596                failed!(error, attempts, ssh_connection, delegate);
 597            };
 598
 599            State::Connected {
 600                remote_connection: ssh_connection,
 601                delegate,
 602                multiplex_task,
 603                heartbeat_task: Self::heartbeat(this.clone(), connection_activity_rx, cx),
 604            }
 605        });
 606
 607        cx.spawn(async move |this, cx| {
 608            let new_state = reconnect_task.await;
 609            this.update(cx, |this, cx| {
 610                this.try_set_state(cx, |old_state| {
 611                    if old_state.is_reconnecting() {
 612                        match &new_state {
 613                            State::Connecting
 614                            | State::Reconnecting
 615                            | State::HeartbeatMissed { .. }
 616                            | State::ServerNotRunning => {}
 617                            State::Connected { .. } => {
 618                                log::info!("Successfully reconnected");
 619                            }
 620                            State::ReconnectFailed {
 621                                error, attempts, ..
 622                            } => {
 623                                log::error!(
 624                                    "Reconnect attempt {} failed: {:?}. Starting new attempt...",
 625                                    attempts,
 626                                    error
 627                                );
 628                            }
 629                            State::ReconnectExhausted => {
 630                                log::error!("Reconnect attempt failed and all attempts exhausted");
 631                            }
 632                        }
 633                        Some(new_state)
 634                    } else {
 635                        None
 636                    }
 637                });
 638
 639                if this.state_is(State::is_reconnect_failed) {
 640                    this.reconnect(cx)
 641                } else if this.state_is(State::is_reconnect_exhausted) {
 642                    Ok(())
 643                } else {
 644                    log::debug!("State has transition from Reconnecting into new state while attempting reconnect.");
 645                    Ok(())
 646                }
 647            })
 648        })
 649        .detach_and_log_err(cx);
 650
 651        Ok(())
 652    }
 653
 654    fn heartbeat(
 655        this: WeakEntity<Self>,
 656        mut connection_activity_rx: mpsc::Receiver<()>,
 657        cx: &mut AsyncApp,
 658    ) -> Task<Result<()>> {
 659        let Ok(client) = this.read_with(cx, |this, _| this.client.clone()) else {
 660            return Task::ready(Err(anyhow!("SshRemoteClient lost")));
 661        };
 662
 663        cx.spawn(async move |cx| {
 664            let mut missed_heartbeats = 0;
 665
 666            let keepalive_timer = cx.background_executor().timer(HEARTBEAT_INTERVAL).fuse();
 667            futures::pin_mut!(keepalive_timer);
 668
 669            loop {
 670                select_biased! {
 671                    result = connection_activity_rx.next().fuse() => {
 672                        if result.is_none() {
 673                            log::warn!("ssh heartbeat: connection activity channel has been dropped. stopping.");
 674                            return Ok(());
 675                        }
 676
 677                        if missed_heartbeats != 0 {
 678                            missed_heartbeats = 0;
 679                            let _ =this.update(cx, |this, cx| {
 680                                this.handle_heartbeat_result(missed_heartbeats, cx)
 681                            })?;
 682                        }
 683                    }
 684                    _ = keepalive_timer => {
 685                        log::debug!("Sending heartbeat to server...");
 686
 687                        let result = select_biased! {
 688                            _ = connection_activity_rx.next().fuse() => {
 689                                Ok(())
 690                            }
 691                            ping_result = client.ping(HEARTBEAT_TIMEOUT).fuse() => {
 692                                ping_result
 693                            }
 694                        };
 695
 696                        if result.is_err() {
 697                            missed_heartbeats += 1;
 698                            log::warn!(
 699                                "No heartbeat from server after {:?}. Missed heartbeat {} out of {}.",
 700                                HEARTBEAT_TIMEOUT,
 701                                missed_heartbeats,
 702                                MAX_MISSED_HEARTBEATS
 703                            );
 704                        } else if missed_heartbeats != 0 {
 705                            missed_heartbeats = 0;
 706                        } else {
 707                            continue;
 708                        }
 709
 710                        let result = this.update(cx, |this, cx| {
 711                            this.handle_heartbeat_result(missed_heartbeats, cx)
 712                        })?;
 713                        if result.is_break() {
 714                            return Ok(());
 715                        }
 716                    }
 717                }
 718
 719                keepalive_timer.set(cx.background_executor().timer(HEARTBEAT_INTERVAL).fuse());
 720            }
 721        })
 722    }
 723
 724    fn handle_heartbeat_result(
 725        &mut self,
 726        missed_heartbeats: usize,
 727        cx: &mut Context<Self>,
 728    ) -> ControlFlow<()> {
 729        let state = self.state.take().unwrap();
 730        let next_state = if missed_heartbeats > 0 {
 731            state.heartbeat_missed()
 732        } else {
 733            state.heartbeat_recovered()
 734        };
 735
 736        self.set_state(next_state, cx);
 737
 738        if missed_heartbeats >= MAX_MISSED_HEARTBEATS {
 739            log::error!(
 740                "Missed last {} heartbeats. Reconnecting...",
 741                missed_heartbeats
 742            );
 743
 744            self.reconnect(cx)
 745                .context("failed to start reconnect process after missing heartbeats")
 746                .log_err();
 747            ControlFlow::Break(())
 748        } else {
 749            ControlFlow::Continue(())
 750        }
 751    }
 752
 753    fn monitor(
 754        this: WeakEntity<Self>,
 755        io_task: Task<Result<i32>>,
 756        cx: &AsyncApp,
 757    ) -> Task<Result<()>> {
 758        cx.spawn(async move |cx| {
 759            let result = io_task.await;
 760
 761            match result {
 762                Ok(exit_code) => {
 763                    if let Some(error) = ProxyLaunchError::from_exit_code(exit_code) {
 764                        match error {
 765                            ProxyLaunchError::ServerNotRunning => {
 766                                log::error!("failed to reconnect because server is not running");
 767                                this.update(cx, |this, cx| {
 768                                    this.set_state(State::ServerNotRunning, cx);
 769                                })?;
 770                            }
 771                        }
 772                    } else if exit_code > 0 {
 773                        log::error!("proxy process terminated unexpectedly");
 774                        this.update(cx, |this, cx| {
 775                            this.reconnect(cx).ok();
 776                        })?;
 777                    }
 778                }
 779                Err(error) => {
 780                    log::warn!("ssh io task died with error: {:?}. reconnecting...", error);
 781                    this.update(cx, |this, cx| {
 782                        this.reconnect(cx).ok();
 783                    })?;
 784                }
 785            }
 786
 787            Ok(())
 788        })
 789    }
 790
 791    fn state_is(&self, check: impl FnOnce(&State) -> bool) -> bool {
 792        self.state.as_ref().is_some_and(check)
 793    }
 794
 795    fn try_set_state(&mut self, cx: &mut Context<Self>, map: impl FnOnce(&State) -> Option<State>) {
 796        let new_state = self.state.as_ref().and_then(map);
 797        if let Some(new_state) = new_state {
 798            self.state.replace(new_state);
 799            cx.notify();
 800        }
 801    }
 802
 803    fn set_state(&mut self, state: State, cx: &mut Context<Self>) {
 804        log::info!("setting state to '{}'", &state);
 805
 806        let is_reconnect_exhausted = state.is_reconnect_exhausted();
 807        let is_server_not_running = state.is_server_not_running();
 808        self.state.replace(state);
 809
 810        if is_reconnect_exhausted || is_server_not_running {
 811            cx.emit(RemoteClientEvent::Disconnected);
 812        }
 813        cx.notify();
 814    }
 815
 816    pub fn shell(&self) -> Option<String> {
 817        Some(self.remote_connection()?.shell())
 818    }
 819
 820    pub fn default_system_shell(&self) -> Option<String> {
 821        Some(self.remote_connection()?.default_system_shell())
 822    }
 823
 824    pub fn shares_network_interface(&self) -> bool {
 825        self.remote_connection()
 826            .map_or(false, |connection| connection.shares_network_interface())
 827    }
 828
 829    pub fn build_command(
 830        &self,
 831        program: Option<String>,
 832        args: &[String],
 833        env: &HashMap<String, String>,
 834        working_dir: Option<String>,
 835        port_forward: Option<(u16, String, u16)>,
 836    ) -> Result<CommandTemplate> {
 837        let Some(connection) = self.remote_connection() else {
 838            return Err(anyhow!("no ssh connection"));
 839        };
 840        connection.build_command(program, args, env, working_dir, port_forward)
 841    }
 842
 843    pub fn build_forward_ports_command(
 844        &self,
 845        forwards: Vec<(u16, String, u16)>,
 846    ) -> Result<CommandTemplate> {
 847        let Some(connection) = self.remote_connection() else {
 848            return Err(anyhow!("no ssh connection"));
 849        };
 850        connection.build_forward_ports_command(forwards)
 851    }
 852
 853    pub fn upload_directory(
 854        &self,
 855        src_path: PathBuf,
 856        dest_path: RemotePathBuf,
 857        cx: &App,
 858    ) -> Task<Result<()>> {
 859        let Some(connection) = self.remote_connection() else {
 860            return Task::ready(Err(anyhow!("no ssh connection")));
 861        };
 862        connection.upload_directory(src_path, dest_path, cx)
 863    }
 864
 865    pub fn proto_client(&self) -> AnyProtoClient {
 866        self.client.clone().into()
 867    }
 868
 869    pub fn connection_options(&self) -> RemoteConnectionOptions {
 870        self.connection_options.clone()
 871    }
 872
 873    pub fn connection(&self) -> Option<Arc<dyn RemoteConnection>> {
 874        if let State::Connected {
 875            remote_connection, ..
 876        } = self.state.as_ref()?
 877        {
 878            Some(remote_connection.clone())
 879        } else {
 880            None
 881        }
 882    }
 883
 884    pub fn connection_state(&self) -> ConnectionState {
 885        self.state
 886            .as_ref()
 887            .map(ConnectionState::from)
 888            .unwrap_or(ConnectionState::Disconnected)
 889    }
 890
 891    pub fn is_disconnected(&self) -> bool {
 892        self.connection_state() == ConnectionState::Disconnected
 893    }
 894
 895    pub fn path_style(&self) -> PathStyle {
 896        self.path_style
 897    }
 898
 899    #[cfg(any(test, feature = "test-support"))]
 900    pub fn simulate_disconnect(&self, client_cx: &mut App) -> Task<()> {
 901        let opts = self.connection_options();
 902        client_cx.spawn(async move |cx| {
 903            let connection = cx
 904                .update_global(|c: &mut ConnectionPool, _| {
 905                    if let Some(ConnectionPoolEntry::Connecting(c)) = c.connections.get(&opts) {
 906                        c.clone()
 907                    } else {
 908                        panic!("missing test connection")
 909                    }
 910                })
 911                .unwrap()
 912                .await
 913                .unwrap();
 914
 915            connection.simulate_disconnect(cx);
 916        })
 917    }
 918
 919    #[cfg(any(test, feature = "test-support"))]
 920    pub fn fake_server(
 921        client_cx: &mut gpui::TestAppContext,
 922        server_cx: &mut gpui::TestAppContext,
 923    ) -> (RemoteConnectionOptions, AnyProtoClient) {
 924        let port = client_cx
 925            .update(|cx| cx.default_global::<ConnectionPool>().connections.len() as u16 + 1);
 926        let opts = RemoteConnectionOptions::Ssh(SshConnectionOptions {
 927            host: "<fake>".to_string(),
 928            port: Some(port),
 929            ..Default::default()
 930        });
 931        let (outgoing_tx, _) = mpsc::unbounded::<Envelope>();
 932        let (_, incoming_rx) = mpsc::unbounded::<Envelope>();
 933        let server_client = server_cx
 934            .update(|cx| ChannelClient::new(incoming_rx, outgoing_tx, cx, "fake-server", false));
 935        let connection: Arc<dyn RemoteConnection> = Arc::new(fake::FakeRemoteConnection {
 936            connection_options: opts.clone(),
 937            server_cx: fake::SendableCx::new(server_cx),
 938            server_channel: server_client.clone(),
 939        });
 940
 941        client_cx.update(|cx| {
 942            cx.update_default_global(|c: &mut ConnectionPool, cx| {
 943                c.connections.insert(
 944                    opts.clone(),
 945                    ConnectionPoolEntry::Connecting(
 946                        cx.background_spawn({
 947                            let connection = connection.clone();
 948                            async move { Ok(connection.clone()) }
 949                        })
 950                        .shared(),
 951                    ),
 952                );
 953            })
 954        });
 955
 956        (opts, server_client.into())
 957    }
 958
 959    #[cfg(any(test, feature = "test-support"))]
 960    pub async fn fake_client(
 961        opts: RemoteConnectionOptions,
 962        client_cx: &mut gpui::TestAppContext,
 963    ) -> Entity<Self> {
 964        let (_tx, rx) = oneshot::channel();
 965        let mut cx = client_cx.to_async();
 966        let connection = connect(opts, Arc::new(fake::Delegate), &mut cx)
 967            .await
 968            .unwrap();
 969        client_cx
 970            .update(|cx| {
 971                Self::new(
 972                    ConnectionIdentifier::setup(),
 973                    connection,
 974                    rx,
 975                    Arc::new(fake::Delegate),
 976                    cx,
 977                )
 978            })
 979            .await
 980            .unwrap()
 981            .unwrap()
 982    }
 983
 984    fn remote_connection(&self) -> Option<Arc<dyn RemoteConnection>> {
 985        self.state
 986            .as_ref()
 987            .and_then(|state| state.remote_connection())
 988    }
 989}
 990
 991enum ConnectionPoolEntry {
 992    Connecting(Shared<Task<Result<Arc<dyn RemoteConnection>, Arc<anyhow::Error>>>>),
 993    Connected(Weak<dyn RemoteConnection>),
 994}
 995
 996#[derive(Default)]
 997struct ConnectionPool {
 998    connections: HashMap<RemoteConnectionOptions, ConnectionPoolEntry>,
 999}
1000
1001impl Global for ConnectionPool {}
1002
1003impl ConnectionPool {
1004    pub fn connect(
1005        &mut self,
1006        opts: RemoteConnectionOptions,
1007        delegate: Arc<dyn RemoteClientDelegate>,
1008        cx: &mut App,
1009    ) -> Shared<Task<Result<Arc<dyn RemoteConnection>, Arc<anyhow::Error>>>> {
1010        let connection = self.connections.get(&opts);
1011        match connection {
1012            Some(ConnectionPoolEntry::Connecting(task)) => {
1013                delegate.set_status(
1014                    Some("Waiting for existing connection attempt"),
1015                    &mut cx.to_async(),
1016                );
1017                return task.clone();
1018            }
1019            Some(ConnectionPoolEntry::Connected(ssh)) => {
1020                if let Some(ssh) = ssh.upgrade()
1021                    && !ssh.has_been_killed()
1022                {
1023                    return Task::ready(Ok(ssh)).shared();
1024                }
1025                self.connections.remove(&opts);
1026            }
1027            None => {}
1028        }
1029
1030        let task = cx
1031            .spawn({
1032                let opts = opts.clone();
1033                let delegate = delegate.clone();
1034                async move |cx| {
1035                    let connection = match opts.clone() {
1036                        RemoteConnectionOptions::Ssh(opts) => {
1037                            SshRemoteConnection::new(opts, delegate, cx)
1038                                .await
1039                                .map(|connection| Arc::new(connection) as Arc<dyn RemoteConnection>)
1040                        }
1041                        RemoteConnectionOptions::Wsl(opts) => {
1042                            WslRemoteConnection::new(opts, delegate, cx)
1043                                .await
1044                                .map(|connection| Arc::new(connection) as Arc<dyn RemoteConnection>)
1045                        }
1046                        RemoteConnectionOptions::Docker(opts) => {
1047                            DockerExecConnection::new(opts, delegate, cx)
1048                                .await
1049                                .map(|connection| Arc::new(connection) as Arc<dyn RemoteConnection>)
1050                        }
1051                    };
1052
1053                    cx.update_global(|pool: &mut Self, _| {
1054                        debug_assert!(matches!(
1055                            pool.connections.get(&opts),
1056                            Some(ConnectionPoolEntry::Connecting(_))
1057                        ));
1058                        match connection {
1059                            Ok(connection) => {
1060                                pool.connections.insert(
1061                                    opts.clone(),
1062                                    ConnectionPoolEntry::Connected(Arc::downgrade(&connection)),
1063                                );
1064                                Ok(connection)
1065                            }
1066                            Err(error) => {
1067                                pool.connections.remove(&opts);
1068                                Err(Arc::new(error))
1069                            }
1070                        }
1071                    })?
1072                }
1073            })
1074            .shared();
1075
1076        self.connections
1077            .insert(opts.clone(), ConnectionPoolEntry::Connecting(task.clone()));
1078        task
1079    }
1080}
1081
1082#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1083pub enum RemoteConnectionOptions {
1084    Ssh(SshConnectionOptions),
1085    Wsl(WslConnectionOptions),
1086    Docker(DockerConnectionOptions),
1087}
1088
1089impl RemoteConnectionOptions {
1090    pub fn display_name(&self) -> String {
1091        match self {
1092            RemoteConnectionOptions::Ssh(opts) => opts.host.clone(),
1093            RemoteConnectionOptions::Wsl(opts) => opts.distro_name.clone(),
1094            RemoteConnectionOptions::Docker(opts) => opts.name.clone(),
1095        }
1096    }
1097}
1098
1099impl From<SshConnectionOptions> for RemoteConnectionOptions {
1100    fn from(opts: SshConnectionOptions) -> Self {
1101        RemoteConnectionOptions::Ssh(opts)
1102    }
1103}
1104
1105impl From<WslConnectionOptions> for RemoteConnectionOptions {
1106    fn from(opts: WslConnectionOptions) -> Self {
1107        RemoteConnectionOptions::Wsl(opts)
1108    }
1109}
1110
1111#[cfg(target_os = "windows")]
1112/// Open a wsl path (\\wsl.localhost\<distro>\path)
1113#[derive(Debug, Clone, PartialEq, Eq, gpui::Action)]
1114#[action(namespace = workspace, no_json, no_register)]
1115pub struct OpenWslPath {
1116    pub distro: WslConnectionOptions,
1117    pub paths: Vec<PathBuf>,
1118}
1119
1120#[async_trait(?Send)]
1121pub trait RemoteConnection: Send + Sync {
1122    fn start_proxy(
1123        &self,
1124        unique_identifier: String,
1125        reconnect: bool,
1126        incoming_tx: UnboundedSender<Envelope>,
1127        outgoing_rx: UnboundedReceiver<Envelope>,
1128        connection_activity_tx: Sender<()>,
1129        delegate: Arc<dyn RemoteClientDelegate>,
1130        cx: &mut AsyncApp,
1131    ) -> Task<Result<i32>>;
1132    fn upload_directory(
1133        &self,
1134        src_path: PathBuf,
1135        dest_path: RemotePathBuf,
1136        cx: &App,
1137    ) -> Task<Result<()>>;
1138    async fn kill(&self) -> Result<()>;
1139    fn has_been_killed(&self) -> bool;
1140    fn shares_network_interface(&self) -> bool {
1141        false
1142    }
1143    fn build_command(
1144        &self,
1145        program: Option<String>,
1146        args: &[String],
1147        env: &HashMap<String, String>,
1148        working_dir: Option<String>,
1149        port_forward: Option<(u16, String, u16)>,
1150    ) -> Result<CommandTemplate>;
1151    fn build_forward_ports_command(
1152        &self,
1153        forwards: Vec<(u16, String, u16)>,
1154    ) -> Result<CommandTemplate>;
1155    fn connection_options(&self) -> RemoteConnectionOptions;
1156    fn path_style(&self) -> PathStyle;
1157    fn shell(&self) -> String;
1158    fn default_system_shell(&self) -> String;
1159    fn has_wsl_interop(&self) -> bool;
1160
1161    #[cfg(any(test, feature = "test-support"))]
1162    fn simulate_disconnect(&self, _: &AsyncApp) {}
1163}
1164
1165type ResponseChannels = Mutex<HashMap<MessageId, oneshot::Sender<(Envelope, oneshot::Sender<()>)>>>;
1166
1167struct Signal<T> {
1168    tx: Mutex<Option<oneshot::Sender<T>>>,
1169    rx: Shared<Task<Option<T>>>,
1170}
1171
1172impl<T: Send + Clone + 'static> Signal<T> {
1173    pub fn new(cx: &App) -> Self {
1174        let (tx, rx) = oneshot::channel();
1175
1176        let task = cx
1177            .background_executor()
1178            .spawn(async move { rx.await.ok() })
1179            .shared();
1180
1181        Self {
1182            tx: Mutex::new(Some(tx)),
1183            rx: task,
1184        }
1185    }
1186
1187    fn set(&self, value: T) {
1188        if let Some(tx) = self.tx.lock().take() {
1189            let _ = tx.send(value);
1190        }
1191    }
1192
1193    fn wait(&self) -> Shared<Task<Option<T>>> {
1194        self.rx.clone()
1195    }
1196}
1197
1198struct ChannelClient {
1199    next_message_id: AtomicU32,
1200    outgoing_tx: Mutex<mpsc::UnboundedSender<Envelope>>,
1201    buffer: Mutex<VecDeque<Envelope>>,
1202    response_channels: ResponseChannels,
1203    message_handlers: Mutex<ProtoMessageHandlerSet>,
1204    max_received: AtomicU32,
1205    name: &'static str,
1206    task: Mutex<Task<Result<()>>>,
1207    remote_started: Signal<()>,
1208    has_wsl_interop: bool,
1209}
1210
1211impl ChannelClient {
1212    fn new(
1213        incoming_rx: mpsc::UnboundedReceiver<Envelope>,
1214        outgoing_tx: mpsc::UnboundedSender<Envelope>,
1215        cx: &App,
1216        name: &'static str,
1217        has_wsl_interop: bool,
1218    ) -> Arc<Self> {
1219        Arc::new_cyclic(|this| Self {
1220            outgoing_tx: Mutex::new(outgoing_tx),
1221            next_message_id: AtomicU32::new(0),
1222            max_received: AtomicU32::new(0),
1223            response_channels: ResponseChannels::default(),
1224            message_handlers: Default::default(),
1225            buffer: Mutex::new(VecDeque::new()),
1226            name,
1227            task: Mutex::new(Self::start_handling_messages(
1228                this.clone(),
1229                incoming_rx,
1230                &cx.to_async(),
1231            )),
1232            remote_started: Signal::new(cx),
1233            has_wsl_interop,
1234        })
1235    }
1236
1237    fn wait_for_remote_started(&self) -> Shared<Task<Option<()>>> {
1238        self.remote_started.wait()
1239    }
1240
1241    fn start_handling_messages(
1242        this: Weak<Self>,
1243        mut incoming_rx: mpsc::UnboundedReceiver<Envelope>,
1244        cx: &AsyncApp,
1245    ) -> Task<Result<()>> {
1246        cx.spawn(async move |cx| {
1247            if let Some(this) = this.upgrade() {
1248                let envelope = proto::RemoteStarted {}.into_envelope(0, None, None);
1249                this.outgoing_tx.lock().unbounded_send(envelope).ok();
1250            };
1251
1252            let peer_id = PeerId { owner_id: 0, id: 0 };
1253            while let Some(incoming) = incoming_rx.next().await {
1254                let Some(this) = this.upgrade() else {
1255                    return anyhow::Ok(());
1256                };
1257                if let Some(ack_id) = incoming.ack_id {
1258                    let mut buffer = this.buffer.lock();
1259                    while buffer.front().is_some_and(|msg| msg.id <= ack_id) {
1260                        buffer.pop_front();
1261                    }
1262                }
1263                if let Some(proto::envelope::Payload::FlushBufferedMessages(_)) = &incoming.payload
1264                {
1265                    log::debug!(
1266                        "{}:ssh message received. name:FlushBufferedMessages",
1267                        this.name
1268                    );
1269                    {
1270                        let buffer = this.buffer.lock();
1271                        for envelope in buffer.iter() {
1272                            this.outgoing_tx
1273                                .lock()
1274                                .unbounded_send(envelope.clone())
1275                                .ok();
1276                        }
1277                    }
1278                    let mut envelope = proto::Ack {}.into_envelope(0, Some(incoming.id), None);
1279                    envelope.id = this.next_message_id.fetch_add(1, SeqCst);
1280                    this.outgoing_tx.lock().unbounded_send(envelope).ok();
1281                    continue;
1282                }
1283
1284                if let Some(proto::envelope::Payload::RemoteStarted(_)) = &incoming.payload {
1285                    this.remote_started.set(());
1286                    let mut envelope = proto::Ack {}.into_envelope(0, Some(incoming.id), None);
1287                    envelope.id = this.next_message_id.fetch_add(1, SeqCst);
1288                    this.outgoing_tx.lock().unbounded_send(envelope).ok();
1289                    continue;
1290                }
1291
1292                this.max_received.store(incoming.id, SeqCst);
1293
1294                if let Some(request_id) = incoming.responding_to {
1295                    let request_id = MessageId(request_id);
1296                    let sender = this.response_channels.lock().remove(&request_id);
1297                    if let Some(sender) = sender {
1298                        let (tx, rx) = oneshot::channel();
1299                        if incoming.payload.is_some() {
1300                            sender.send((incoming, tx)).ok();
1301                        }
1302                        rx.await.ok();
1303                    }
1304                } else if let Some(envelope) =
1305                    build_typed_envelope(peer_id, Instant::now(), incoming)
1306                {
1307                    let type_name = envelope.payload_type_name();
1308                    let message_id = envelope.message_id();
1309                    if let Some(future) = ProtoMessageHandlerSet::handle_message(
1310                        &this.message_handlers,
1311                        envelope,
1312                        this.clone().into(),
1313                        cx.clone(),
1314                    ) {
1315                        log::debug!("{}:ssh message received. name:{type_name}", this.name);
1316                        cx.foreground_executor()
1317                            .spawn(async move {
1318                                match future.await {
1319                                    Ok(_) => {
1320                                        log::debug!(
1321                                            "{}:ssh message handled. name:{type_name}",
1322                                            this.name
1323                                        );
1324                                    }
1325                                    Err(error) => {
1326                                        log::error!(
1327                                            "{}:error handling message. type:{}, error:{:#}",
1328                                            this.name,
1329                                            type_name,
1330                                            format!("{error:#}").lines().fold(
1331                                                String::new(),
1332                                                |mut message, line| {
1333                                                    if !message.is_empty() {
1334                                                        message.push(' ');
1335                                                    }
1336                                                    message.push_str(line);
1337                                                    message
1338                                                }
1339                                            )
1340                                        );
1341                                    }
1342                                }
1343                            })
1344                            .detach()
1345                    } else {
1346                        log::error!("{}:unhandled ssh message name:{type_name}", this.name);
1347                        if let Err(e) = AnyProtoClient::from(this.clone()).send_response(
1348                            message_id,
1349                            anyhow::anyhow!("no handler registered for {type_name}").to_proto(),
1350                        ) {
1351                            log::error!(
1352                                "{}:error sending error response for {type_name}:{e:#}",
1353                                this.name
1354                            );
1355                        }
1356                    }
1357                }
1358            }
1359            anyhow::Ok(())
1360        })
1361    }
1362
1363    fn reconnect(
1364        self: &Arc<Self>,
1365        incoming_rx: UnboundedReceiver<Envelope>,
1366        outgoing_tx: UnboundedSender<Envelope>,
1367        cx: &AsyncApp,
1368    ) {
1369        *self.outgoing_tx.lock() = outgoing_tx;
1370        *self.task.lock() = Self::start_handling_messages(Arc::downgrade(self), incoming_rx, cx);
1371    }
1372
1373    fn request<T: RequestMessage>(
1374        &self,
1375        payload: T,
1376    ) -> impl 'static + Future<Output = Result<T::Response>> {
1377        self.request_internal(payload, true)
1378    }
1379
1380    fn request_internal<T: RequestMessage>(
1381        &self,
1382        payload: T,
1383        use_buffer: bool,
1384    ) -> impl 'static + Future<Output = Result<T::Response>> {
1385        log::debug!("ssh request start. name:{}", T::NAME);
1386        let response =
1387            self.request_dynamic(payload.into_envelope(0, None, None), T::NAME, use_buffer);
1388        async move {
1389            let response = response.await?;
1390            log::debug!("ssh request finish. name:{}", T::NAME);
1391            T::Response::from_envelope(response).context("received a response of the wrong type")
1392        }
1393    }
1394
1395    async fn resync(&self, timeout: Duration) -> Result<()> {
1396        smol::future::or(
1397            async {
1398                self.request_internal(proto::FlushBufferedMessages {}, false)
1399                    .await?;
1400
1401                for envelope in self.buffer.lock().iter() {
1402                    self.outgoing_tx
1403                        .lock()
1404                        .unbounded_send(envelope.clone())
1405                        .ok();
1406                }
1407                Ok(())
1408            },
1409            async {
1410                smol::Timer::after(timeout).await;
1411                anyhow::bail!("Timed out resyncing remote client")
1412            },
1413        )
1414        .await
1415    }
1416
1417    async fn ping(&self, timeout: Duration) -> Result<()> {
1418        smol::future::or(
1419            async {
1420                self.request(proto::Ping {}).await?;
1421                Ok(())
1422            },
1423            async {
1424                smol::Timer::after(timeout).await;
1425                anyhow::bail!("Timed out pinging remote client")
1426            },
1427        )
1428        .await
1429    }
1430
1431    fn send<T: EnvelopedMessage>(&self, payload: T) -> Result<()> {
1432        log::debug!("ssh send name:{}", T::NAME);
1433        self.send_dynamic(payload.into_envelope(0, None, None))
1434    }
1435
1436    fn request_dynamic(
1437        &self,
1438        mut envelope: proto::Envelope,
1439        type_name: &'static str,
1440        use_buffer: bool,
1441    ) -> impl 'static + Future<Output = Result<proto::Envelope>> {
1442        envelope.id = self.next_message_id.fetch_add(1, SeqCst);
1443        let (tx, rx) = oneshot::channel();
1444        let mut response_channels_lock = self.response_channels.lock();
1445        response_channels_lock.insert(MessageId(envelope.id), tx);
1446        drop(response_channels_lock);
1447
1448        let result = if use_buffer {
1449            self.send_buffered(envelope)
1450        } else {
1451            self.send_unbuffered(envelope)
1452        };
1453        async move {
1454            if let Err(error) = &result {
1455                log::error!("failed to send message: {error}");
1456                anyhow::bail!("failed to send message: {error}");
1457            }
1458
1459            let response = rx.await.context("connection lost")?.0;
1460            if let Some(proto::envelope::Payload::Error(error)) = &response.payload {
1461                return Err(RpcError::from_proto(error, type_name));
1462            }
1463            Ok(response)
1464        }
1465    }
1466
1467    pub fn send_dynamic(&self, mut envelope: proto::Envelope) -> Result<()> {
1468        envelope.id = self.next_message_id.fetch_add(1, SeqCst);
1469        self.send_buffered(envelope)
1470    }
1471
1472    fn send_buffered(&self, mut envelope: proto::Envelope) -> Result<()> {
1473        envelope.ack_id = Some(self.max_received.load(SeqCst));
1474        self.buffer.lock().push_back(envelope.clone());
1475        // ignore errors on send (happen while we're reconnecting)
1476        // assume that the global "disconnected" overlay is sufficient.
1477        self.outgoing_tx.lock().unbounded_send(envelope).ok();
1478        Ok(())
1479    }
1480
1481    fn send_unbuffered(&self, mut envelope: proto::Envelope) -> Result<()> {
1482        envelope.ack_id = Some(self.max_received.load(SeqCst));
1483        self.outgoing_tx.lock().unbounded_send(envelope).ok();
1484        Ok(())
1485    }
1486}
1487
1488impl ProtoClient for ChannelClient {
1489    fn request(
1490        &self,
1491        envelope: proto::Envelope,
1492        request_type: &'static str,
1493    ) -> BoxFuture<'static, Result<proto::Envelope>> {
1494        self.request_dynamic(envelope, request_type, true).boxed()
1495    }
1496
1497    fn send(&self, envelope: proto::Envelope, _message_type: &'static str) -> Result<()> {
1498        self.send_dynamic(envelope)
1499    }
1500
1501    fn send_response(&self, envelope: Envelope, _message_type: &'static str) -> anyhow::Result<()> {
1502        self.send_dynamic(envelope)
1503    }
1504
1505    fn message_handler_set(&self) -> &Mutex<ProtoMessageHandlerSet> {
1506        &self.message_handlers
1507    }
1508
1509    fn is_via_collab(&self) -> bool {
1510        false
1511    }
1512
1513    fn has_wsl_interop(&self) -> bool {
1514        self.has_wsl_interop
1515    }
1516}
1517
1518#[cfg(any(test, feature = "test-support"))]
1519mod fake {
1520    use super::{ChannelClient, RemoteClientDelegate, RemoteConnection, RemotePlatform};
1521    use crate::remote_client::{CommandTemplate, RemoteConnectionOptions};
1522    use anyhow::Result;
1523    use askpass::EncryptedPassword;
1524    use async_trait::async_trait;
1525    use collections::HashMap;
1526    use futures::{
1527        FutureExt, SinkExt, StreamExt,
1528        channel::{
1529            mpsc::{self, Sender},
1530            oneshot,
1531        },
1532        select_biased,
1533    };
1534    use gpui::{App, AppContext as _, AsyncApp, Task, TestAppContext};
1535    use release_channel::ReleaseChannel;
1536    use rpc::proto::Envelope;
1537    use semver::Version;
1538    use std::{path::PathBuf, sync::Arc};
1539    use util::paths::{PathStyle, RemotePathBuf};
1540
1541    pub(super) struct FakeRemoteConnection {
1542        pub(super) connection_options: RemoteConnectionOptions,
1543        pub(super) server_channel: Arc<ChannelClient>,
1544        pub(super) server_cx: SendableCx,
1545    }
1546
1547    pub(super) struct SendableCx(AsyncApp);
1548    impl SendableCx {
1549        // SAFETY: When run in test mode, GPUI is always single threaded.
1550        pub(super) fn new(cx: &TestAppContext) -> Self {
1551            Self(cx.to_async())
1552        }
1553
1554        // SAFETY: Enforce that we're on the main thread by requiring a valid AsyncApp
1555        fn get(&self, _: &AsyncApp) -> AsyncApp {
1556            self.0.clone()
1557        }
1558    }
1559
1560    // SAFETY: There is no way to access a SendableCx from a different thread, see [`SendableCx::new`] and [`SendableCx::get`]
1561    unsafe impl Send for SendableCx {}
1562    unsafe impl Sync for SendableCx {}
1563
1564    #[async_trait(?Send)]
1565    impl RemoteConnection for FakeRemoteConnection {
1566        async fn kill(&self) -> Result<()> {
1567            Ok(())
1568        }
1569
1570        fn has_been_killed(&self) -> bool {
1571            false
1572        }
1573
1574        fn build_command(
1575            &self,
1576            program: Option<String>,
1577            args: &[String],
1578            env: &HashMap<String, String>,
1579            _: Option<String>,
1580            _: Option<(u16, String, u16)>,
1581        ) -> Result<CommandTemplate> {
1582            let ssh_program = program.unwrap_or_else(|| "sh".to_string());
1583            let mut ssh_args = Vec::new();
1584            ssh_args.push(ssh_program);
1585            ssh_args.extend(args.iter().cloned());
1586            Ok(CommandTemplate {
1587                program: "ssh".into(),
1588                args: ssh_args,
1589                env: env.clone(),
1590            })
1591        }
1592
1593        fn build_forward_ports_command(
1594            &self,
1595            forwards: Vec<(u16, String, u16)>,
1596        ) -> anyhow::Result<CommandTemplate> {
1597            Ok(CommandTemplate {
1598                program: "ssh".into(),
1599                args: std::iter::once("-N".to_owned())
1600                    .chain(forwards.into_iter().map(|(local_port, host, remote_port)| {
1601                        format!("{local_port}:{host}:{remote_port}")
1602                    }))
1603                    .collect(),
1604                env: Default::default(),
1605            })
1606        }
1607
1608        fn upload_directory(
1609            &self,
1610            _src_path: PathBuf,
1611            _dest_path: RemotePathBuf,
1612            _cx: &App,
1613        ) -> Task<Result<()>> {
1614            unreachable!()
1615        }
1616
1617        fn connection_options(&self) -> RemoteConnectionOptions {
1618            self.connection_options.clone()
1619        }
1620
1621        fn simulate_disconnect(&self, cx: &AsyncApp) {
1622            let (outgoing_tx, _) = mpsc::unbounded::<Envelope>();
1623            let (_, incoming_rx) = mpsc::unbounded::<Envelope>();
1624            self.server_channel
1625                .reconnect(incoming_rx, outgoing_tx, &self.server_cx.get(cx));
1626        }
1627
1628        fn start_proxy(
1629            &self,
1630            _unique_identifier: String,
1631            _reconnect: bool,
1632            mut client_incoming_tx: mpsc::UnboundedSender<Envelope>,
1633            mut client_outgoing_rx: mpsc::UnboundedReceiver<Envelope>,
1634            mut connection_activity_tx: Sender<()>,
1635            _delegate: Arc<dyn RemoteClientDelegate>,
1636            cx: &mut AsyncApp,
1637        ) -> Task<Result<i32>> {
1638            let (mut server_incoming_tx, server_incoming_rx) = mpsc::unbounded::<Envelope>();
1639            let (server_outgoing_tx, mut server_outgoing_rx) = mpsc::unbounded::<Envelope>();
1640
1641            self.server_channel.reconnect(
1642                server_incoming_rx,
1643                server_outgoing_tx,
1644                &self.server_cx.get(cx),
1645            );
1646
1647            cx.background_spawn(async move {
1648                loop {
1649                    select_biased! {
1650                        server_to_client = server_outgoing_rx.next().fuse() => {
1651                            let Some(server_to_client) = server_to_client else {
1652                                return Ok(1)
1653                            };
1654                            connection_activity_tx.try_send(()).ok();
1655                            client_incoming_tx.send(server_to_client).await.ok();
1656                        }
1657                        client_to_server = client_outgoing_rx.next().fuse() => {
1658                            let Some(client_to_server) = client_to_server else {
1659                                return Ok(1)
1660                            };
1661                            server_incoming_tx.send(client_to_server).await.ok();
1662                        }
1663                    }
1664                }
1665            })
1666        }
1667
1668        fn path_style(&self) -> PathStyle {
1669            PathStyle::local()
1670        }
1671
1672        fn shell(&self) -> String {
1673            "sh".to_owned()
1674        }
1675
1676        fn default_system_shell(&self) -> String {
1677            "sh".to_owned()
1678        }
1679
1680        fn has_wsl_interop(&self) -> bool {
1681            false
1682        }
1683    }
1684
1685    pub(super) struct Delegate;
1686
1687    impl RemoteClientDelegate for Delegate {
1688        fn ask_password(&self, _: String, _: oneshot::Sender<EncryptedPassword>, _: &mut AsyncApp) {
1689            unreachable!()
1690        }
1691
1692        fn download_server_binary_locally(
1693            &self,
1694            _: RemotePlatform,
1695            _: ReleaseChannel,
1696            _: Option<Version>,
1697            _: &mut AsyncApp,
1698        ) -> Task<Result<PathBuf>> {
1699            unreachable!()
1700        }
1701
1702        fn get_download_url(
1703            &self,
1704            _platform: RemotePlatform,
1705            _release_channel: ReleaseChannel,
1706            _version: Option<Version>,
1707            _cx: &mut AsyncApp,
1708        ) -> Task<Result<Option<String>>> {
1709            unreachable!()
1710        }
1711
1712        fn set_status(&self, _: Option<&str>, _: &mut AsyncApp) {}
1713    }
1714}