remote_client.rs

   1use crate::{
   2    SshConnectionOptions,
   3    protocol::MessageId,
   4    proxy::ProxyLaunchError,
   5    transport::{
   6        docker::{DockerConnectionOptions, DockerExecConnection},
   7        ssh::SshRemoteConnection,
   8        wsl::{WslConnectionOptions, WslRemoteConnection},
   9    },
  10};
  11use anyhow::{Context as _, Result, anyhow};
  12use askpass::EncryptedPassword;
  13use async_trait::async_trait;
  14use collections::HashMap;
  15use futures::{
  16    Future, FutureExt as _, StreamExt as _,
  17    channel::{
  18        mpsc::{self, Sender, UnboundedReceiver, UnboundedSender},
  19        oneshot,
  20    },
  21    future::{BoxFuture, Shared},
  22    select, select_biased,
  23};
  24use gpui::{
  25    App, AppContext as _, AsyncApp, BackgroundExecutor, BorrowAppContext, Context, Entity,
  26    EventEmitter, FutureExt, Global, Task, WeakEntity,
  27};
  28use parking_lot::Mutex;
  29
  30use release_channel::ReleaseChannel;
  31use rpc::{
  32    AnyProtoClient, ErrorExt, ProtoClient, ProtoMessageHandlerSet, RpcError,
  33    proto::{self, Envelope, EnvelopedMessage, PeerId, RequestMessage, build_typed_envelope},
  34};
  35use semver::Version;
  36use std::{
  37    collections::VecDeque,
  38    fmt,
  39    ops::ControlFlow,
  40    path::PathBuf,
  41    sync::{
  42        Arc, Weak,
  43        atomic::{AtomicU32, AtomicU64, Ordering::SeqCst},
  44    },
  45    time::{Duration, Instant},
  46};
  47use util::{
  48    ResultExt,
  49    paths::{PathStyle, RemotePathBuf},
  50};
  51
  52#[derive(Copy, Clone, Debug, PartialEq, Eq)]
  53pub enum RemoteOs {
  54    Linux,
  55    MacOs,
  56    Windows,
  57}
  58
  59impl RemoteOs {
  60    pub fn as_str(&self) -> &'static str {
  61        match self {
  62            RemoteOs::Linux => "linux",
  63            RemoteOs::MacOs => "macos",
  64            RemoteOs::Windows => "windows",
  65        }
  66    }
  67
  68    pub fn is_windows(&self) -> bool {
  69        matches!(self, RemoteOs::Windows)
  70    }
  71}
  72
  73impl std::fmt::Display for RemoteOs {
  74    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
  75        f.write_str(self.as_str())
  76    }
  77}
  78
  79#[derive(Copy, Clone, Debug, PartialEq, Eq)]
  80pub enum RemoteArch {
  81    X86_64,
  82    Aarch64,
  83}
  84
  85impl RemoteArch {
  86    pub fn as_str(&self) -> &'static str {
  87        match self {
  88            RemoteArch::X86_64 => "x86_64",
  89            RemoteArch::Aarch64 => "aarch64",
  90        }
  91    }
  92}
  93
  94impl std::fmt::Display for RemoteArch {
  95    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
  96        f.write_str(self.as_str())
  97    }
  98}
  99
/// Operating system and CPU architecture of a remote host.
///
/// Passed to [`RemoteClientDelegate`] when asking for a server binary.
#[derive(Copy, Clone, Debug)]
pub struct RemotePlatform {
    /// Operating system family of the remote.
    pub os: RemoteOs,
    /// CPU architecture of the remote.
    pub arch: RemoteArch,
}
 105
/// A command line described as program, arguments and environment.
///
/// Returned by `RemoteClient::build_command` and
/// `RemoteClient::build_forward_ports_command`.
#[derive(Clone, Debug)]
pub struct CommandTemplate {
    /// Program to execute.
    pub program: String,
    /// Arguments passed to `program`.
    pub args: Vec<String>,
    /// Environment variables for the command.
    pub env: HashMap<String, String>,
}
 112
/// Callbacks the remote client uses to talk to its host (UI, downloader)
/// while establishing or re-establishing a connection.
///
/// Implementations must be `Send + Sync`; the client shares the delegate as
/// an `Arc<dyn RemoteClientDelegate>`.
pub trait RemoteClientDelegate: Send + Sync {
    /// Asks for a password for `prompt`; the answer is delivered through `tx`.
    fn ask_password(
        &self,
        prompt: String,
        tx: oneshot::Sender<EncryptedPassword>,
        cx: &mut AsyncApp,
    );
    /// Resolves a download URL for the server binary matching `platform`,
    /// `release_channel` and `version`.
    ///
    /// NOTE(review): the meaning of `Ok(None)` is not visible in this file —
    /// presumably "no URL available"; confirm against implementors.
    fn get_download_url(
        &self,
        platform: RemotePlatform,
        release_channel: ReleaseChannel,
        version: Option<Version>,
        cx: &mut AsyncApp,
    ) -> Task<Result<Option<String>>>;
    /// Downloads the server binary matching `platform`, `release_channel`
    /// and `version`, resolving to a `PathBuf` (presumably the local path of
    /// the downloaded file — confirm against implementors).
    fn download_server_binary_locally(
        &self,
        platform: RemotePlatform,
        release_channel: ReleaseChannel,
        version: Option<Version>,
        cx: &mut AsyncApp,
    ) -> Task<Result<PathBuf>>;
    /// Reports a status message. `RemoteClient::reconnect` calls this with
    /// the formatted error when a reconnect attempt fails.
    ///
    /// NOTE(review): `None` presumably clears the status — confirm.
    fn set_status(&self, status: Option<&str>, cx: &mut AsyncApp);
}
 136
/// Number of consecutive missed heartbeats at which
/// `RemoteClient::handle_heartbeat_result` starts a reconnect.
const MAX_MISSED_HEARTBEATS: usize = 5;
/// Delay of the keepalive timer in the heartbeat loop.
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
/// Timeout passed to `ping` (heartbeats, initial connection) and to `resync`
/// (after a reconnect).
const HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(5);
/// How long `RemoteClient::new` waits for the remote to report that it has
/// started: 5 seconds in debug builds, 60 seconds otherwise.
const INITIAL_CONNECTION_TIMEOUT: Duration =
    Duration::from_secs(if cfg!(debug_assertions) { 5 } else { 60 });

/// Number of reconnect attempts after which `RemoteClient::reconnect` gives up
/// and moves to `State::ReconnectExhausted`.
const MAX_RECONNECT_ATTEMPTS: usize = 3;
 144
/// Internal connection state machine of a `RemoteClient`.
///
/// States that own a connection also own the background tasks serving it;
/// replacing such a state drops those tasks.
enum State {
    /// Initial state while `RemoteClient::new` is establishing the connection.
    Connecting,
    /// The connection is up and heartbeats are being answered.
    Connected {
        remote_connection: Arc<dyn RemoteConnection>,
        delegate: Arc<dyn RemoteClientDelegate>,

        // Task returned by `RemoteClient::monitor`; watches the proxy I/O task.
        multiplex_task: Task<Result<()>>,
        // Task returned by `RemoteClient::heartbeat`.
        heartbeat_task: Task<Result<()>>,
    },
    /// One or more heartbeats were missed, but the connection has not been
    /// given up yet.
    HeartbeatMissed {
        // Incremented by `State::heartbeat_missed`, starting at 1.
        missed_heartbeats: usize,

        ssh_connection: Arc<dyn RemoteConnection>,
        delegate: Arc<dyn RemoteClientDelegate>,

        multiplex_task: Task<Result<()>>,
        heartbeat_task: Task<Result<()>>,
    },
    /// A reconnect attempt is in flight.
    Reconnecting,
    /// The last reconnect attempt failed; `RemoteClient::reconnect` retries
    /// from this state.
    ReconnectFailed {
        ssh_connection: Arc<dyn RemoteConnection>,
        delegate: Arc<dyn RemoteClientDelegate>,

        // Why the attempt failed.
        error: anyhow::Error,
        // Number of attempts made so far.
        attempts: usize,
    },
    /// More than `MAX_RECONNECT_ATTEMPTS` would be needed; no further
    /// reconnects are possible from here.
    ReconnectExhausted,
    /// The proxy exited with the code that maps to
    /// `ProxyLaunchError::ServerNotRunning`; no reconnects are possible from here.
    ServerNotRunning,
}
 174
 175impl fmt::Display for State {
 176    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
 177        match self {
 178            Self::Connecting => write!(f, "connecting"),
 179            Self::Connected { .. } => write!(f, "connected"),
 180            Self::Reconnecting => write!(f, "reconnecting"),
 181            Self::ReconnectFailed { .. } => write!(f, "reconnect failed"),
 182            Self::ReconnectExhausted => write!(f, "reconnect exhausted"),
 183            Self::HeartbeatMissed { .. } => write!(f, "heartbeat missed"),
 184            Self::ServerNotRunning { .. } => write!(f, "server not running"),
 185        }
 186    }
 187}
 188
 189impl State {
 190    fn remote_connection(&self) -> Option<Arc<dyn RemoteConnection>> {
 191        match self {
 192            Self::Connected {
 193                remote_connection: ssh_connection,
 194                ..
 195            } => Some(ssh_connection.clone()),
 196            Self::HeartbeatMissed { ssh_connection, .. } => Some(ssh_connection.clone()),
 197            Self::ReconnectFailed { ssh_connection, .. } => Some(ssh_connection.clone()),
 198            _ => None,
 199        }
 200    }
 201
 202    fn can_reconnect(&self) -> bool {
 203        match self {
 204            Self::Connected { .. }
 205            | Self::HeartbeatMissed { .. }
 206            | Self::ReconnectFailed { .. } => true,
 207            State::Connecting
 208            | State::Reconnecting
 209            | State::ReconnectExhausted
 210            | State::ServerNotRunning => false,
 211        }
 212    }
 213
 214    fn is_reconnect_failed(&self) -> bool {
 215        matches!(self, Self::ReconnectFailed { .. })
 216    }
 217
 218    fn is_reconnect_exhausted(&self) -> bool {
 219        matches!(self, Self::ReconnectExhausted { .. })
 220    }
 221
 222    fn is_server_not_running(&self) -> bool {
 223        matches!(self, Self::ServerNotRunning)
 224    }
 225
 226    fn is_reconnecting(&self) -> bool {
 227        matches!(self, Self::Reconnecting { .. })
 228    }
 229
 230    fn heartbeat_recovered(self) -> Self {
 231        match self {
 232            Self::HeartbeatMissed {
 233                ssh_connection,
 234                delegate,
 235                multiplex_task,
 236                heartbeat_task,
 237                ..
 238            } => Self::Connected {
 239                remote_connection: ssh_connection,
 240                delegate,
 241                multiplex_task,
 242                heartbeat_task,
 243            },
 244            _ => self,
 245        }
 246    }
 247
 248    fn heartbeat_missed(self) -> Self {
 249        match self {
 250            Self::Connected {
 251                remote_connection: ssh_connection,
 252                delegate,
 253                multiplex_task,
 254                heartbeat_task,
 255            } => Self::HeartbeatMissed {
 256                missed_heartbeats: 1,
 257                ssh_connection,
 258                delegate,
 259                multiplex_task,
 260                heartbeat_task,
 261            },
 262            Self::HeartbeatMissed {
 263                missed_heartbeats,
 264                ssh_connection,
 265                delegate,
 266                multiplex_task,
 267                heartbeat_task,
 268            } => Self::HeartbeatMissed {
 269                missed_heartbeats: missed_heartbeats + 1,
 270                ssh_connection,
 271                delegate,
 272                multiplex_task,
 273                heartbeat_task,
 274            },
 275            _ => self,
 276        }
 277    }
 278}
 279
/// The state of the ssh connection.
///
/// Public, data-free summary of the internal `State`; see the
/// `From<&State>` conversion for how internal states are mapped.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ConnectionState {
    /// The initial connection is being established.
    Connecting,
    /// The connection is up.
    Connected,
    /// At least one heartbeat was missed.
    HeartbeatMissed,
    /// A reconnect is in progress, or one attempt failed and may be retried.
    Reconnecting,
    /// Reconnect attempts are exhausted or the server is not running.
    Disconnected,
}
 289
 290impl From<&State> for ConnectionState {
 291    fn from(value: &State) -> Self {
 292        match value {
 293            State::Connecting => Self::Connecting,
 294            State::Connected { .. } => Self::Connected,
 295            State::Reconnecting | State::ReconnectFailed { .. } => Self::Reconnecting,
 296            State::HeartbeatMissed { .. } => Self::HeartbeatMissed,
 297            State::ReconnectExhausted => Self::Disconnected,
 298            State::ServerNotRunning => Self::Disconnected,
 299        }
 300    }
 301}
 302
/// Client side of a connection to a remote server, including the state
/// machine that drives heartbeats and reconnects.
pub struct RemoteClient {
    // Message channel to the remote; kept across reconnects and re-wired via
    // `ChannelClient::reconnect`.
    client: Arc<ChannelClient>,
    // String form of the `ConnectionIdentifier`; passed to `start_proxy` on
    // both the initial connection and every reconnect.
    unique_identifier: String,
    // Options of the connection this client was created from.
    connection_options: RemoteConnectionOptions,
    // Path style reported by the connection at creation time.
    path_style: PathStyle,
    // Current state. `None` after `shutdown_processes` has taken it, and
    // briefly while a transition moves the old state out.
    state: Option<State>,
}
 310
/// Events emitted by a `RemoteClient` entity.
#[derive(Debug)]
pub enum RemoteClientEvent {
    /// Emitted by `RemoteClient::set_state` when the client enters
    /// `ReconnectExhausted` or `ServerNotRunning`.
    Disconnected,
}

// Lets `RemoteClient` emit `RemoteClientEvent` through its entity context.
impl EventEmitter<RemoteClientEvent> for RemoteClient {}
 317
/// Identifies the socket on the remote server so that reconnects
/// can re-join the same project.
pub enum ConnectionIdentifier {
    /// Identifier allocated from the process-wide `NEXT_ID` counter
    /// (see `ConnectionIdentifier::setup`).
    Setup(u64),
    /// Identifier derived from a workspace id supplied by the caller.
    Workspace(i64),
}

// Source of `ConnectionIdentifier::Setup` ids; starts at 1 and only grows.
static NEXT_ID: AtomicU64 = AtomicU64::new(1);
 326
 327impl ConnectionIdentifier {
 328    pub fn setup() -> Self {
 329        Self::Setup(NEXT_ID.fetch_add(1, SeqCst))
 330    }
 331
 332    // This string gets used in a socket name, and so must be relatively short.
 333    // The total length of:
 334    //   /home/{username}/.local/share/zed/server_state/{name}/stdout.sock
 335    // Must be less than about 100 characters
 336    //   https://unix.stackexchange.com/questions/367008/why-is-socket-path-length-limited-to-a-hundred-chars
 337    // So our strings should be at most 20 characters or so.
 338    fn to_string(&self, cx: &App) -> String {
 339        let identifier_prefix = match ReleaseChannel::global(cx) {
 340            ReleaseChannel::Stable => "".to_string(),
 341            release_channel => format!("{}-", release_channel.dev_name()),
 342        };
 343        match self {
 344            Self::Setup(setup_id) => format!("{identifier_prefix}setup-{setup_id}"),
 345            Self::Workspace(workspace_id) => {
 346                format!("{identifier_prefix}workspace-{workspace_id}",)
 347            }
 348        }
 349    }
 350}
 351
 352pub async fn connect(
 353    connection_options: RemoteConnectionOptions,
 354    delegate: Arc<dyn RemoteClientDelegate>,
 355    cx: &mut AsyncApp,
 356) -> Result<Arc<dyn RemoteConnection>> {
 357    cx.update(|cx| {
 358        cx.update_default_global(|pool: &mut ConnectionPool, cx| {
 359            pool.connect(connection_options.clone(), delegate.clone(), cx)
 360        })
 361    })?
 362    .await
 363    .map_err(|e| e.cloned())
 364}
 365
 366impl RemoteClient {
 367    pub fn new(
 368        unique_identifier: ConnectionIdentifier,
 369        remote_connection: Arc<dyn RemoteConnection>,
 370        cancellation: oneshot::Receiver<()>,
 371        delegate: Arc<dyn RemoteClientDelegate>,
 372        cx: &mut App,
 373    ) -> Task<Result<Option<Entity<Self>>>> {
 374        let unique_identifier = unique_identifier.to_string(cx);
 375        cx.spawn(async move |cx| {
 376            let success = Box::pin(async move {
 377                let (outgoing_tx, outgoing_rx) = mpsc::unbounded::<Envelope>();
 378                let (incoming_tx, incoming_rx) = mpsc::unbounded::<Envelope>();
 379                let (connection_activity_tx, connection_activity_rx) = mpsc::channel::<()>(1);
 380
 381                let client = cx.update(|cx| {
 382                    ChannelClient::new(
 383                        incoming_rx,
 384                        outgoing_tx,
 385                        cx,
 386                        "client",
 387                        remote_connection.has_wsl_interop(),
 388                    )
 389                })?;
 390
 391                let path_style = remote_connection.path_style();
 392                let this = cx.new(|_| Self {
 393                    client: client.clone(),
 394                    unique_identifier: unique_identifier.clone(),
 395                    connection_options: remote_connection.connection_options(),
 396                    path_style,
 397                    state: Some(State::Connecting),
 398                })?;
 399
 400                let io_task = remote_connection.start_proxy(
 401                    unique_identifier,
 402                    false,
 403                    incoming_tx,
 404                    outgoing_rx,
 405                    connection_activity_tx,
 406                    delegate.clone(),
 407                    cx,
 408                );
 409
 410                let ready = client
 411                    .wait_for_remote_started()
 412                    .with_timeout(INITIAL_CONNECTION_TIMEOUT, cx.background_executor())
 413                    .await;
 414                match ready {
 415                    Ok(Some(_)) => {}
 416                    Ok(None) => {
 417                        let mut error = "remote client exited before becoming ready".to_owned();
 418                        if let Some(status) = io_task.now_or_never() {
 419                            match status {
 420                                Ok(exit_code) => {
 421                                    error.push_str(&format!(", exit_code={exit_code:?}"))
 422                                }
 423                                Err(e) => error.push_str(&format!(", error={e:?}")),
 424                            }
 425                        }
 426                        let error = anyhow::anyhow!("{error}");
 427                        log::error!("failed to establish connection: {}", error);
 428                        return Err(error);
 429                    }
 430                    Err(_) => {
 431                        let mut error =
 432                            "remote client did not become ready within the timeout".to_owned();
 433                        if let Some(status) = io_task.now_or_never() {
 434                            match status {
 435                                Ok(exit_code) => {
 436                                    error.push_str(&format!(", exit_code={exit_code:?}"))
 437                                }
 438                                Err(e) => error.push_str(&format!(", error={e:?}")),
 439                            }
 440                        }
 441                        let error = anyhow::anyhow!("{error}");
 442                        log::error!("failed to establish connection: {}", error);
 443                        return Err(error);
 444                    }
 445                }
 446                let multiplex_task = Self::monitor(this.downgrade(), io_task, cx);
 447                if let Err(error) = client.ping(HEARTBEAT_TIMEOUT).await {
 448                    log::error!("failed to establish connection: {}", error);
 449                    return Err(error);
 450                }
 451
 452                let heartbeat_task = Self::heartbeat(this.downgrade(), connection_activity_rx, cx);
 453
 454                this.update(cx, |this, _| {
 455                    this.state = Some(State::Connected {
 456                        remote_connection,
 457                        delegate,
 458                        multiplex_task,
 459                        heartbeat_task,
 460                    });
 461                })?;
 462
 463                Ok(Some(this))
 464            });
 465
 466            select! {
 467                _ = cancellation.fuse() => {
 468                    Ok(None)
 469                }
 470                result = success.fuse() =>  result
 471            }
 472        })
 473    }
 474
 475    pub fn proto_client_from_channels(
 476        incoming_rx: mpsc::UnboundedReceiver<Envelope>,
 477        outgoing_tx: mpsc::UnboundedSender<Envelope>,
 478        cx: &App,
 479        name: &'static str,
 480        has_wsl_interop: bool,
 481    ) -> AnyProtoClient {
 482        ChannelClient::new(incoming_rx, outgoing_tx, cx, name, has_wsl_interop).into()
 483    }
 484
    /// Takes the client's state and, if it was `Connected`, returns a future
    /// that shuts the connection down.
    ///
    /// The future optionally sends `shutdown_request`, waits 50ms on
    /// `executor`, and then drops the tasks, connection and delegate in a
    /// fixed order.
    ///
    /// Returns `None` if the client has no state or is in any state other
    /// than `Connected`.
    ///
    /// NOTE(review): the state is taken before it is inspected, so in the
    /// non-`Connected` case it is dropped right here and `self.state` stays
    /// `None` afterwards in every case.
    pub fn shutdown_processes<T: RequestMessage>(
        &mut self,
        shutdown_request: Option<T>,
        executor: BackgroundExecutor,
    ) -> Option<impl Future<Output = ()> + use<T>> {
        let state = self.state.take()?;
        log::info!("shutting down ssh processes");

        let State::Connected {
            multiplex_task,
            heartbeat_task,
            remote_connection: ssh_connection,
            delegate,
        } = state
        else {
            return None;
        };

        let client = self.client.clone();

        Some(async move {
            if let Some(shutdown_request) = shutdown_request {
                // Best effort: a failed send is only logged.
                client.send(shutdown_request).log_err();
                // We wait 50ms instead of waiting for a response, because
                // waiting for a response would require us to wait on the main thread
                // which we want to avoid in an `on_app_quit` callback.
                executor.timer(Duration::from_millis(50)).await;
            }

            // Drop `multiplex_task` because it owns our ssh_proxy_process, which is a
            // child of master_process.
            drop(multiplex_task);
            // Now drop the rest of state, which kills master process.
            drop(heartbeat_task);
            drop(ssh_connection);
            drop(delegate);
        })
    }
 523
 524    fn reconnect(&mut self, cx: &mut Context<Self>) -> Result<()> {
 525        let can_reconnect = self
 526            .state
 527            .as_ref()
 528            .map(|state| state.can_reconnect())
 529            .unwrap_or(false);
 530        if !can_reconnect {
 531            log::info!("aborting reconnect, because not in state that allows reconnecting");
 532            let error = if let Some(state) = self.state.as_ref() {
 533                format!("invalid state, cannot reconnect while in state {state}")
 534            } else {
 535                "no state set".to_string()
 536            };
 537            anyhow::bail!(error);
 538        }
 539
 540        let state = self.state.take().unwrap();
 541        let (attempts, remote_connection, delegate) = match state {
 542            State::Connected {
 543                remote_connection: ssh_connection,
 544                delegate,
 545                multiplex_task,
 546                heartbeat_task,
 547            }
 548            | State::HeartbeatMissed {
 549                ssh_connection,
 550                delegate,
 551                multiplex_task,
 552                heartbeat_task,
 553                ..
 554            } => {
 555                drop(multiplex_task);
 556                drop(heartbeat_task);
 557                (0, ssh_connection, delegate)
 558            }
 559            State::ReconnectFailed {
 560                attempts,
 561                ssh_connection,
 562                delegate,
 563                ..
 564            } => (attempts, ssh_connection, delegate),
 565            State::Connecting
 566            | State::Reconnecting
 567            | State::ReconnectExhausted
 568            | State::ServerNotRunning => unreachable!(),
 569        };
 570
 571        let attempts = attempts + 1;
 572        if attempts > MAX_RECONNECT_ATTEMPTS {
 573            log::error!(
 574                "Failed to reconnect to after {} attempts, giving up",
 575                MAX_RECONNECT_ATTEMPTS
 576            );
 577            self.set_state(State::ReconnectExhausted, cx);
 578            return Ok(());
 579        }
 580
 581        self.set_state(State::Reconnecting, cx);
 582
 583        log::info!("Trying to reconnect to ssh server... Attempt {}", attempts);
 584
 585        let unique_identifier = self.unique_identifier.clone();
 586        let client = self.client.clone();
 587        let reconnect_task = cx.spawn(async move |this, cx| {
 588            macro_rules! failed {
 589                ($error:expr, $attempts:expr, $ssh_connection:expr, $delegate:expr) => {
 590                    delegate.set_status(Some(&format!("{error:#}", error = $error)), cx);
 591                    return State::ReconnectFailed {
 592                        error: anyhow!($error),
 593                        attempts: $attempts,
 594                        ssh_connection: $ssh_connection,
 595                        delegate: $delegate,
 596                    };
 597                };
 598            }
 599
 600            if let Err(error) = remote_connection
 601                .kill()
 602                .await
 603                .context("Failed to kill ssh process")
 604            {
 605                failed!(error, attempts, remote_connection, delegate);
 606            };
 607
 608            let connection_options = remote_connection.connection_options();
 609
 610            let (outgoing_tx, outgoing_rx) = mpsc::unbounded::<Envelope>();
 611            let (incoming_tx, incoming_rx) = mpsc::unbounded::<Envelope>();
 612            let (connection_activity_tx, connection_activity_rx) = mpsc::channel::<()>(1);
 613
 614            let (ssh_connection, io_task) = match async {
 615                let ssh_connection = cx
 616                    .update_global(|pool: &mut ConnectionPool, cx| {
 617                        pool.connect(connection_options, delegate.clone(), cx)
 618                    })?
 619                    .await
 620                    .map_err(|error| error.cloned())?;
 621
 622                let io_task = ssh_connection.start_proxy(
 623                    unique_identifier,
 624                    true,
 625                    incoming_tx,
 626                    outgoing_rx,
 627                    connection_activity_tx,
 628                    delegate.clone(),
 629                    cx,
 630                );
 631                anyhow::Ok((ssh_connection, io_task))
 632            }
 633            .await
 634            {
 635                Ok((ssh_connection, io_task)) => (ssh_connection, io_task),
 636                Err(error) => {
 637                    failed!(error, attempts, remote_connection, delegate);
 638                }
 639            };
 640
 641            let multiplex_task = Self::monitor(this.clone(), io_task, cx);
 642            client.reconnect(incoming_rx, outgoing_tx, cx);
 643
 644            if let Err(error) = client.resync(HEARTBEAT_TIMEOUT).await {
 645                failed!(error, attempts, ssh_connection, delegate);
 646            };
 647
 648            State::Connected {
 649                remote_connection: ssh_connection,
 650                delegate,
 651                multiplex_task,
 652                heartbeat_task: Self::heartbeat(this.clone(), connection_activity_rx, cx),
 653            }
 654        });
 655
 656        cx.spawn(async move |this, cx| {
 657            let new_state = reconnect_task.await;
 658            this.update(cx, |this, cx| {
 659                this.try_set_state(cx, |old_state| {
 660                    if old_state.is_reconnecting() {
 661                        match &new_state {
 662                            State::Connecting
 663                            | State::Reconnecting
 664                            | State::HeartbeatMissed { .. }
 665                            | State::ServerNotRunning => {}
 666                            State::Connected { .. } => {
 667                                log::info!("Successfully reconnected");
 668                            }
 669                            State::ReconnectFailed {
 670                                error, attempts, ..
 671                            } => {
 672                                log::error!(
 673                                    "Reconnect attempt {} failed: {:?}. Starting new attempt...",
 674                                    attempts,
 675                                    error
 676                                );
 677                            }
 678                            State::ReconnectExhausted => {
 679                                log::error!("Reconnect attempt failed and all attempts exhausted");
 680                            }
 681                        }
 682                        Some(new_state)
 683                    } else {
 684                        None
 685                    }
 686                });
 687
 688                if this.state_is(State::is_reconnect_failed) {
 689                    this.reconnect(cx)
 690                } else if this.state_is(State::is_reconnect_exhausted) {
 691                    Ok(())
 692                } else {
 693                    log::debug!("State has transition from Reconnecting into new state while attempting reconnect.");
 694                    Ok(())
 695                }
 696            })
 697        })
 698        .detach_and_log_err(cx);
 699
 700        Ok(())
 701    }
 702
    /// Spawns the heartbeat loop for the client behind `this`.
    ///
    /// The loop treats any message on `connection_activity_rx` as proof of
    /// life. When the keepalive timer (`HEARTBEAT_INTERVAL`) fires it waits
    /// for either such a message or a successful `ping`. Changes in the
    /// number of consecutive misses are forwarded to
    /// `handle_heartbeat_result`.
    ///
    /// The task ends with `Ok(())` when the activity channel is closed or
    /// when `handle_heartbeat_result` asks to stop, and with an error when
    /// the entity can no longer be updated. If the entity is already gone
    /// when this is called, a ready task holding an error is returned.
    fn heartbeat(
        this: WeakEntity<Self>,
        mut connection_activity_rx: mpsc::Receiver<()>,
        cx: &mut AsyncApp,
    ) -> Task<Result<()>> {
        let Ok(client) = this.read_with(cx, |this, _| this.client.clone()) else {
            return Task::ready(Err(anyhow!("SshRemoteClient lost")));
        };

        cx.spawn(async move |cx| {
            // Consecutive misses as seen by this loop.
            let mut missed_heartbeats = 0;

            let keepalive_timer = cx.background_executor().timer(HEARTBEAT_INTERVAL).fuse();
            futures::pin_mut!(keepalive_timer);

            loop {
                // Biased: connection activity is checked before the timer.
                select_biased! {
                    result = connection_activity_rx.next().fuse() => {
                        if result.is_none() {
                            log::warn!("ssh heartbeat: connection activity channel has been dropped. stopping.");
                            return Ok(());
                        }

                        // Activity after misses counts as recovery.
                        if missed_heartbeats != 0 {
                            missed_heartbeats = 0;
                            let _ =this.update(cx, |this, cx| {
                                this.handle_heartbeat_result(missed_heartbeats, cx)
                            })?;
                        }
                    }
                    _ = keepalive_timer => {
                        log::debug!("Sending heartbeat to server...");

                        // Activity arriving while the ping is in flight counts as success.
                        let result = select_biased! {
                            _ = connection_activity_rx.next().fuse() => {
                                Ok(())
                            }
                            ping_result = client.ping(HEARTBEAT_TIMEOUT).fuse() => {
                                ping_result
                            }
                        };

                        if result.is_err() {
                            missed_heartbeats += 1;
                            log::warn!(
                                "No heartbeat from server after {:?}. Missed heartbeat {} out of {}.",
                                HEARTBEAT_TIMEOUT,
                                missed_heartbeats,
                                MAX_MISSED_HEARTBEATS
                            );
                        } else if missed_heartbeats != 0 {
                            missed_heartbeats = 0;
                        } else {
                            // Healthy and nothing changed: no state update needed.
                            // NOTE(review): `continue` skips the timer re-arm at the
                            // bottom of the loop — confirm that is intended.
                            continue;
                        }

                        let result = this.update(cx, |this, cx| {
                            this.handle_heartbeat_result(missed_heartbeats, cx)
                        })?;
                        if result.is_break() {
                            return Ok(());
                        }
                    }
                }

                // Restart the interval after every handled event.
                keepalive_timer.set(cx.background_executor().timer(HEARTBEAT_INTERVAL).fuse());
            }
        })
    }
 772
 773    fn handle_heartbeat_result(
 774        &mut self,
 775        missed_heartbeats: usize,
 776        cx: &mut Context<Self>,
 777    ) -> ControlFlow<()> {
 778        let state = self.state.take().unwrap();
 779        let next_state = if missed_heartbeats > 0 {
 780            state.heartbeat_missed()
 781        } else {
 782            state.heartbeat_recovered()
 783        };
 784
 785        self.set_state(next_state, cx);
 786
 787        if missed_heartbeats >= MAX_MISSED_HEARTBEATS {
 788            log::error!(
 789                "Missed last {} heartbeats. Reconnecting...",
 790                missed_heartbeats
 791            );
 792
 793            self.reconnect(cx)
 794                .context("failed to start reconnect process after missing heartbeats")
 795                .log_err();
 796            ControlFlow::Break(())
 797        } else {
 798            ControlFlow::Continue(())
 799        }
 800    }
 801
 802    fn monitor(
 803        this: WeakEntity<Self>,
 804        io_task: Task<Result<i32>>,
 805        cx: &AsyncApp,
 806    ) -> Task<Result<()>> {
 807        cx.spawn(async move |cx| {
 808            let result = io_task.await;
 809
 810            match result {
 811                Ok(exit_code) => {
 812                    if let Some(error) = ProxyLaunchError::from_exit_code(exit_code) {
 813                        match error {
 814                            ProxyLaunchError::ServerNotRunning => {
 815                                log::error!("failed to reconnect because server is not running");
 816                                this.update(cx, |this, cx| {
 817                                    this.set_state(State::ServerNotRunning, cx);
 818                                })?;
 819                            }
 820                        }
 821                    } else if exit_code > 0 {
 822                        log::error!("proxy process terminated unexpectedly");
 823                        this.update(cx, |this, cx| {
 824                            this.reconnect(cx).ok();
 825                        })?;
 826                    }
 827                }
 828                Err(error) => {
 829                    log::warn!("ssh io task died with error: {:?}. reconnecting...", error);
 830                    this.update(cx, |this, cx| {
 831                        this.reconnect(cx).ok();
 832                    })?;
 833                }
 834            }
 835
 836            Ok(())
 837        })
 838    }
 839
 840    fn state_is(&self, check: impl FnOnce(&State) -> bool) -> bool {
 841        self.state.as_ref().is_some_and(check)
 842    }
 843
 844    fn try_set_state(&mut self, cx: &mut Context<Self>, map: impl FnOnce(&State) -> Option<State>) {
 845        let new_state = self.state.as_ref().and_then(map);
 846        if let Some(new_state) = new_state {
 847            self.state.replace(new_state);
 848            cx.notify();
 849        }
 850    }
 851
 852    fn set_state(&mut self, state: State, cx: &mut Context<Self>) {
 853        log::info!("setting state to '{}'", &state);
 854
 855        let is_reconnect_exhausted = state.is_reconnect_exhausted();
 856        let is_server_not_running = state.is_server_not_running();
 857        self.state.replace(state);
 858
 859        if is_reconnect_exhausted || is_server_not_running {
 860            cx.emit(RemoteClientEvent::Disconnected);
 861        }
 862        cx.notify();
 863    }
 864
 865    pub fn shell(&self) -> Option<String> {
 866        Some(self.remote_connection()?.shell())
 867    }
 868
 869    pub fn default_system_shell(&self) -> Option<String> {
 870        Some(self.remote_connection()?.default_system_shell())
 871    }
 872
 873    pub fn shares_network_interface(&self) -> bool {
 874        self.remote_connection()
 875            .map_or(false, |connection| connection.shares_network_interface())
 876    }
 877
 878    pub fn build_command(
 879        &self,
 880        program: Option<String>,
 881        args: &[String],
 882        env: &HashMap<String, String>,
 883        working_dir: Option<String>,
 884        port_forward: Option<(u16, String, u16)>,
 885    ) -> Result<CommandTemplate> {
 886        let Some(connection) = self.remote_connection() else {
 887            return Err(anyhow!("no ssh connection"));
 888        };
 889        connection.build_command(program, args, env, working_dir, port_forward)
 890    }
 891
 892    pub fn build_forward_ports_command(
 893        &self,
 894        forwards: Vec<(u16, String, u16)>,
 895    ) -> Result<CommandTemplate> {
 896        let Some(connection) = self.remote_connection() else {
 897            return Err(anyhow!("no ssh connection"));
 898        };
 899        connection.build_forward_ports_command(forwards)
 900    }
 901
 902    pub fn upload_directory(
 903        &self,
 904        src_path: PathBuf,
 905        dest_path: RemotePathBuf,
 906        cx: &App,
 907    ) -> Task<Result<()>> {
 908        let Some(connection) = self.remote_connection() else {
 909            return Task::ready(Err(anyhow!("no ssh connection")));
 910        };
 911        connection.upload_directory(src_path, dest_path, cx)
 912    }
 913
 914    pub fn proto_client(&self) -> AnyProtoClient {
 915        self.client.clone().into()
 916    }
 917
 918    pub fn connection_options(&self) -> RemoteConnectionOptions {
 919        self.connection_options.clone()
 920    }
 921
 922    pub fn connection(&self) -> Option<Arc<dyn RemoteConnection>> {
 923        if let State::Connected {
 924            remote_connection, ..
 925        } = self.state.as_ref()?
 926        {
 927            Some(remote_connection.clone())
 928        } else {
 929            None
 930        }
 931    }
 932
 933    pub fn connection_state(&self) -> ConnectionState {
 934        self.state
 935            .as_ref()
 936            .map(ConnectionState::from)
 937            .unwrap_or(ConnectionState::Disconnected)
 938    }
 939
 940    pub fn is_disconnected(&self) -> bool {
 941        self.connection_state() == ConnectionState::Disconnected
 942    }
 943
 944    pub fn path_style(&self) -> PathStyle {
 945        self.path_style
 946    }
 947
 948    #[cfg(any(test, feature = "test-support"))]
 949    pub fn simulate_disconnect(&self, client_cx: &mut App) -> Task<()> {
 950        let opts = self.connection_options();
 951        client_cx.spawn(async move |cx| {
 952            let connection = cx
 953                .update_global(|c: &mut ConnectionPool, _| {
 954                    if let Some(ConnectionPoolEntry::Connecting(c)) = c.connections.get(&opts) {
 955                        c.clone()
 956                    } else {
 957                        panic!("missing test connection")
 958                    }
 959                })
 960                .unwrap()
 961                .await
 962                .unwrap();
 963
 964            connection.simulate_disconnect(cx);
 965        })
 966    }
 967
 968    #[cfg(any(test, feature = "test-support"))]
 969    pub fn fake_server(
 970        client_cx: &mut gpui::TestAppContext,
 971        server_cx: &mut gpui::TestAppContext,
 972    ) -> (RemoteConnectionOptions, AnyProtoClient) {
 973        use crate::transport::ssh::SshConnectionHost;
 974
 975        let port = client_cx
 976            .update(|cx| cx.default_global::<ConnectionPool>().connections.len() as u16 + 1);
 977        let opts = RemoteConnectionOptions::Ssh(SshConnectionOptions {
 978            host: SshConnectionHost::from("<fake>".to_string()),
 979            port: Some(port),
 980            ..Default::default()
 981        });
 982        let (outgoing_tx, _) = mpsc::unbounded::<Envelope>();
 983        let (_, incoming_rx) = mpsc::unbounded::<Envelope>();
 984        let server_client = server_cx
 985            .update(|cx| ChannelClient::new(incoming_rx, outgoing_tx, cx, "fake-server", false));
 986        let connection: Arc<dyn RemoteConnection> = Arc::new(fake::FakeRemoteConnection {
 987            connection_options: opts.clone(),
 988            server_cx: fake::SendableCx::new(server_cx),
 989            server_channel: server_client.clone(),
 990        });
 991
 992        client_cx.update(|cx| {
 993            cx.update_default_global(|c: &mut ConnectionPool, cx| {
 994                c.connections.insert(
 995                    opts.clone(),
 996                    ConnectionPoolEntry::Connecting(
 997                        cx.background_spawn({
 998                            let connection = connection.clone();
 999                            async move { Ok(connection.clone()) }
1000                        })
1001                        .shared(),
1002                    ),
1003                );
1004            })
1005        });
1006
1007        (opts, server_client.into())
1008    }
1009
1010    #[cfg(any(test, feature = "test-support"))]
1011    pub async fn fake_client(
1012        opts: RemoteConnectionOptions,
1013        client_cx: &mut gpui::TestAppContext,
1014    ) -> Entity<Self> {
1015        let (_tx, rx) = oneshot::channel();
1016        let mut cx = client_cx.to_async();
1017        let connection = connect(opts, Arc::new(fake::Delegate), &mut cx)
1018            .await
1019            .unwrap();
1020        client_cx
1021            .update(|cx| {
1022                Self::new(
1023                    ConnectionIdentifier::setup(),
1024                    connection,
1025                    rx,
1026                    Arc::new(fake::Delegate),
1027                    cx,
1028                )
1029            })
1030            .await
1031            .unwrap()
1032            .unwrap()
1033    }
1034
1035    fn remote_connection(&self) -> Option<Arc<dyn RemoteConnection>> {
1036        self.state
1037            .as_ref()
1038            .and_then(|state| state.remote_connection())
1039    }
1040}
1041
/// What the [`ConnectionPool`] stores for one set of connection options.
enum ConnectionPoolEntry {
    /// A connection attempt whose task can be cloned and awaited by several callers.
    /// The error is wrapped in an `Arc` so the shared result stays cloneable.
    Connecting(Shared<Task<Result<Arc<dyn RemoteConnection>, Arc<anyhow::Error>>>>),
    /// An established connection, referenced weakly so the pool itself does not keep
    /// it alive.
    Connected(Weak<dyn RemoteConnection>),
}
1046
/// Registry of remote connections, keyed by the options used to open them, so that
/// requests for the same options can share one connection or connection attempt.
#[derive(Default)]
struct ConnectionPool {
    /// One entry per distinct set of connection options.
    connections: HashMap<RemoteConnectionOptions, ConnectionPoolEntry>,
}
1051
// The pool is stored as a gpui global and accessed through `update_global` /
// `default_global` / `update_default_global`.
impl Global for ConnectionPool {}
1053
1054impl ConnectionPool {
1055    pub fn connect(
1056        &mut self,
1057        opts: RemoteConnectionOptions,
1058        delegate: Arc<dyn RemoteClientDelegate>,
1059        cx: &mut App,
1060    ) -> Shared<Task<Result<Arc<dyn RemoteConnection>, Arc<anyhow::Error>>>> {
1061        let connection = self.connections.get(&opts);
1062        match connection {
1063            Some(ConnectionPoolEntry::Connecting(task)) => {
1064                delegate.set_status(
1065                    Some("Waiting for existing connection attempt"),
1066                    &mut cx.to_async(),
1067                );
1068                return task.clone();
1069            }
1070            Some(ConnectionPoolEntry::Connected(ssh)) => {
1071                if let Some(ssh) = ssh.upgrade()
1072                    && !ssh.has_been_killed()
1073                {
1074                    return Task::ready(Ok(ssh)).shared();
1075                }
1076                self.connections.remove(&opts);
1077            }
1078            None => {}
1079        }
1080
1081        let task = cx
1082            .spawn({
1083                let opts = opts.clone();
1084                let delegate = delegate.clone();
1085                async move |cx| {
1086                    let connection = match opts.clone() {
1087                        RemoteConnectionOptions::Ssh(opts) => {
1088                            SshRemoteConnection::new(opts, delegate, cx)
1089                                .await
1090                                .map(|connection| Arc::new(connection) as Arc<dyn RemoteConnection>)
1091                        }
1092                        RemoteConnectionOptions::Wsl(opts) => {
1093                            WslRemoteConnection::new(opts, delegate, cx)
1094                                .await
1095                                .map(|connection| Arc::new(connection) as Arc<dyn RemoteConnection>)
1096                        }
1097                        RemoteConnectionOptions::Docker(opts) => {
1098                            DockerExecConnection::new(opts, delegate, cx)
1099                                .await
1100                                .map(|connection| Arc::new(connection) as Arc<dyn RemoteConnection>)
1101                        }
1102                    };
1103
1104                    cx.update_global(|pool: &mut Self, _| {
1105                        debug_assert!(matches!(
1106                            pool.connections.get(&opts),
1107                            Some(ConnectionPoolEntry::Connecting(_))
1108                        ));
1109                        match connection {
1110                            Ok(connection) => {
1111                                pool.connections.insert(
1112                                    opts.clone(),
1113                                    ConnectionPoolEntry::Connected(Arc::downgrade(&connection)),
1114                                );
1115                                Ok(connection)
1116                            }
1117                            Err(error) => {
1118                                pool.connections.remove(&opts);
1119                                Err(Arc::new(error))
1120                            }
1121                        }
1122                    })?
1123                }
1124            })
1125            .shared();
1126
1127        self.connections
1128            .insert(opts.clone(), ConnectionPoolEntry::Connecting(task.clone()));
1129        task
1130    }
1131}
1132
/// Options describing how to reach a remote, one variant per supported transport.
///
/// The type is hashable and comparable so it can be used as the key of the
/// connection pool.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum RemoteConnectionOptions {
    /// Connect over SSH.
    Ssh(SshConnectionOptions),
    /// Connect to a WSL distro.
    Wsl(WslConnectionOptions),
    /// Connect via the Docker transport.
    Docker(DockerConnectionOptions),
}
1139
1140impl RemoteConnectionOptions {
1141    pub fn display_name(&self) -> String {
1142        match self {
1143            RemoteConnectionOptions::Ssh(opts) => opts.host.to_string(),
1144            RemoteConnectionOptions::Wsl(opts) => opts.distro_name.clone(),
1145            RemoteConnectionOptions::Docker(opts) => opts.name.clone(),
1146        }
1147    }
1148}
1149
1150impl From<SshConnectionOptions> for RemoteConnectionOptions {
1151    fn from(opts: SshConnectionOptions) -> Self {
1152        RemoteConnectionOptions::Ssh(opts)
1153    }
1154}
1155
1156impl From<WslConnectionOptions> for RemoteConnectionOptions {
1157    fn from(opts: WslConnectionOptions) -> Self {
1158        RemoteConnectionOptions::Wsl(opts)
1159    }
1160}
1161
#[cfg(target_os = "windows")]
/// Action for opening WSL paths, i.e. paths of the form `\\wsl.localhost\<distro>\path`.
///
/// Only compiled on Windows.
#[derive(Debug, Clone, PartialEq, Eq, gpui::Action)]
#[action(namespace = workspace, no_json, no_register)]
pub struct OpenWslPath {
    /// Connection options of the distro (presumably the one the paths live in — confirm
    /// against the action's handler).
    pub distro: WslConnectionOptions,
    /// The paths to open.
    pub paths: Vec<PathBuf>,
}
1170
/// Transport-level abstraction over one established connection to a remote.
///
/// Implemented for the SSH, WSL and Docker transports (see `ConnectionPool::connect`)
/// and by a fake connection in tests.
#[async_trait(?Send)]
pub trait RemoteConnection: Send + Sync {
    /// Starts the proxy that carries protocol envelopes between the local client and
    /// the remote.
    ///
    /// The task's `i32` result is interpreted by `RemoteClient::monitor` as the proxy's
    /// exit code. NOTE(review): `incoming_tx` / `outgoing_rx` presumably carry messages
    /// from / to the remote, and `connection_activity_tx` is presumably signalled on
    /// traffic — confirm against the transport implementations.
    fn start_proxy(
        &self,
        unique_identifier: String,
        reconnect: bool,
        incoming_tx: UnboundedSender<Envelope>,
        outgoing_rx: UnboundedReceiver<Envelope>,
        connection_activity_tx: Sender<()>,
        delegate: Arc<dyn RemoteClientDelegate>,
        cx: &mut AsyncApp,
    ) -> Task<Result<i32>>;
    /// Uploads the local directory `src_path` to `dest_path` on the remote.
    fn upload_directory(
        &self,
        src_path: PathBuf,
        dest_path: RemotePathBuf,
        cx: &App,
    ) -> Task<Result<()>>;
    /// Terminates the connection.
    async fn kill(&self) -> Result<()>;
    /// Reports whether the connection has been killed. The connection pool refuses to
    /// reuse a connection for which this returns `true`.
    fn has_been_killed(&self) -> bool;
    /// Whether the remote shares a network interface with the local machine.
    /// Defaults to `false`.
    fn shares_network_interface(&self) -> bool {
        false
    }
    /// Builds the command template used to run `program` with `args` and `env` through
    /// this connection, optionally in `working_dir` and with one port forward.
    /// NOTE(review): the meaning of the `(u16, String, u16)` tuple is assumed to be
    /// (local port, host, remote port), as in the fake implementation — confirm.
    fn build_command(
        &self,
        program: Option<String>,
        args: &[String],
        env: &HashMap<String, String>,
        working_dir: Option<String>,
        port_forward: Option<(u16, String, u16)>,
    ) -> Result<CommandTemplate>;
    /// Builds the command template that sets up the given port forwards.
    fn build_forward_ports_command(
        &self,
        forwards: Vec<(u16, String, u16)>,
    ) -> Result<CommandTemplate>;
    /// The options this connection was opened with.
    fn connection_options(&self) -> RemoteConnectionOptions;
    /// The path style used on the remote.
    fn path_style(&self) -> PathStyle;
    /// The shell to use on the remote.
    fn shell(&self) -> String;
    /// The remote's default system shell.
    fn default_system_shell(&self) -> String;
    /// Whether WSL interop is available on this connection.
    fn has_wsl_interop(&self) -> bool;

    /// Test hook for simulating a dropped connection; does nothing by default.
    #[cfg(any(test, feature = "test-support"))]
    fn simulate_disconnect(&self, _: &AsyncApp) {}
}
1215
/// Outstanding requests, keyed by the id of the request envelope.
///
/// Each sender delivers the response envelope together with a second oneshot sender;
/// the receive loop waits on the matching receiver before handling the next message.
type ResponseChannels = Mutex<HashMap<MessageId, oneshot::Sender<(Envelope, oneshot::Sender<()>)>>>;
1217
/// A value that can be set at most once and awaited by any number of waiters.
struct Signal<T> {
    /// The sending half; taken (leaving `None`) the first time the signal is set.
    tx: Mutex<Option<oneshot::Sender<T>>>,
    /// Shared task resolving to the value once it has been set, or to `None` if the
    /// sender is dropped without sending.
    rx: Shared<Task<Option<T>>>,
}
1222
1223impl<T: Send + Clone + 'static> Signal<T> {
1224    pub fn new(cx: &App) -> Self {
1225        let (tx, rx) = oneshot::channel();
1226
1227        let task = cx
1228            .background_executor()
1229            .spawn(async move { rx.await.ok() })
1230            .shared();
1231
1232        Self {
1233            tx: Mutex::new(Some(tx)),
1234            rx: task,
1235        }
1236    }
1237
1238    fn set(&self, value: T) {
1239        if let Some(tx) = self.tx.lock().take() {
1240            let _ = tx.send(value);
1241        }
1242    }
1243
1244    fn wait(&self) -> Shared<Task<Option<T>>> {
1245        self.rx.clone()
1246    }
1247}
1248
/// Protocol client that exchanges `Envelope`s over a pair of unbounded channels and
/// keeps enough state to resend unacknowledged messages after a reconnect.
struct ChannelClient {
    /// Source of ids for outgoing envelopes; incremented for every message sent.
    next_message_id: AtomicU32,
    /// Sending half of the current outgoing channel; replaced on reconnect.
    outgoing_tx: Mutex<mpsc::UnboundedSender<Envelope>>,
    /// Buffered outgoing envelopes, kept until the peer acknowledges them so they can
    /// be resent.
    buffer: Mutex<VecDeque<Envelope>>,
    /// Requests that are still waiting for their response.
    response_channels: ResponseChannels,
    /// Handlers for incoming messages that are not responses.
    message_handlers: Mutex<ProtoMessageHandlerSet>,
    /// Id of the most recently received regular message; sent back as `ack_id`.
    max_received: AtomicU32,
    /// Label used as a prefix in log messages.
    name: &'static str,
    /// The task running the receive loop; replaced on reconnect.
    task: Mutex<Task<Result<()>>>,
    /// Set once a `RemoteStarted` message has been received from the peer.
    remote_started: Signal<()>,
    /// Value reported through `ProtoClient::has_wsl_interop`.
    has_wsl_interop: bool,
}
1261
1262impl ChannelClient {
1263    fn new(
1264        incoming_rx: mpsc::UnboundedReceiver<Envelope>,
1265        outgoing_tx: mpsc::UnboundedSender<Envelope>,
1266        cx: &App,
1267        name: &'static str,
1268        has_wsl_interop: bool,
1269    ) -> Arc<Self> {
1270        Arc::new_cyclic(|this| Self {
1271            outgoing_tx: Mutex::new(outgoing_tx),
1272            next_message_id: AtomicU32::new(0),
1273            max_received: AtomicU32::new(0),
1274            response_channels: ResponseChannels::default(),
1275            message_handlers: Default::default(),
1276            buffer: Mutex::new(VecDeque::new()),
1277            name,
1278            task: Mutex::new(Self::start_handling_messages(
1279                this.clone(),
1280                incoming_rx,
1281                &cx.to_async(),
1282            )),
1283            remote_started: Signal::new(cx),
1284            has_wsl_interop,
1285        })
1286    }
1287
1288    fn wait_for_remote_started(&self) -> Shared<Task<Option<()>>> {
1289        self.remote_started.wait()
1290    }
1291
1292    fn start_handling_messages(
1293        this: Weak<Self>,
1294        mut incoming_rx: mpsc::UnboundedReceiver<Envelope>,
1295        cx: &AsyncApp,
1296    ) -> Task<Result<()>> {
1297        cx.spawn(async move |cx| {
1298            if let Some(this) = this.upgrade() {
1299                let envelope = proto::RemoteStarted {}.into_envelope(0, None, None);
1300                this.outgoing_tx.lock().unbounded_send(envelope).ok();
1301            };
1302
1303            let peer_id = PeerId { owner_id: 0, id: 0 };
1304            while let Some(incoming) = incoming_rx.next().await {
1305                let Some(this) = this.upgrade() else {
1306                    return anyhow::Ok(());
1307                };
1308                if let Some(ack_id) = incoming.ack_id {
1309                    let mut buffer = this.buffer.lock();
1310                    while buffer.front().is_some_and(|msg| msg.id <= ack_id) {
1311                        buffer.pop_front();
1312                    }
1313                }
1314                if let Some(proto::envelope::Payload::FlushBufferedMessages(_)) = &incoming.payload
1315                {
1316                    log::debug!(
1317                        "{}:ssh message received. name:FlushBufferedMessages",
1318                        this.name
1319                    );
1320                    {
1321                        let buffer = this.buffer.lock();
1322                        for envelope in buffer.iter() {
1323                            this.outgoing_tx
1324                                .lock()
1325                                .unbounded_send(envelope.clone())
1326                                .ok();
1327                        }
1328                    }
1329                    let mut envelope = proto::Ack {}.into_envelope(0, Some(incoming.id), None);
1330                    envelope.id = this.next_message_id.fetch_add(1, SeqCst);
1331                    this.outgoing_tx.lock().unbounded_send(envelope).ok();
1332                    continue;
1333                }
1334
1335                if let Some(proto::envelope::Payload::RemoteStarted(_)) = &incoming.payload {
1336                    this.remote_started.set(());
1337                    let mut envelope = proto::Ack {}.into_envelope(0, Some(incoming.id), None);
1338                    envelope.id = this.next_message_id.fetch_add(1, SeqCst);
1339                    this.outgoing_tx.lock().unbounded_send(envelope).ok();
1340                    continue;
1341                }
1342
1343                this.max_received.store(incoming.id, SeqCst);
1344
1345                if let Some(request_id) = incoming.responding_to {
1346                    let request_id = MessageId(request_id);
1347                    let sender = this.response_channels.lock().remove(&request_id);
1348                    if let Some(sender) = sender {
1349                        let (tx, rx) = oneshot::channel();
1350                        if incoming.payload.is_some() {
1351                            sender.send((incoming, tx)).ok();
1352                        }
1353                        rx.await.ok();
1354                    }
1355                } else if let Some(envelope) =
1356                    build_typed_envelope(peer_id, Instant::now(), incoming)
1357                {
1358                    let type_name = envelope.payload_type_name();
1359                    let message_id = envelope.message_id();
1360                    if let Some(future) = ProtoMessageHandlerSet::handle_message(
1361                        &this.message_handlers,
1362                        envelope,
1363                        this.clone().into(),
1364                        cx.clone(),
1365                    ) {
1366                        log::debug!("{}:ssh message received. name:{type_name}", this.name);
1367                        cx.foreground_executor()
1368                            .spawn(async move {
1369                                match future.await {
1370                                    Ok(_) => {
1371                                        log::debug!(
1372                                            "{}:ssh message handled. name:{type_name}",
1373                                            this.name
1374                                        );
1375                                    }
1376                                    Err(error) => {
1377                                        log::error!(
1378                                            "{}:error handling message. type:{}, error:{:#}",
1379                                            this.name,
1380                                            type_name,
1381                                            format!("{error:#}").lines().fold(
1382                                                String::new(),
1383                                                |mut message, line| {
1384                                                    if !message.is_empty() {
1385                                                        message.push(' ');
1386                                                    }
1387                                                    message.push_str(line);
1388                                                    message
1389                                                }
1390                                            )
1391                                        );
1392                                    }
1393                                }
1394                            })
1395                            .detach()
1396                    } else {
1397                        log::error!("{}:unhandled ssh message name:{type_name}", this.name);
1398                        if let Err(e) = AnyProtoClient::from(this.clone()).send_response(
1399                            message_id,
1400                            anyhow::anyhow!("no handler registered for {type_name}").to_proto(),
1401                        ) {
1402                            log::error!(
1403                                "{}:error sending error response for {type_name}:{e:#}",
1404                                this.name
1405                            );
1406                        }
1407                    }
1408                }
1409            }
1410            anyhow::Ok(())
1411        })
1412    }
1413
1414    fn reconnect(
1415        self: &Arc<Self>,
1416        incoming_rx: UnboundedReceiver<Envelope>,
1417        outgoing_tx: UnboundedSender<Envelope>,
1418        cx: &AsyncApp,
1419    ) {
1420        *self.outgoing_tx.lock() = outgoing_tx;
1421        *self.task.lock() = Self::start_handling_messages(Arc::downgrade(self), incoming_rx, cx);
1422    }
1423
1424    fn request<T: RequestMessage>(
1425        &self,
1426        payload: T,
1427    ) -> impl 'static + Future<Output = Result<T::Response>> {
1428        self.request_internal(payload, true)
1429    }
1430
1431    fn request_internal<T: RequestMessage>(
1432        &self,
1433        payload: T,
1434        use_buffer: bool,
1435    ) -> impl 'static + Future<Output = Result<T::Response>> {
1436        log::debug!("ssh request start. name:{}", T::NAME);
1437        let response =
1438            self.request_dynamic(payload.into_envelope(0, None, None), T::NAME, use_buffer);
1439        async move {
1440            let response = response.await?;
1441            log::debug!("ssh request finish. name:{}", T::NAME);
1442            T::Response::from_envelope(response).context("received a response of the wrong type")
1443        }
1444    }
1445
1446    async fn resync(&self, timeout: Duration) -> Result<()> {
1447        smol::future::or(
1448            async {
1449                self.request_internal(proto::FlushBufferedMessages {}, false)
1450                    .await?;
1451
1452                for envelope in self.buffer.lock().iter() {
1453                    self.outgoing_tx
1454                        .lock()
1455                        .unbounded_send(envelope.clone())
1456                        .ok();
1457                }
1458                Ok(())
1459            },
1460            async {
1461                smol::Timer::after(timeout).await;
1462                anyhow::bail!("Timed out resyncing remote client")
1463            },
1464        )
1465        .await
1466    }
1467
1468    async fn ping(&self, timeout: Duration) -> Result<()> {
1469        smol::future::or(
1470            async {
1471                self.request(proto::Ping {}).await?;
1472                Ok(())
1473            },
1474            async {
1475                smol::Timer::after(timeout).await;
1476                anyhow::bail!("Timed out pinging remote client")
1477            },
1478        )
1479        .await
1480    }
1481
1482    fn send<T: EnvelopedMessage>(&self, payload: T) -> Result<()> {
1483        log::debug!("ssh send name:{}", T::NAME);
1484        self.send_dynamic(payload.into_envelope(0, None, None))
1485    }
1486
1487    fn request_dynamic(
1488        &self,
1489        mut envelope: proto::Envelope,
1490        type_name: &'static str,
1491        use_buffer: bool,
1492    ) -> impl 'static + Future<Output = Result<proto::Envelope>> {
1493        envelope.id = self.next_message_id.fetch_add(1, SeqCst);
1494        let (tx, rx) = oneshot::channel();
1495        let mut response_channels_lock = self.response_channels.lock();
1496        response_channels_lock.insert(MessageId(envelope.id), tx);
1497        drop(response_channels_lock);
1498
1499        let result = if use_buffer {
1500            self.send_buffered(envelope)
1501        } else {
1502            self.send_unbuffered(envelope)
1503        };
1504        async move {
1505            if let Err(error) = &result {
1506                log::error!("failed to send message: {error}");
1507                anyhow::bail!("failed to send message: {error}");
1508            }
1509
1510            let response = rx.await.context("connection lost")?.0;
1511            if let Some(proto::envelope::Payload::Error(error)) = &response.payload {
1512                return Err(RpcError::from_proto(error, type_name));
1513            }
1514            Ok(response)
1515        }
1516    }
1517
1518    pub fn send_dynamic(&self, mut envelope: proto::Envelope) -> Result<()> {
1519        envelope.id = self.next_message_id.fetch_add(1, SeqCst);
1520        self.send_buffered(envelope)
1521    }
1522
1523    fn send_buffered(&self, mut envelope: proto::Envelope) -> Result<()> {
1524        envelope.ack_id = Some(self.max_received.load(SeqCst));
1525        self.buffer.lock().push_back(envelope.clone());
1526        // ignore errors on send (happen while we're reconnecting)
1527        // assume that the global "disconnected" overlay is sufficient.
1528        self.outgoing_tx.lock().unbounded_send(envelope).ok();
1529        Ok(())
1530    }
1531
1532    fn send_unbuffered(&self, mut envelope: proto::Envelope) -> Result<()> {
1533        envelope.ack_id = Some(self.max_received.load(SeqCst));
1534        self.outgoing_tx.lock().unbounded_send(envelope).ok();
1535        Ok(())
1536    }
1537}
1538
1539impl ProtoClient for ChannelClient {
1540    fn request(
1541        &self,
1542        envelope: proto::Envelope,
1543        request_type: &'static str,
1544    ) -> BoxFuture<'static, Result<proto::Envelope>> {
1545        self.request_dynamic(envelope, request_type, true).boxed()
1546    }
1547
1548    fn send(&self, envelope: proto::Envelope, _message_type: &'static str) -> Result<()> {
1549        self.send_dynamic(envelope)
1550    }
1551
1552    fn send_response(&self, envelope: Envelope, _message_type: &'static str) -> anyhow::Result<()> {
1553        self.send_dynamic(envelope)
1554    }
1555
1556    fn message_handler_set(&self) -> &Mutex<ProtoMessageHandlerSet> {
1557        &self.message_handlers
1558    }
1559
1560    fn is_via_collab(&self) -> bool {
1561        false
1562    }
1563
1564    fn has_wsl_interop(&self) -> bool {
1565        self.has_wsl_interop
1566    }
1567}
1568
1569#[cfg(any(test, feature = "test-support"))]
1570mod fake {
1571    use super::{ChannelClient, RemoteClientDelegate, RemoteConnection, RemotePlatform};
1572    use crate::remote_client::{CommandTemplate, RemoteConnectionOptions};
1573    use anyhow::Result;
1574    use askpass::EncryptedPassword;
1575    use async_trait::async_trait;
1576    use collections::HashMap;
1577    use futures::{
1578        FutureExt, SinkExt, StreamExt,
1579        channel::{
1580            mpsc::{self, Sender},
1581            oneshot,
1582        },
1583        select_biased,
1584    };
1585    use gpui::{App, AppContext as _, AsyncApp, Task, TestAppContext};
1586    use release_channel::ReleaseChannel;
1587    use rpc::proto::Envelope;
1588    use semver::Version;
1589    use std::{path::PathBuf, sync::Arc};
1590    use util::paths::{PathStyle, RemotePathBuf};
1591
1592    pub(super) struct FakeRemoteConnection {
1593        pub(super) connection_options: RemoteConnectionOptions,
1594        pub(super) server_channel: Arc<ChannelClient>,
1595        pub(super) server_cx: SendableCx,
1596    }
1597
1598    pub(super) struct SendableCx(AsyncApp);
1599    impl SendableCx {
1600        // SAFETY: When run in test mode, GPUI is always single threaded.
1601        pub(super) fn new(cx: &TestAppContext) -> Self {
1602            Self(cx.to_async())
1603        }
1604
1605        // SAFETY: Enforce that we're on the main thread by requiring a valid AsyncApp
1606        fn get(&self, _: &AsyncApp) -> AsyncApp {
1607            self.0.clone()
1608        }
1609    }
1610
1611    // SAFETY: There is no way to access a SendableCx from a different thread, see [`SendableCx::new`] and [`SendableCx::get`]
1612    unsafe impl Send for SendableCx {}
1613    unsafe impl Sync for SendableCx {}
1614
1615    #[async_trait(?Send)]
1616    impl RemoteConnection for FakeRemoteConnection {
1617        async fn kill(&self) -> Result<()> {
1618            Ok(())
1619        }
1620
1621        fn has_been_killed(&self) -> bool {
1622            false
1623        }
1624
1625        fn build_command(
1626            &self,
1627            program: Option<String>,
1628            args: &[String],
1629            env: &HashMap<String, String>,
1630            _: Option<String>,
1631            _: Option<(u16, String, u16)>,
1632        ) -> Result<CommandTemplate> {
1633            let ssh_program = program.unwrap_or_else(|| "sh".to_string());
1634            let mut ssh_args = Vec::new();
1635            ssh_args.push(ssh_program);
1636            ssh_args.extend(args.iter().cloned());
1637            Ok(CommandTemplate {
1638                program: "ssh".into(),
1639                args: ssh_args,
1640                env: env.clone(),
1641            })
1642        }
1643
1644        fn build_forward_ports_command(
1645            &self,
1646            forwards: Vec<(u16, String, u16)>,
1647        ) -> anyhow::Result<CommandTemplate> {
1648            Ok(CommandTemplate {
1649                program: "ssh".into(),
1650                args: std::iter::once("-N".to_owned())
1651                    .chain(forwards.into_iter().map(|(local_port, host, remote_port)| {
1652                        format!("{local_port}:{host}:{remote_port}")
1653                    }))
1654                    .collect(),
1655                env: Default::default(),
1656            })
1657        }
1658
1659        fn upload_directory(
1660            &self,
1661            _src_path: PathBuf,
1662            _dest_path: RemotePathBuf,
1663            _cx: &App,
1664        ) -> Task<Result<()>> {
1665            unreachable!()
1666        }
1667
1668        fn connection_options(&self) -> RemoteConnectionOptions {
1669            self.connection_options.clone()
1670        }
1671
1672        fn simulate_disconnect(&self, cx: &AsyncApp) {
1673            let (outgoing_tx, _) = mpsc::unbounded::<Envelope>();
1674            let (_, incoming_rx) = mpsc::unbounded::<Envelope>();
1675            self.server_channel
1676                .reconnect(incoming_rx, outgoing_tx, &self.server_cx.get(cx));
1677        }
1678
1679        fn start_proxy(
1680            &self,
1681            _unique_identifier: String,
1682            _reconnect: bool,
1683            mut client_incoming_tx: mpsc::UnboundedSender<Envelope>,
1684            mut client_outgoing_rx: mpsc::UnboundedReceiver<Envelope>,
1685            mut connection_activity_tx: Sender<()>,
1686            _delegate: Arc<dyn RemoteClientDelegate>,
1687            cx: &mut AsyncApp,
1688        ) -> Task<Result<i32>> {
1689            let (mut server_incoming_tx, server_incoming_rx) = mpsc::unbounded::<Envelope>();
1690            let (server_outgoing_tx, mut server_outgoing_rx) = mpsc::unbounded::<Envelope>();
1691
1692            self.server_channel.reconnect(
1693                server_incoming_rx,
1694                server_outgoing_tx,
1695                &self.server_cx.get(cx),
1696            );
1697
1698            cx.background_spawn(async move {
1699                loop {
1700                    select_biased! {
1701                        server_to_client = server_outgoing_rx.next().fuse() => {
1702                            let Some(server_to_client) = server_to_client else {
1703                                return Ok(1)
1704                            };
1705                            connection_activity_tx.try_send(()).ok();
1706                            client_incoming_tx.send(server_to_client).await.ok();
1707                        }
1708                        client_to_server = client_outgoing_rx.next().fuse() => {
1709                            let Some(client_to_server) = client_to_server else {
1710                                return Ok(1)
1711                            };
1712                            server_incoming_tx.send(client_to_server).await.ok();
1713                        }
1714                    }
1715                }
1716            })
1717        }
1718
1719        fn path_style(&self) -> PathStyle {
1720            PathStyle::local()
1721        }
1722
1723        fn shell(&self) -> String {
1724            "sh".to_owned()
1725        }
1726
1727        fn default_system_shell(&self) -> String {
1728            "sh".to_owned()
1729        }
1730
1731        fn has_wsl_interop(&self) -> bool {
1732            false
1733        }
1734    }
1735
1736    pub(super) struct Delegate;
1737
1738    impl RemoteClientDelegate for Delegate {
1739        fn ask_password(&self, _: String, _: oneshot::Sender<EncryptedPassword>, _: &mut AsyncApp) {
1740            unreachable!()
1741        }
1742
1743        fn download_server_binary_locally(
1744            &self,
1745            _: RemotePlatform,
1746            _: ReleaseChannel,
1747            _: Option<Version>,
1748            _: &mut AsyncApp,
1749        ) -> Task<Result<PathBuf>> {
1750            unreachable!()
1751        }
1752
1753        fn get_download_url(
1754            &self,
1755            _platform: RemotePlatform,
1756            _release_channel: ReleaseChannel,
1757            _version: Option<Version>,
1758            _cx: &mut AsyncApp,
1759        ) -> Task<Result<Option<String>>> {
1760            unreachable!()
1761        }
1762
1763        fn set_status(&self, _: Option<&str>, _: &mut AsyncApp) {}
1764    }
1765}