remote_client.rs

   1use crate::{
   2    SshConnectionOptions,
   3    protocol::MessageId,
   4    proxy::ProxyLaunchError,
   5    transport::{
   6        ssh::SshRemoteConnection,
   7        wsl::{WslConnectionOptions, WslRemoteConnection},
   8    },
   9};
  10use anyhow::{Context as _, Result, anyhow};
  11use askpass::EncryptedPassword;
  12use async_trait::async_trait;
  13use collections::HashMap;
  14use futures::{
  15    Future, FutureExt as _, StreamExt as _,
  16    channel::{
  17        mpsc::{self, Sender, UnboundedReceiver, UnboundedSender},
  18        oneshot,
  19    },
  20    future::{BoxFuture, Shared},
  21    select, select_biased,
  22};
  23use gpui::{
  24    App, AppContext as _, AsyncApp, BackgroundExecutor, BorrowAppContext, Context, Entity,
  25    EventEmitter, FutureExt, Global, Task, WeakEntity,
  26};
  27use parking_lot::Mutex;
  28
  29use release_channel::ReleaseChannel;
  30use rpc::{
  31    AnyProtoClient, ErrorExt, ProtoClient, ProtoMessageHandlerSet, RpcError,
  32    proto::{self, Envelope, EnvelopedMessage, PeerId, RequestMessage, build_typed_envelope},
  33};
  34use semver::Version;
  35use std::{
  36    collections::VecDeque,
  37    fmt,
  38    ops::ControlFlow,
  39    path::PathBuf,
  40    sync::{
  41        Arc, Weak,
  42        atomic::{AtomicU32, AtomicU64, Ordering::SeqCst},
  43    },
  44    time::{Duration, Instant},
  45};
  46use util::{
  47    ResultExt,
  48    paths::{PathStyle, RemotePathBuf},
  49};
  50
  51#[derive(Copy, Clone, Debug)]
  52pub struct RemotePlatform {
  53    pub os: &'static str,
  54    pub arch: &'static str,
  55}
  56
  57#[derive(Clone, Debug)]
  58pub struct CommandTemplate {
  59    pub program: String,
  60    pub args: Vec<String>,
  61    pub env: HashMap<String, String>,
  62}
  63
  64pub trait RemoteClientDelegate: Send + Sync {
  65    fn ask_password(
  66        &self,
  67        prompt: String,
  68        tx: oneshot::Sender<EncryptedPassword>,
  69        cx: &mut AsyncApp,
  70    );
  71    fn get_download_url(
  72        &self,
  73        platform: RemotePlatform,
  74        release_channel: ReleaseChannel,
  75        version: Option<Version>,
  76        cx: &mut AsyncApp,
  77    ) -> Task<Result<Option<String>>>;
  78    fn download_server_binary_locally(
  79        &self,
  80        platform: RemotePlatform,
  81        release_channel: ReleaseChannel,
  82        version: Option<Version>,
  83        cx: &mut AsyncApp,
  84    ) -> Task<Result<PathBuf>>;
  85    fn set_status(&self, status: Option<&str>, cx: &mut AsyncApp);
  86}
  87
  88const MAX_MISSED_HEARTBEATS: usize = 5;
  89const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
  90const HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(5);
  91const INITIAL_CONNECTION_TIMEOUT: Duration = Duration::from_secs(60);
  92
  93const MAX_RECONNECT_ATTEMPTS: usize = 3;
  94
  95enum State {
  96    Connecting,
  97    Connected {
  98        remote_connection: Arc<dyn RemoteConnection>,
  99        delegate: Arc<dyn RemoteClientDelegate>,
 100
 101        multiplex_task: Task<Result<()>>,
 102        heartbeat_task: Task<Result<()>>,
 103    },
 104    HeartbeatMissed {
 105        missed_heartbeats: usize,
 106
 107        ssh_connection: Arc<dyn RemoteConnection>,
 108        delegate: Arc<dyn RemoteClientDelegate>,
 109
 110        multiplex_task: Task<Result<()>>,
 111        heartbeat_task: Task<Result<()>>,
 112    },
 113    Reconnecting,
 114    ReconnectFailed {
 115        ssh_connection: Arc<dyn RemoteConnection>,
 116        delegate: Arc<dyn RemoteClientDelegate>,
 117
 118        error: anyhow::Error,
 119        attempts: usize,
 120    },
 121    ReconnectExhausted,
 122    ServerNotRunning,
 123}
 124
 125impl fmt::Display for State {
 126    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
 127        match self {
 128            Self::Connecting => write!(f, "connecting"),
 129            Self::Connected { .. } => write!(f, "connected"),
 130            Self::Reconnecting => write!(f, "reconnecting"),
 131            Self::ReconnectFailed { .. } => write!(f, "reconnect failed"),
 132            Self::ReconnectExhausted => write!(f, "reconnect exhausted"),
 133            Self::HeartbeatMissed { .. } => write!(f, "heartbeat missed"),
 134            Self::ServerNotRunning { .. } => write!(f, "server not running"),
 135        }
 136    }
 137}
 138
 139impl State {
 140    fn remote_connection(&self) -> Option<Arc<dyn RemoteConnection>> {
 141        match self {
 142            Self::Connected {
 143                remote_connection: ssh_connection,
 144                ..
 145            } => Some(ssh_connection.clone()),
 146            Self::HeartbeatMissed { ssh_connection, .. } => Some(ssh_connection.clone()),
 147            Self::ReconnectFailed { ssh_connection, .. } => Some(ssh_connection.clone()),
 148            _ => None,
 149        }
 150    }
 151
 152    fn can_reconnect(&self) -> bool {
 153        match self {
 154            Self::Connected { .. }
 155            | Self::HeartbeatMissed { .. }
 156            | Self::ReconnectFailed { .. } => true,
 157            State::Connecting
 158            | State::Reconnecting
 159            | State::ReconnectExhausted
 160            | State::ServerNotRunning => false,
 161        }
 162    }
 163
 164    fn is_reconnect_failed(&self) -> bool {
 165        matches!(self, Self::ReconnectFailed { .. })
 166    }
 167
 168    fn is_reconnect_exhausted(&self) -> bool {
 169        matches!(self, Self::ReconnectExhausted { .. })
 170    }
 171
 172    fn is_server_not_running(&self) -> bool {
 173        matches!(self, Self::ServerNotRunning)
 174    }
 175
 176    fn is_reconnecting(&self) -> bool {
 177        matches!(self, Self::Reconnecting { .. })
 178    }
 179
 180    fn heartbeat_recovered(self) -> Self {
 181        match self {
 182            Self::HeartbeatMissed {
 183                ssh_connection,
 184                delegate,
 185                multiplex_task,
 186                heartbeat_task,
 187                ..
 188            } => Self::Connected {
 189                remote_connection: ssh_connection,
 190                delegate,
 191                multiplex_task,
 192                heartbeat_task,
 193            },
 194            _ => self,
 195        }
 196    }
 197
 198    fn heartbeat_missed(self) -> Self {
 199        match self {
 200            Self::Connected {
 201                remote_connection: ssh_connection,
 202                delegate,
 203                multiplex_task,
 204                heartbeat_task,
 205            } => Self::HeartbeatMissed {
 206                missed_heartbeats: 1,
 207                ssh_connection,
 208                delegate,
 209                multiplex_task,
 210                heartbeat_task,
 211            },
 212            Self::HeartbeatMissed {
 213                missed_heartbeats,
 214                ssh_connection,
 215                delegate,
 216                multiplex_task,
 217                heartbeat_task,
 218            } => Self::HeartbeatMissed {
 219                missed_heartbeats: missed_heartbeats + 1,
 220                ssh_connection,
 221                delegate,
 222                multiplex_task,
 223                heartbeat_task,
 224            },
 225            _ => self,
 226        }
 227    }
 228}
 229
 230/// The state of the ssh connection.
 231#[derive(Clone, Copy, Debug, PartialEq, Eq)]
 232pub enum ConnectionState {
 233    Connecting,
 234    Connected,
 235    HeartbeatMissed,
 236    Reconnecting,
 237    Disconnected,
 238}
 239
 240impl From<&State> for ConnectionState {
 241    fn from(value: &State) -> Self {
 242        match value {
 243            State::Connecting => Self::Connecting,
 244            State::Connected { .. } => Self::Connected,
 245            State::Reconnecting | State::ReconnectFailed { .. } => Self::Reconnecting,
 246            State::HeartbeatMissed { .. } => Self::HeartbeatMissed,
 247            State::ReconnectExhausted => Self::Disconnected,
 248            State::ServerNotRunning => Self::Disconnected,
 249        }
 250    }
 251}
 252
 253pub struct RemoteClient {
 254    client: Arc<ChannelClient>,
 255    unique_identifier: String,
 256    connection_options: RemoteConnectionOptions,
 257    path_style: PathStyle,
 258    state: Option<State>,
 259}
 260
 261#[derive(Debug)]
 262pub enum RemoteClientEvent {
 263    Disconnected,
 264}
 265
 266impl EventEmitter<RemoteClientEvent> for RemoteClient {}
 267
 268/// Identifies the socket on the remote server so that reconnects
 269/// can re-join the same project.
 270pub enum ConnectionIdentifier {
 271    Setup(u64),
 272    Workspace(i64),
 273}
 274
 275static NEXT_ID: AtomicU64 = AtomicU64::new(1);
 276
 277impl ConnectionIdentifier {
 278    pub fn setup() -> Self {
 279        Self::Setup(NEXT_ID.fetch_add(1, SeqCst))
 280    }
 281
 282    // This string gets used in a socket name, and so must be relatively short.
 283    // The total length of:
 284    //   /home/{username}/.local/share/zed/server_state/{name}/stdout.sock
 285    // Must be less than about 100 characters
 286    //   https://unix.stackexchange.com/questions/367008/why-is-socket-path-length-limited-to-a-hundred-chars
 287    // So our strings should be at most 20 characters or so.
 288    fn to_string(&self, cx: &App) -> String {
 289        let identifier_prefix = match ReleaseChannel::global(cx) {
 290            ReleaseChannel::Stable => "".to_string(),
 291            release_channel => format!("{}-", release_channel.dev_name()),
 292        };
 293        match self {
 294            Self::Setup(setup_id) => format!("{identifier_prefix}setup-{setup_id}"),
 295            Self::Workspace(workspace_id) => {
 296                format!("{identifier_prefix}workspace-{workspace_id}",)
 297            }
 298        }
 299    }
 300}
 301
 302pub async fn connect(
 303    connection_options: RemoteConnectionOptions,
 304    delegate: Arc<dyn RemoteClientDelegate>,
 305    cx: &mut AsyncApp,
 306) -> Result<Arc<dyn RemoteConnection>> {
 307    cx.update(|cx| {
 308        cx.update_default_global(|pool: &mut ConnectionPool, cx| {
 309            pool.connect(connection_options.clone(), delegate.clone(), cx)
 310        })
 311    })?
 312    .await
 313    .map_err(|e| e.cloned())
 314}
 315
 316impl RemoteClient {
 317    pub fn new(
 318        unique_identifier: ConnectionIdentifier,
 319        remote_connection: Arc<dyn RemoteConnection>,
 320        cancellation: oneshot::Receiver<()>,
 321        delegate: Arc<dyn RemoteClientDelegate>,
 322        cx: &mut App,
 323    ) -> Task<Result<Option<Entity<Self>>>> {
 324        let unique_identifier = unique_identifier.to_string(cx);
 325        cx.spawn(async move |cx| {
 326            let success = Box::pin(async move {
 327                let (outgoing_tx, outgoing_rx) = mpsc::unbounded::<Envelope>();
 328                let (incoming_tx, incoming_rx) = mpsc::unbounded::<Envelope>();
 329                let (connection_activity_tx, connection_activity_rx) = mpsc::channel::<()>(1);
 330
 331                let client = cx.update(|cx| {
 332                    ChannelClient::new(
 333                        incoming_rx,
 334                        outgoing_tx,
 335                        cx,
 336                        "client",
 337                        remote_connection.has_wsl_interop(),
 338                    )
 339                })?;
 340
 341                let path_style = remote_connection.path_style();
 342                let this = cx.new(|_| Self {
 343                    client: client.clone(),
 344                    unique_identifier: unique_identifier.clone(),
 345                    connection_options: remote_connection.connection_options(),
 346                    path_style,
 347                    state: Some(State::Connecting),
 348                })?;
 349
 350                let io_task = remote_connection.start_proxy(
 351                    unique_identifier,
 352                    false,
 353                    incoming_tx,
 354                    outgoing_rx,
 355                    connection_activity_tx,
 356                    delegate.clone(),
 357                    cx,
 358                );
 359
 360                let ready = client
 361                    .wait_for_remote_started()
 362                    .with_timeout(INITIAL_CONNECTION_TIMEOUT, cx.background_executor())
 363                    .await;
 364                match ready {
 365                    Ok(Some(_)) => {}
 366                    Ok(None) => {
 367                        let mut error = "remote client exited before becoming ready".to_owned();
 368                        if let Some(status) = io_task.now_or_never() {
 369                            match status {
 370                                Ok(exit_code) => {
 371                                    error.push_str(&format!(", exit_code={exit_code:?}"))
 372                                }
 373                                Err(e) => error.push_str(&format!(", error={e:?}")),
 374                            }
 375                        }
 376                        let error = anyhow::anyhow!("{error}");
 377                        log::error!("failed to establish connection: {}", error);
 378                        return Err(error);
 379                    }
 380                    Err(_) => {
 381                        let mut error =
 382                            "remote client did not become ready within the timeout".to_owned();
 383                        if let Some(status) = io_task.now_or_never() {
 384                            match status {
 385                                Ok(exit_code) => {
 386                                    error.push_str(&format!(", exit_code={exit_code:?}"))
 387                                }
 388                                Err(e) => error.push_str(&format!(", error={e:?}")),
 389                            }
 390                        }
 391                        let error = anyhow::anyhow!("{error}");
 392                        log::error!("failed to establish connection: {}", error);
 393                        return Err(error);
 394                    }
 395                }
 396                let multiplex_task = Self::monitor(this.downgrade(), io_task, cx);
 397                if let Err(error) = client.ping(HEARTBEAT_TIMEOUT).await {
 398                    log::error!("failed to establish connection: {}", error);
 399                    return Err(error);
 400                }
 401
 402                let heartbeat_task = Self::heartbeat(this.downgrade(), connection_activity_rx, cx);
 403
 404                this.update(cx, |this, _| {
 405                    this.state = Some(State::Connected {
 406                        remote_connection,
 407                        delegate,
 408                        multiplex_task,
 409                        heartbeat_task,
 410                    });
 411                })?;
 412
 413                Ok(Some(this))
 414            });
 415
 416            select! {
 417                _ = cancellation.fuse() => {
 418                    Ok(None)
 419                }
 420                result = success.fuse() =>  result
 421            }
 422        })
 423    }
 424
 425    pub fn proto_client_from_channels(
 426        incoming_rx: mpsc::UnboundedReceiver<Envelope>,
 427        outgoing_tx: mpsc::UnboundedSender<Envelope>,
 428        cx: &App,
 429        name: &'static str,
 430        has_wsl_interop: bool,
 431    ) -> AnyProtoClient {
 432        ChannelClient::new(incoming_rx, outgoing_tx, cx, name, has_wsl_interop).into()
 433    }
 434
 435    pub fn shutdown_processes<T: RequestMessage>(
 436        &mut self,
 437        shutdown_request: Option<T>,
 438        executor: BackgroundExecutor,
 439    ) -> Option<impl Future<Output = ()> + use<T>> {
 440        let state = self.state.take()?;
 441        log::info!("shutting down ssh processes");
 442
 443        let State::Connected {
 444            multiplex_task,
 445            heartbeat_task,
 446            remote_connection: ssh_connection,
 447            delegate,
 448        } = state
 449        else {
 450            return None;
 451        };
 452
 453        let client = self.client.clone();
 454
 455        Some(async move {
 456            if let Some(shutdown_request) = shutdown_request {
 457                client.send(shutdown_request).log_err();
 458                // We wait 50ms instead of waiting for a response, because
 459                // waiting for a response would require us to wait on the main thread
 460                // which we want to avoid in an `on_app_quit` callback.
 461                executor.timer(Duration::from_millis(50)).await;
 462            }
 463
 464            // Drop `multiplex_task` because it owns our ssh_proxy_process, which is a
 465            // child of master_process.
 466            drop(multiplex_task);
 467            // Now drop the rest of state, which kills master process.
 468            drop(heartbeat_task);
 469            drop(ssh_connection);
 470            drop(delegate);
 471        })
 472    }
 473
 474    fn reconnect(&mut self, cx: &mut Context<Self>) -> Result<()> {
 475        let can_reconnect = self
 476            .state
 477            .as_ref()
 478            .map(|state| state.can_reconnect())
 479            .unwrap_or(false);
 480        if !can_reconnect {
 481            log::info!("aborting reconnect, because not in state that allows reconnecting");
 482            let error = if let Some(state) = self.state.as_ref() {
 483                format!("invalid state, cannot reconnect while in state {state}")
 484            } else {
 485                "no state set".to_string()
 486            };
 487            anyhow::bail!(error);
 488        }
 489
 490        let state = self.state.take().unwrap();
 491        let (attempts, remote_connection, delegate) = match state {
 492            State::Connected {
 493                remote_connection: ssh_connection,
 494                delegate,
 495                multiplex_task,
 496                heartbeat_task,
 497            }
 498            | State::HeartbeatMissed {
 499                ssh_connection,
 500                delegate,
 501                multiplex_task,
 502                heartbeat_task,
 503                ..
 504            } => {
 505                drop(multiplex_task);
 506                drop(heartbeat_task);
 507                (0, ssh_connection, delegate)
 508            }
 509            State::ReconnectFailed {
 510                attempts,
 511                ssh_connection,
 512                delegate,
 513                ..
 514            } => (attempts, ssh_connection, delegate),
 515            State::Connecting
 516            | State::Reconnecting
 517            | State::ReconnectExhausted
 518            | State::ServerNotRunning => unreachable!(),
 519        };
 520
 521        let attempts = attempts + 1;
 522        if attempts > MAX_RECONNECT_ATTEMPTS {
 523            log::error!(
 524                "Failed to reconnect to after {} attempts, giving up",
 525                MAX_RECONNECT_ATTEMPTS
 526            );
 527            self.set_state(State::ReconnectExhausted, cx);
 528            return Ok(());
 529        }
 530
 531        self.set_state(State::Reconnecting, cx);
 532
 533        log::info!("Trying to reconnect to ssh server... Attempt {}", attempts);
 534
 535        let unique_identifier = self.unique_identifier.clone();
 536        let client = self.client.clone();
 537        let reconnect_task = cx.spawn(async move |this, cx| {
 538            macro_rules! failed {
 539                ($error:expr, $attempts:expr, $ssh_connection:expr, $delegate:expr) => {
 540                    delegate.set_status(Some(&format!("{error:#}", error = $error)), cx);
 541                    return State::ReconnectFailed {
 542                        error: anyhow!($error),
 543                        attempts: $attempts,
 544                        ssh_connection: $ssh_connection,
 545                        delegate: $delegate,
 546                    };
 547                };
 548            }
 549
 550            if let Err(error) = remote_connection
 551                .kill()
 552                .await
 553                .context("Failed to kill ssh process")
 554            {
 555                failed!(error, attempts, remote_connection, delegate);
 556            };
 557
 558            let connection_options = remote_connection.connection_options();
 559
 560            let (outgoing_tx, outgoing_rx) = mpsc::unbounded::<Envelope>();
 561            let (incoming_tx, incoming_rx) = mpsc::unbounded::<Envelope>();
 562            let (connection_activity_tx, connection_activity_rx) = mpsc::channel::<()>(1);
 563
 564            let (ssh_connection, io_task) = match async {
 565                let ssh_connection = cx
 566                    .update_global(|pool: &mut ConnectionPool, cx| {
 567                        pool.connect(connection_options, delegate.clone(), cx)
 568                    })?
 569                    .await
 570                    .map_err(|error| error.cloned())?;
 571
 572                let io_task = ssh_connection.start_proxy(
 573                    unique_identifier,
 574                    true,
 575                    incoming_tx,
 576                    outgoing_rx,
 577                    connection_activity_tx,
 578                    delegate.clone(),
 579                    cx,
 580                );
 581                anyhow::Ok((ssh_connection, io_task))
 582            }
 583            .await
 584            {
 585                Ok((ssh_connection, io_task)) => (ssh_connection, io_task),
 586                Err(error) => {
 587                    failed!(error, attempts, remote_connection, delegate);
 588                }
 589            };
 590
 591            let multiplex_task = Self::monitor(this.clone(), io_task, cx);
 592            client.reconnect(incoming_rx, outgoing_tx, cx);
 593
 594            if let Err(error) = client.resync(HEARTBEAT_TIMEOUT).await {
 595                failed!(error, attempts, ssh_connection, delegate);
 596            };
 597
 598            State::Connected {
 599                remote_connection: ssh_connection,
 600                delegate,
 601                multiplex_task,
 602                heartbeat_task: Self::heartbeat(this.clone(), connection_activity_rx, cx),
 603            }
 604        });
 605
 606        cx.spawn(async move |this, cx| {
 607            let new_state = reconnect_task.await;
 608            this.update(cx, |this, cx| {
 609                this.try_set_state(cx, |old_state| {
 610                    if old_state.is_reconnecting() {
 611                        match &new_state {
 612                            State::Connecting
 613                            | State::Reconnecting
 614                            | State::HeartbeatMissed { .. }
 615                            | State::ServerNotRunning => {}
 616                            State::Connected { .. } => {
 617                                log::info!("Successfully reconnected");
 618                            }
 619                            State::ReconnectFailed {
 620                                error, attempts, ..
 621                            } => {
 622                                log::error!(
 623                                    "Reconnect attempt {} failed: {:?}. Starting new attempt...",
 624                                    attempts,
 625                                    error
 626                                );
 627                            }
 628                            State::ReconnectExhausted => {
 629                                log::error!("Reconnect attempt failed and all attempts exhausted");
 630                            }
 631                        }
 632                        Some(new_state)
 633                    } else {
 634                        None
 635                    }
 636                });
 637
 638                if this.state_is(State::is_reconnect_failed) {
 639                    this.reconnect(cx)
 640                } else if this.state_is(State::is_reconnect_exhausted) {
 641                    Ok(())
 642                } else {
 643                    log::debug!("State has transition from Reconnecting into new state while attempting reconnect.");
 644                    Ok(())
 645                }
 646            })
 647        })
 648        .detach_and_log_err(cx);
 649
 650        Ok(())
 651    }
 652
 653    fn heartbeat(
 654        this: WeakEntity<Self>,
 655        mut connection_activity_rx: mpsc::Receiver<()>,
 656        cx: &mut AsyncApp,
 657    ) -> Task<Result<()>> {
 658        let Ok(client) = this.read_with(cx, |this, _| this.client.clone()) else {
 659            return Task::ready(Err(anyhow!("SshRemoteClient lost")));
 660        };
 661
 662        cx.spawn(async move |cx| {
 663            let mut missed_heartbeats = 0;
 664
 665            let keepalive_timer = cx.background_executor().timer(HEARTBEAT_INTERVAL).fuse();
 666            futures::pin_mut!(keepalive_timer);
 667
 668            loop {
 669                select_biased! {
 670                    result = connection_activity_rx.next().fuse() => {
 671                        if result.is_none() {
 672                            log::warn!("ssh heartbeat: connection activity channel has been dropped. stopping.");
 673                            return Ok(());
 674                        }
 675
 676                        if missed_heartbeats != 0 {
 677                            missed_heartbeats = 0;
 678                            let _ =this.update(cx, |this, cx| {
 679                                this.handle_heartbeat_result(missed_heartbeats, cx)
 680                            })?;
 681                        }
 682                    }
 683                    _ = keepalive_timer => {
 684                        log::debug!("Sending heartbeat to server...");
 685
 686                        let result = select_biased! {
 687                            _ = connection_activity_rx.next().fuse() => {
 688                                Ok(())
 689                            }
 690                            ping_result = client.ping(HEARTBEAT_TIMEOUT).fuse() => {
 691                                ping_result
 692                            }
 693                        };
 694
 695                        if result.is_err() {
 696                            missed_heartbeats += 1;
 697                            log::warn!(
 698                                "No heartbeat from server after {:?}. Missed heartbeat {} out of {}.",
 699                                HEARTBEAT_TIMEOUT,
 700                                missed_heartbeats,
 701                                MAX_MISSED_HEARTBEATS
 702                            );
 703                        } else if missed_heartbeats != 0 {
 704                            missed_heartbeats = 0;
 705                        } else {
 706                            continue;
 707                        }
 708
 709                        let result = this.update(cx, |this, cx| {
 710                            this.handle_heartbeat_result(missed_heartbeats, cx)
 711                        })?;
 712                        if result.is_break() {
 713                            return Ok(());
 714                        }
 715                    }
 716                }
 717
 718                keepalive_timer.set(cx.background_executor().timer(HEARTBEAT_INTERVAL).fuse());
 719            }
 720        })
 721    }
 722
 723    fn handle_heartbeat_result(
 724        &mut self,
 725        missed_heartbeats: usize,
 726        cx: &mut Context<Self>,
 727    ) -> ControlFlow<()> {
 728        let state = self.state.take().unwrap();
 729        let next_state = if missed_heartbeats > 0 {
 730            state.heartbeat_missed()
 731        } else {
 732            state.heartbeat_recovered()
 733        };
 734
 735        self.set_state(next_state, cx);
 736
 737        if missed_heartbeats >= MAX_MISSED_HEARTBEATS {
 738            log::error!(
 739                "Missed last {} heartbeats. Reconnecting...",
 740                missed_heartbeats
 741            );
 742
 743            self.reconnect(cx)
 744                .context("failed to start reconnect process after missing heartbeats")
 745                .log_err();
 746            ControlFlow::Break(())
 747        } else {
 748            ControlFlow::Continue(())
 749        }
 750    }
 751
 752    fn monitor(
 753        this: WeakEntity<Self>,
 754        io_task: Task<Result<i32>>,
 755        cx: &AsyncApp,
 756    ) -> Task<Result<()>> {
 757        cx.spawn(async move |cx| {
 758            let result = io_task.await;
 759
 760            match result {
 761                Ok(exit_code) => {
 762                    if let Some(error) = ProxyLaunchError::from_exit_code(exit_code) {
 763                        match error {
 764                            ProxyLaunchError::ServerNotRunning => {
 765                                log::error!("failed to reconnect because server is not running");
 766                                this.update(cx, |this, cx| {
 767                                    this.set_state(State::ServerNotRunning, cx);
 768                                })?;
 769                            }
 770                        }
 771                    } else if exit_code > 0 {
 772                        log::error!("proxy process terminated unexpectedly");
 773                        this.update(cx, |this, cx| {
 774                            this.reconnect(cx).ok();
 775                        })?;
 776                    }
 777                }
 778                Err(error) => {
 779                    log::warn!("ssh io task died with error: {:?}. reconnecting...", error);
 780                    this.update(cx, |this, cx| {
 781                        this.reconnect(cx).ok();
 782                    })?;
 783                }
 784            }
 785
 786            Ok(())
 787        })
 788    }
 789
 790    fn state_is(&self, check: impl FnOnce(&State) -> bool) -> bool {
 791        self.state.as_ref().is_some_and(check)
 792    }
 793
 794    fn try_set_state(&mut self, cx: &mut Context<Self>, map: impl FnOnce(&State) -> Option<State>) {
 795        let new_state = self.state.as_ref().and_then(map);
 796        if let Some(new_state) = new_state {
 797            self.state.replace(new_state);
 798            cx.notify();
 799        }
 800    }
 801
 802    fn set_state(&mut self, state: State, cx: &mut Context<Self>) {
 803        log::info!("setting state to '{}'", &state);
 804
 805        let is_reconnect_exhausted = state.is_reconnect_exhausted();
 806        let is_server_not_running = state.is_server_not_running();
 807        self.state.replace(state);
 808
 809        if is_reconnect_exhausted || is_server_not_running {
 810            cx.emit(RemoteClientEvent::Disconnected);
 811        }
 812        cx.notify();
 813    }
 814
 815    pub fn shell(&self) -> Option<String> {
 816        Some(self.remote_connection()?.shell())
 817    }
 818
 819    pub fn default_system_shell(&self) -> Option<String> {
 820        Some(self.remote_connection()?.default_system_shell())
 821    }
 822
 823    pub fn shares_network_interface(&self) -> bool {
 824        self.remote_connection()
 825            .map_or(false, |connection| connection.shares_network_interface())
 826    }
 827
 828    pub fn build_command(
 829        &self,
 830        program: Option<String>,
 831        args: &[String],
 832        env: &HashMap<String, String>,
 833        working_dir: Option<String>,
 834        port_forward: Option<(u16, String, u16)>,
 835    ) -> Result<CommandTemplate> {
 836        let Some(connection) = self.remote_connection() else {
 837            return Err(anyhow!("no ssh connection"));
 838        };
 839        connection.build_command(program, args, env, working_dir, port_forward)
 840    }
 841
 842    pub fn build_forward_ports_command(
 843        &self,
 844        forwards: Vec<(u16, String, u16)>,
 845    ) -> Result<CommandTemplate> {
 846        let Some(connection) = self.remote_connection() else {
 847            return Err(anyhow!("no ssh connection"));
 848        };
 849        connection.build_forward_ports_command(forwards)
 850    }
 851
 852    pub fn upload_directory(
 853        &self,
 854        src_path: PathBuf,
 855        dest_path: RemotePathBuf,
 856        cx: &App,
 857    ) -> Task<Result<()>> {
 858        let Some(connection) = self.remote_connection() else {
 859            return Task::ready(Err(anyhow!("no ssh connection")));
 860        };
 861        connection.upload_directory(src_path, dest_path, cx)
 862    }
 863
 864    pub fn proto_client(&self) -> AnyProtoClient {
 865        self.client.clone().into()
 866    }
 867
 868    pub fn connection_options(&self) -> RemoteConnectionOptions {
 869        self.connection_options.clone()
 870    }
 871
 872    pub fn connection(&self) -> Option<Arc<dyn RemoteConnection>> {
 873        if let State::Connected {
 874            remote_connection, ..
 875        } = self.state.as_ref()?
 876        {
 877            Some(remote_connection.clone())
 878        } else {
 879            None
 880        }
 881    }
 882
 883    pub fn connection_state(&self) -> ConnectionState {
 884        self.state
 885            .as_ref()
 886            .map(ConnectionState::from)
 887            .unwrap_or(ConnectionState::Disconnected)
 888    }
 889
 890    pub fn is_disconnected(&self) -> bool {
 891        self.connection_state() == ConnectionState::Disconnected
 892    }
 893
 894    pub fn path_style(&self) -> PathStyle {
 895        self.path_style
 896    }
 897
 898    #[cfg(any(test, feature = "test-support"))]
 899    pub fn simulate_disconnect(&self, client_cx: &mut App) -> Task<()> {
 900        let opts = self.connection_options();
 901        client_cx.spawn(async move |cx| {
 902            let connection = cx
 903                .update_global(|c: &mut ConnectionPool, _| {
 904                    if let Some(ConnectionPoolEntry::Connecting(c)) = c.connections.get(&opts) {
 905                        c.clone()
 906                    } else {
 907                        panic!("missing test connection")
 908                    }
 909                })
 910                .unwrap()
 911                .await
 912                .unwrap();
 913
 914            connection.simulate_disconnect(cx);
 915        })
 916    }
 917
 918    #[cfg(any(test, feature = "test-support"))]
 919    pub fn fake_server(
 920        client_cx: &mut gpui::TestAppContext,
 921        server_cx: &mut gpui::TestAppContext,
 922    ) -> (RemoteConnectionOptions, AnyProtoClient) {
 923        let port = client_cx
 924            .update(|cx| cx.default_global::<ConnectionPool>().connections.len() as u16 + 1);
 925        let opts = RemoteConnectionOptions::Ssh(SshConnectionOptions {
 926            host: "<fake>".to_string(),
 927            port: Some(port),
 928            ..Default::default()
 929        });
 930        let (outgoing_tx, _) = mpsc::unbounded::<Envelope>();
 931        let (_, incoming_rx) = mpsc::unbounded::<Envelope>();
 932        let server_client = server_cx
 933            .update(|cx| ChannelClient::new(incoming_rx, outgoing_tx, cx, "fake-server", false));
 934        let connection: Arc<dyn RemoteConnection> = Arc::new(fake::FakeRemoteConnection {
 935            connection_options: opts.clone(),
 936            server_cx: fake::SendableCx::new(server_cx),
 937            server_channel: server_client.clone(),
 938        });
 939
 940        client_cx.update(|cx| {
 941            cx.update_default_global(|c: &mut ConnectionPool, cx| {
 942                c.connections.insert(
 943                    opts.clone(),
 944                    ConnectionPoolEntry::Connecting(
 945                        cx.background_spawn({
 946                            let connection = connection.clone();
 947                            async move { Ok(connection.clone()) }
 948                        })
 949                        .shared(),
 950                    ),
 951                );
 952            })
 953        });
 954
 955        (opts, server_client.into())
 956    }
 957
 958    #[cfg(any(test, feature = "test-support"))]
 959    pub async fn fake_client(
 960        opts: RemoteConnectionOptions,
 961        client_cx: &mut gpui::TestAppContext,
 962    ) -> Entity<Self> {
 963        let (_tx, rx) = oneshot::channel();
 964        let mut cx = client_cx.to_async();
 965        let connection = connect(opts, Arc::new(fake::Delegate), &mut cx)
 966            .await
 967            .unwrap();
 968        client_cx
 969            .update(|cx| {
 970                Self::new(
 971                    ConnectionIdentifier::setup(),
 972                    connection,
 973                    rx,
 974                    Arc::new(fake::Delegate),
 975                    cx,
 976                )
 977            })
 978            .await
 979            .unwrap()
 980            .unwrap()
 981    }
 982
 983    fn remote_connection(&self) -> Option<Arc<dyn RemoteConnection>> {
 984        self.state
 985            .as_ref()
 986            .and_then(|state| state.remote_connection())
 987    }
 988}
 989
 990enum ConnectionPoolEntry {
 991    Connecting(Shared<Task<Result<Arc<dyn RemoteConnection>, Arc<anyhow::Error>>>>),
 992    Connected(Weak<dyn RemoteConnection>),
 993}
 994
 995#[derive(Default)]
 996struct ConnectionPool {
 997    connections: HashMap<RemoteConnectionOptions, ConnectionPoolEntry>,
 998}
 999
1000impl Global for ConnectionPool {}
1001
1002impl ConnectionPool {
1003    pub fn connect(
1004        &mut self,
1005        opts: RemoteConnectionOptions,
1006        delegate: Arc<dyn RemoteClientDelegate>,
1007        cx: &mut App,
1008    ) -> Shared<Task<Result<Arc<dyn RemoteConnection>, Arc<anyhow::Error>>>> {
1009        let connection = self.connections.get(&opts);
1010        match connection {
1011            Some(ConnectionPoolEntry::Connecting(task)) => {
1012                delegate.set_status(
1013                    Some("Waiting for existing connection attempt"),
1014                    &mut cx.to_async(),
1015                );
1016                return task.clone();
1017            }
1018            Some(ConnectionPoolEntry::Connected(ssh)) => {
1019                if let Some(ssh) = ssh.upgrade()
1020                    && !ssh.has_been_killed()
1021                {
1022                    return Task::ready(Ok(ssh)).shared();
1023                }
1024                self.connections.remove(&opts);
1025            }
1026            None => {}
1027        }
1028
1029        let task = cx
1030            .spawn({
1031                let opts = opts.clone();
1032                let delegate = delegate.clone();
1033                async move |cx| {
1034                    let connection = match opts.clone() {
1035                        RemoteConnectionOptions::Ssh(opts) => {
1036                            SshRemoteConnection::new(opts, delegate, cx)
1037                                .await
1038                                .map(|connection| Arc::new(connection) as Arc<dyn RemoteConnection>)
1039                        }
1040                        RemoteConnectionOptions::Wsl(opts) => {
1041                            WslRemoteConnection::new(opts, delegate, cx)
1042                                .await
1043                                .map(|connection| Arc::new(connection) as Arc<dyn RemoteConnection>)
1044                        }
1045                    };
1046
1047                    cx.update_global(|pool: &mut Self, _| {
1048                        debug_assert!(matches!(
1049                            pool.connections.get(&opts),
1050                            Some(ConnectionPoolEntry::Connecting(_))
1051                        ));
1052                        match connection {
1053                            Ok(connection) => {
1054                                pool.connections.insert(
1055                                    opts.clone(),
1056                                    ConnectionPoolEntry::Connected(Arc::downgrade(&connection)),
1057                                );
1058                                Ok(connection)
1059                            }
1060                            Err(error) => {
1061                                pool.connections.remove(&opts);
1062                                Err(Arc::new(error))
1063                            }
1064                        }
1065                    })?
1066                }
1067            })
1068            .shared();
1069
1070        self.connections
1071            .insert(opts.clone(), ConnectionPoolEntry::Connecting(task.clone()));
1072        task
1073    }
1074}
1075
1076#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1077pub enum RemoteConnectionOptions {
1078    Ssh(SshConnectionOptions),
1079    Wsl(WslConnectionOptions),
1080}
1081
1082impl RemoteConnectionOptions {
1083    pub fn display_name(&self) -> String {
1084        match self {
1085            RemoteConnectionOptions::Ssh(opts) => opts.host.clone(),
1086            RemoteConnectionOptions::Wsl(opts) => opts.distro_name.clone(),
1087        }
1088    }
1089}
1090
1091impl From<SshConnectionOptions> for RemoteConnectionOptions {
1092    fn from(opts: SshConnectionOptions) -> Self {
1093        RemoteConnectionOptions::Ssh(opts)
1094    }
1095}
1096
1097impl From<WslConnectionOptions> for RemoteConnectionOptions {
1098    fn from(opts: WslConnectionOptions) -> Self {
1099        RemoteConnectionOptions::Wsl(opts)
1100    }
1101}
1102
1103#[cfg(target_os = "windows")]
1104/// Open a wsl path (\\wsl.localhost\<distro>\path)
1105#[derive(Debug, Clone, PartialEq, Eq, gpui::Action)]
1106#[action(namespace = workspace, no_json, no_register)]
1107pub struct OpenWslPath {
1108    pub distro: WslConnectionOptions,
1109    pub paths: Vec<PathBuf>,
1110}
1111
1112#[async_trait(?Send)]
1113pub trait RemoteConnection: Send + Sync {
1114    fn start_proxy(
1115        &self,
1116        unique_identifier: String,
1117        reconnect: bool,
1118        incoming_tx: UnboundedSender<Envelope>,
1119        outgoing_rx: UnboundedReceiver<Envelope>,
1120        connection_activity_tx: Sender<()>,
1121        delegate: Arc<dyn RemoteClientDelegate>,
1122        cx: &mut AsyncApp,
1123    ) -> Task<Result<i32>>;
1124    fn upload_directory(
1125        &self,
1126        src_path: PathBuf,
1127        dest_path: RemotePathBuf,
1128        cx: &App,
1129    ) -> Task<Result<()>>;
1130    async fn kill(&self) -> Result<()>;
1131    fn has_been_killed(&self) -> bool;
1132    fn shares_network_interface(&self) -> bool {
1133        false
1134    }
1135    fn build_command(
1136        &self,
1137        program: Option<String>,
1138        args: &[String],
1139        env: &HashMap<String, String>,
1140        working_dir: Option<String>,
1141        port_forward: Option<(u16, String, u16)>,
1142    ) -> Result<CommandTemplate>;
1143    fn build_forward_ports_command(
1144        &self,
1145        forwards: Vec<(u16, String, u16)>,
1146    ) -> Result<CommandTemplate>;
1147    fn connection_options(&self) -> RemoteConnectionOptions;
1148    fn path_style(&self) -> PathStyle;
1149    fn shell(&self) -> String;
1150    fn default_system_shell(&self) -> String;
1151    fn has_wsl_interop(&self) -> bool;
1152
1153    #[cfg(any(test, feature = "test-support"))]
1154    fn simulate_disconnect(&self, _: &AsyncApp) {}
1155}
1156
1157type ResponseChannels = Mutex<HashMap<MessageId, oneshot::Sender<(Envelope, oneshot::Sender<()>)>>>;
1158
1159struct Signal<T> {
1160    tx: Mutex<Option<oneshot::Sender<T>>>,
1161    rx: Shared<Task<Option<T>>>,
1162}
1163
1164impl<T: Send + Clone + 'static> Signal<T> {
1165    pub fn new(cx: &App) -> Self {
1166        let (tx, rx) = oneshot::channel();
1167
1168        let task = cx
1169            .background_executor()
1170            .spawn(async move { rx.await.ok() })
1171            .shared();
1172
1173        Self {
1174            tx: Mutex::new(Some(tx)),
1175            rx: task,
1176        }
1177    }
1178
1179    fn set(&self, value: T) {
1180        if let Some(tx) = self.tx.lock().take() {
1181            let _ = tx.send(value);
1182        }
1183    }
1184
1185    fn wait(&self) -> Shared<Task<Option<T>>> {
1186        self.rx.clone()
1187    }
1188}
1189
1190struct ChannelClient {
1191    next_message_id: AtomicU32,
1192    outgoing_tx: Mutex<mpsc::UnboundedSender<Envelope>>,
1193    buffer: Mutex<VecDeque<Envelope>>,
1194    response_channels: ResponseChannels,
1195    message_handlers: Mutex<ProtoMessageHandlerSet>,
1196    max_received: AtomicU32,
1197    name: &'static str,
1198    task: Mutex<Task<Result<()>>>,
1199    remote_started: Signal<()>,
1200    has_wsl_interop: bool,
1201}
1202
1203impl ChannelClient {
1204    fn new(
1205        incoming_rx: mpsc::UnboundedReceiver<Envelope>,
1206        outgoing_tx: mpsc::UnboundedSender<Envelope>,
1207        cx: &App,
1208        name: &'static str,
1209        has_wsl_interop: bool,
1210    ) -> Arc<Self> {
1211        Arc::new_cyclic(|this| Self {
1212            outgoing_tx: Mutex::new(outgoing_tx),
1213            next_message_id: AtomicU32::new(0),
1214            max_received: AtomicU32::new(0),
1215            response_channels: ResponseChannels::default(),
1216            message_handlers: Default::default(),
1217            buffer: Mutex::new(VecDeque::new()),
1218            name,
1219            task: Mutex::new(Self::start_handling_messages(
1220                this.clone(),
1221                incoming_rx,
1222                &cx.to_async(),
1223            )),
1224            remote_started: Signal::new(cx),
1225            has_wsl_interop,
1226        })
1227    }
1228
1229    fn wait_for_remote_started(&self) -> Shared<Task<Option<()>>> {
1230        self.remote_started.wait()
1231    }
1232
1233    fn start_handling_messages(
1234        this: Weak<Self>,
1235        mut incoming_rx: mpsc::UnboundedReceiver<Envelope>,
1236        cx: &AsyncApp,
1237    ) -> Task<Result<()>> {
1238        cx.spawn(async move |cx| {
1239            if let Some(this) = this.upgrade() {
1240                let envelope = proto::RemoteStarted {}.into_envelope(0, None, None);
1241                this.outgoing_tx.lock().unbounded_send(envelope).ok();
1242            };
1243
1244            let peer_id = PeerId { owner_id: 0, id: 0 };
1245            while let Some(incoming) = incoming_rx.next().await {
1246                let Some(this) = this.upgrade() else {
1247                    return anyhow::Ok(());
1248                };
1249                if let Some(ack_id) = incoming.ack_id {
1250                    let mut buffer = this.buffer.lock();
1251                    while buffer.front().is_some_and(|msg| msg.id <= ack_id) {
1252                        buffer.pop_front();
1253                    }
1254                }
1255                if let Some(proto::envelope::Payload::FlushBufferedMessages(_)) = &incoming.payload
1256                {
1257                    log::debug!(
1258                        "{}:ssh message received. name:FlushBufferedMessages",
1259                        this.name
1260                    );
1261                    {
1262                        let buffer = this.buffer.lock();
1263                        for envelope in buffer.iter() {
1264                            this.outgoing_tx
1265                                .lock()
1266                                .unbounded_send(envelope.clone())
1267                                .ok();
1268                        }
1269                    }
1270                    let mut envelope = proto::Ack {}.into_envelope(0, Some(incoming.id), None);
1271                    envelope.id = this.next_message_id.fetch_add(1, SeqCst);
1272                    this.outgoing_tx.lock().unbounded_send(envelope).ok();
1273                    continue;
1274                }
1275
1276                if let Some(proto::envelope::Payload::RemoteStarted(_)) = &incoming.payload {
1277                    this.remote_started.set(());
1278                    let mut envelope = proto::Ack {}.into_envelope(0, Some(incoming.id), None);
1279                    envelope.id = this.next_message_id.fetch_add(1, SeqCst);
1280                    this.outgoing_tx.lock().unbounded_send(envelope).ok();
1281                    continue;
1282                }
1283
1284                this.max_received.store(incoming.id, SeqCst);
1285
1286                if let Some(request_id) = incoming.responding_to {
1287                    let request_id = MessageId(request_id);
1288                    let sender = this.response_channels.lock().remove(&request_id);
1289                    if let Some(sender) = sender {
1290                        let (tx, rx) = oneshot::channel();
1291                        if incoming.payload.is_some() {
1292                            sender.send((incoming, tx)).ok();
1293                        }
1294                        rx.await.ok();
1295                    }
1296                } else if let Some(envelope) =
1297                    build_typed_envelope(peer_id, Instant::now(), incoming)
1298                {
1299                    let type_name = envelope.payload_type_name();
1300                    let message_id = envelope.message_id();
1301                    if let Some(future) = ProtoMessageHandlerSet::handle_message(
1302                        &this.message_handlers,
1303                        envelope,
1304                        this.clone().into(),
1305                        cx.clone(),
1306                    ) {
1307                        log::debug!("{}:ssh message received. name:{type_name}", this.name);
1308                        cx.foreground_executor()
1309                            .spawn(async move {
1310                                match future.await {
1311                                    Ok(_) => {
1312                                        log::debug!(
1313                                            "{}:ssh message handled. name:{type_name}",
1314                                            this.name
1315                                        );
1316                                    }
1317                                    Err(error) => {
1318                                        log::error!(
1319                                            "{}:error handling message. type:{}, error:{:#}",
1320                                            this.name,
1321                                            type_name,
1322                                            format!("{error:#}").lines().fold(
1323                                                String::new(),
1324                                                |mut message, line| {
1325                                                    if !message.is_empty() {
1326                                                        message.push(' ');
1327                                                    }
1328                                                    message.push_str(line);
1329                                                    message
1330                                                }
1331                                            )
1332                                        );
1333                                    }
1334                                }
1335                            })
1336                            .detach()
1337                    } else {
1338                        log::error!("{}:unhandled ssh message name:{type_name}", this.name);
1339                        if let Err(e) = AnyProtoClient::from(this.clone()).send_response(
1340                            message_id,
1341                            anyhow::anyhow!("no handler registered for {type_name}").to_proto(),
1342                        ) {
1343                            log::error!(
1344                                "{}:error sending error response for {type_name}:{e:#}",
1345                                this.name
1346                            );
1347                        }
1348                    }
1349                }
1350            }
1351            anyhow::Ok(())
1352        })
1353    }
1354
1355    fn reconnect(
1356        self: &Arc<Self>,
1357        incoming_rx: UnboundedReceiver<Envelope>,
1358        outgoing_tx: UnboundedSender<Envelope>,
1359        cx: &AsyncApp,
1360    ) {
1361        *self.outgoing_tx.lock() = outgoing_tx;
1362        *self.task.lock() = Self::start_handling_messages(Arc::downgrade(self), incoming_rx, cx);
1363    }
1364
1365    fn request<T: RequestMessage>(
1366        &self,
1367        payload: T,
1368    ) -> impl 'static + Future<Output = Result<T::Response>> {
1369        self.request_internal(payload, true)
1370    }
1371
1372    fn request_internal<T: RequestMessage>(
1373        &self,
1374        payload: T,
1375        use_buffer: bool,
1376    ) -> impl 'static + Future<Output = Result<T::Response>> {
1377        log::debug!("ssh request start. name:{}", T::NAME);
1378        let response =
1379            self.request_dynamic(payload.into_envelope(0, None, None), T::NAME, use_buffer);
1380        async move {
1381            let response = response.await?;
1382            log::debug!("ssh request finish. name:{}", T::NAME);
1383            T::Response::from_envelope(response).context("received a response of the wrong type")
1384        }
1385    }
1386
1387    async fn resync(&self, timeout: Duration) -> Result<()> {
1388        smol::future::or(
1389            async {
1390                self.request_internal(proto::FlushBufferedMessages {}, false)
1391                    .await?;
1392
1393                for envelope in self.buffer.lock().iter() {
1394                    self.outgoing_tx
1395                        .lock()
1396                        .unbounded_send(envelope.clone())
1397                        .ok();
1398                }
1399                Ok(())
1400            },
1401            async {
1402                smol::Timer::after(timeout).await;
1403                anyhow::bail!("Timed out resyncing remote client")
1404            },
1405        )
1406        .await
1407    }
1408
1409    async fn ping(&self, timeout: Duration) -> Result<()> {
1410        smol::future::or(
1411            async {
1412                self.request(proto::Ping {}).await?;
1413                Ok(())
1414            },
1415            async {
1416                smol::Timer::after(timeout).await;
1417                anyhow::bail!("Timed out pinging remote client")
1418            },
1419        )
1420        .await
1421    }
1422
1423    fn send<T: EnvelopedMessage>(&self, payload: T) -> Result<()> {
1424        log::debug!("ssh send name:{}", T::NAME);
1425        self.send_dynamic(payload.into_envelope(0, None, None))
1426    }
1427
1428    fn request_dynamic(
1429        &self,
1430        mut envelope: proto::Envelope,
1431        type_name: &'static str,
1432        use_buffer: bool,
1433    ) -> impl 'static + Future<Output = Result<proto::Envelope>> {
1434        envelope.id = self.next_message_id.fetch_add(1, SeqCst);
1435        let (tx, rx) = oneshot::channel();
1436        let mut response_channels_lock = self.response_channels.lock();
1437        response_channels_lock.insert(MessageId(envelope.id), tx);
1438        drop(response_channels_lock);
1439
1440        let result = if use_buffer {
1441            self.send_buffered(envelope)
1442        } else {
1443            self.send_unbuffered(envelope)
1444        };
1445        async move {
1446            if let Err(error) = &result {
1447                log::error!("failed to send message: {error}");
1448                anyhow::bail!("failed to send message: {error}");
1449            }
1450
1451            let response = rx.await.context("connection lost")?.0;
1452            if let Some(proto::envelope::Payload::Error(error)) = &response.payload {
1453                return Err(RpcError::from_proto(error, type_name));
1454            }
1455            Ok(response)
1456        }
1457    }
1458
1459    pub fn send_dynamic(&self, mut envelope: proto::Envelope) -> Result<()> {
1460        envelope.id = self.next_message_id.fetch_add(1, SeqCst);
1461        self.send_buffered(envelope)
1462    }
1463
1464    fn send_buffered(&self, mut envelope: proto::Envelope) -> Result<()> {
1465        envelope.ack_id = Some(self.max_received.load(SeqCst));
1466        self.buffer.lock().push_back(envelope.clone());
1467        // ignore errors on send (happen while we're reconnecting)
1468        // assume that the global "disconnected" overlay is sufficient.
1469        self.outgoing_tx.lock().unbounded_send(envelope).ok();
1470        Ok(())
1471    }
1472
1473    fn send_unbuffered(&self, mut envelope: proto::Envelope) -> Result<()> {
1474        envelope.ack_id = Some(self.max_received.load(SeqCst));
1475        self.outgoing_tx.lock().unbounded_send(envelope).ok();
1476        Ok(())
1477    }
1478}
1479
1480impl ProtoClient for ChannelClient {
1481    fn request(
1482        &self,
1483        envelope: proto::Envelope,
1484        request_type: &'static str,
1485    ) -> BoxFuture<'static, Result<proto::Envelope>> {
1486        self.request_dynamic(envelope, request_type, true).boxed()
1487    }
1488
1489    fn send(&self, envelope: proto::Envelope, _message_type: &'static str) -> Result<()> {
1490        self.send_dynamic(envelope)
1491    }
1492
1493    fn send_response(&self, envelope: Envelope, _message_type: &'static str) -> anyhow::Result<()> {
1494        self.send_dynamic(envelope)
1495    }
1496
1497    fn message_handler_set(&self) -> &Mutex<ProtoMessageHandlerSet> {
1498        &self.message_handlers
1499    }
1500
1501    fn is_via_collab(&self) -> bool {
1502        false
1503    }
1504
1505    fn has_wsl_interop(&self) -> bool {
1506        self.has_wsl_interop
1507    }
1508}
1509
1510#[cfg(any(test, feature = "test-support"))]
1511mod fake {
1512    use super::{ChannelClient, RemoteClientDelegate, RemoteConnection, RemotePlatform};
1513    use crate::remote_client::{CommandTemplate, RemoteConnectionOptions};
1514    use anyhow::Result;
1515    use askpass::EncryptedPassword;
1516    use async_trait::async_trait;
1517    use collections::HashMap;
1518    use futures::{
1519        FutureExt, SinkExt, StreamExt,
1520        channel::{
1521            mpsc::{self, Sender},
1522            oneshot,
1523        },
1524        select_biased,
1525    };
1526    use gpui::{App, AppContext as _, AsyncApp, Task, TestAppContext};
1527    use release_channel::ReleaseChannel;
1528    use rpc::proto::Envelope;
1529    use semver::Version;
1530    use std::{path::PathBuf, sync::Arc};
1531    use util::paths::{PathStyle, RemotePathBuf};
1532
1533    pub(super) struct FakeRemoteConnection {
1534        pub(super) connection_options: RemoteConnectionOptions,
1535        pub(super) server_channel: Arc<ChannelClient>,
1536        pub(super) server_cx: SendableCx,
1537    }
1538
1539    pub(super) struct SendableCx(AsyncApp);
1540    impl SendableCx {
1541        // SAFETY: When run in test mode, GPUI is always single threaded.
1542        pub(super) fn new(cx: &TestAppContext) -> Self {
1543            Self(cx.to_async())
1544        }
1545
1546        // SAFETY: Enforce that we're on the main thread by requiring a valid AsyncApp
1547        fn get(&self, _: &AsyncApp) -> AsyncApp {
1548            self.0.clone()
1549        }
1550    }
1551
1552    // SAFETY: There is no way to access a SendableCx from a different thread, see [`SendableCx::new`] and [`SendableCx::get`]
1553    unsafe impl Send for SendableCx {}
1554    unsafe impl Sync for SendableCx {}
1555
1556    #[async_trait(?Send)]
1557    impl RemoteConnection for FakeRemoteConnection {
1558        async fn kill(&self) -> Result<()> {
1559            Ok(())
1560        }
1561
1562        fn has_been_killed(&self) -> bool {
1563            false
1564        }
1565
1566        fn build_command(
1567            &self,
1568            program: Option<String>,
1569            args: &[String],
1570            env: &HashMap<String, String>,
1571            _: Option<String>,
1572            _: Option<(u16, String, u16)>,
1573        ) -> Result<CommandTemplate> {
1574            let ssh_program = program.unwrap_or_else(|| "sh".to_string());
1575            let mut ssh_args = Vec::new();
1576            ssh_args.push(ssh_program);
1577            ssh_args.extend(args.iter().cloned());
1578            Ok(CommandTemplate {
1579                program: "ssh".into(),
1580                args: ssh_args,
1581                env: env.clone(),
1582            })
1583        }
1584
1585        fn build_forward_ports_command(
1586            &self,
1587            forwards: Vec<(u16, String, u16)>,
1588        ) -> anyhow::Result<CommandTemplate> {
1589            Ok(CommandTemplate {
1590                program: "ssh".into(),
1591                args: std::iter::once("-N".to_owned())
1592                    .chain(forwards.into_iter().map(|(local_port, host, remote_port)| {
1593                        format!("{local_port}:{host}:{remote_port}")
1594                    }))
1595                    .collect(),
1596                env: Default::default(),
1597            })
1598        }
1599
1600        fn upload_directory(
1601            &self,
1602            _src_path: PathBuf,
1603            _dest_path: RemotePathBuf,
1604            _cx: &App,
1605        ) -> Task<Result<()>> {
1606            unreachable!()
1607        }
1608
1609        fn connection_options(&self) -> RemoteConnectionOptions {
1610            self.connection_options.clone()
1611        }
1612
1613        fn simulate_disconnect(&self, cx: &AsyncApp) {
1614            let (outgoing_tx, _) = mpsc::unbounded::<Envelope>();
1615            let (_, incoming_rx) = mpsc::unbounded::<Envelope>();
1616            self.server_channel
1617                .reconnect(incoming_rx, outgoing_tx, &self.server_cx.get(cx));
1618        }
1619
1620        fn start_proxy(
1621            &self,
1622            _unique_identifier: String,
1623            _reconnect: bool,
1624            mut client_incoming_tx: mpsc::UnboundedSender<Envelope>,
1625            mut client_outgoing_rx: mpsc::UnboundedReceiver<Envelope>,
1626            mut connection_activity_tx: Sender<()>,
1627            _delegate: Arc<dyn RemoteClientDelegate>,
1628            cx: &mut AsyncApp,
1629        ) -> Task<Result<i32>> {
1630            let (mut server_incoming_tx, server_incoming_rx) = mpsc::unbounded::<Envelope>();
1631            let (server_outgoing_tx, mut server_outgoing_rx) = mpsc::unbounded::<Envelope>();
1632
1633            self.server_channel.reconnect(
1634                server_incoming_rx,
1635                server_outgoing_tx,
1636                &self.server_cx.get(cx),
1637            );
1638
1639            cx.background_spawn(async move {
1640                loop {
1641                    select_biased! {
1642                        server_to_client = server_outgoing_rx.next().fuse() => {
1643                            let Some(server_to_client) = server_to_client else {
1644                                return Ok(1)
1645                            };
1646                            connection_activity_tx.try_send(()).ok();
1647                            client_incoming_tx.send(server_to_client).await.ok();
1648                        }
1649                        client_to_server = client_outgoing_rx.next().fuse() => {
1650                            let Some(client_to_server) = client_to_server else {
1651                                return Ok(1)
1652                            };
1653                            server_incoming_tx.send(client_to_server).await.ok();
1654                        }
1655                    }
1656                }
1657            })
1658        }
1659
1660        fn path_style(&self) -> PathStyle {
1661            PathStyle::local()
1662        }
1663
1664        fn shell(&self) -> String {
1665            "sh".to_owned()
1666        }
1667
1668        fn default_system_shell(&self) -> String {
1669            "sh".to_owned()
1670        }
1671
1672        fn has_wsl_interop(&self) -> bool {
1673            false
1674        }
1675    }
1676
1677    pub(super) struct Delegate;
1678
1679    impl RemoteClientDelegate for Delegate {
1680        fn ask_password(&self, _: String, _: oneshot::Sender<EncryptedPassword>, _: &mut AsyncApp) {
1681            unreachable!()
1682        }
1683
1684        fn download_server_binary_locally(
1685            &self,
1686            _: RemotePlatform,
1687            _: ReleaseChannel,
1688            _: Option<Version>,
1689            _: &mut AsyncApp,
1690        ) -> Task<Result<PathBuf>> {
1691            unreachable!()
1692        }
1693
1694        fn get_download_url(
1695            &self,
1696            _platform: RemotePlatform,
1697            _release_channel: ReleaseChannel,
1698            _version: Option<Version>,
1699            _cx: &mut AsyncApp,
1700        ) -> Task<Result<Option<String>>> {
1701            unreachable!()
1702        }
1703
1704        fn set_status(&self, _: Option<&str>, _: &mut AsyncApp) {}
1705    }
1706}