ssh_session.rs

   1use crate::{
   2    json_log::LogRecord,
   3    protocol::{
   4        message_len_from_buffer, read_message_with_len, write_message, MessageId, MESSAGE_LEN_SIZE,
   5    },
   6    proxy::ProxyLaunchError,
   7};
   8use anyhow::{anyhow, Context as _, Result};
   9use async_trait::async_trait;
  10use collections::HashMap;
  11use futures::{
  12    channel::{
  13        mpsc::{self, Sender, UnboundedReceiver, UnboundedSender},
  14        oneshot,
  15    },
  16    future::BoxFuture,
  17    select, select_biased, AsyncReadExt as _, Future, FutureExt as _, StreamExt as _,
  18};
  19use gpui::{
  20    AppContext, AsyncAppContext, Context, EventEmitter, Model, ModelContext, SemanticVersion, Task,
  21    WeakModel,
  22};
  23use parking_lot::Mutex;
  24use rpc::{
  25    proto::{self, build_typed_envelope, Envelope, EnvelopedMessage, PeerId, RequestMessage},
  26    AnyProtoClient, EntityMessageSubscriber, ProtoClient, ProtoMessageHandlerSet, RpcError,
  27};
  28use smol::{
  29    fs,
  30    process::{self, Child, Stdio},
  31};
  32use std::{
  33    any::TypeId,
  34    collections::VecDeque,
  35    ffi::OsStr,
  36    fmt,
  37    ops::ControlFlow,
  38    path::{Path, PathBuf},
  39    sync::{
  40        atomic::{AtomicU32, Ordering::SeqCst},
  41        Arc, Weak,
  42    },
  43    time::{Duration, Instant, SystemTime, UNIX_EPOCH},
  44};
  45use tempfile::TempDir;
  46use util::ResultExt;
  47
  48#[derive(
  49    Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, serde::Serialize, serde::Deserialize,
  50)]
  51pub struct SshProjectId(pub u64);
  52
  53#[derive(Clone)]
  54pub struct SshSocket {
  55    connection_options: SshConnectionOptions,
  56    socket_path: PathBuf,
  57}
  58
  59#[derive(Debug, Default, Clone, PartialEq, Eq)]
  60pub struct SshConnectionOptions {
  61    pub host: String,
  62    pub username: Option<String>,
  63    pub port: Option<u16>,
  64    pub password: Option<String>,
  65    pub args: Option<Vec<String>>,
  66}
  67
  68impl SshConnectionOptions {
  69    pub fn parse_command_line(input: &str) -> Result<Self> {
  70        let input = input.trim_start_matches("ssh ");
  71        let mut hostname: Option<String> = None;
  72        let mut username: Option<String> = None;
  73        let mut port: Option<u16> = None;
  74        let mut args = Vec::new();
  75
  76        // disallowed: -E, -e, -F, -f, -G, -g, -M, -N, -n, -O, -q, -S, -s, -T, -t, -V, -v, -W
  77        const ALLOWED_OPTS: &[&str] = &[
  78            "-4", "-6", "-A", "-a", "-C", "-K", "-k", "-X", "-x", "-Y", "-y",
  79        ];
  80        const ALLOWED_ARGS: &[&str] = &[
  81            "-B", "-b", "-c", "-D", "-I", "-i", "-J", "-L", "-l", "-m", "-o", "-P", "-p", "-R",
  82            "-w",
  83        ];
  84
  85        let mut tokens = shlex::split(input)
  86            .ok_or_else(|| anyhow!("invalid input"))?
  87            .into_iter();
  88
  89        'outer: while let Some(arg) = tokens.next() {
  90            if ALLOWED_OPTS.contains(&(&arg as &str)) {
  91                args.push(arg.to_string());
  92                continue;
  93            }
  94            if arg == "-p" {
  95                port = tokens.next().and_then(|arg| arg.parse().ok());
  96                continue;
  97            } else if let Some(p) = arg.strip_prefix("-p") {
  98                port = p.parse().ok();
  99                continue;
 100            }
 101            if arg == "-l" {
 102                username = tokens.next();
 103                continue;
 104            } else if let Some(l) = arg.strip_prefix("-l") {
 105                username = Some(l.to_string());
 106                continue;
 107            }
 108            for a in ALLOWED_ARGS {
 109                if arg == *a {
 110                    args.push(arg);
 111                    if let Some(next) = tokens.next() {
 112                        args.push(next);
 113                    }
 114                    continue 'outer;
 115                } else if arg.starts_with(a) {
 116                    args.push(arg);
 117                    continue 'outer;
 118                }
 119            }
 120            if arg.starts_with("-") || hostname.is_some() {
 121                anyhow::bail!("unsupported argument: {:?}", arg);
 122            }
 123            let mut input = &arg as &str;
 124            if let Some((u, rest)) = input.split_once('@') {
 125                input = rest;
 126                username = Some(u.to_string());
 127            }
 128            if let Some((rest, p)) = input.split_once(':') {
 129                input = rest;
 130                port = p.parse().ok()
 131            }
 132            hostname = Some(input.to_string())
 133        }
 134
 135        let Some(hostname) = hostname else {
 136            anyhow::bail!("missing hostname");
 137        };
 138
 139        Ok(Self {
 140            host: hostname.to_string(),
 141            username: username.clone(),
 142            port,
 143            password: None,
 144            args: Some(args),
 145        })
 146    }
 147
 148    pub fn ssh_url(&self) -> String {
 149        let mut result = String::from("ssh://");
 150        if let Some(username) = &self.username {
 151            result.push_str(username);
 152            result.push('@');
 153        }
 154        result.push_str(&self.host);
 155        if let Some(port) = self.port {
 156            result.push(':');
 157            result.push_str(&port.to_string());
 158        }
 159        result
 160    }
 161
 162    pub fn additional_args(&self) -> Option<&Vec<String>> {
 163        self.args.as_ref()
 164    }
 165
 166    fn scp_url(&self) -> String {
 167        if let Some(username) = &self.username {
 168            format!("{}@{}", username, self.host)
 169        } else {
 170            self.host.clone()
 171        }
 172    }
 173
 174    pub fn connection_string(&self) -> String {
 175        let host = if let Some(username) = &self.username {
 176            format!("{}@{}", username, self.host)
 177        } else {
 178            self.host.clone()
 179        };
 180        if let Some(port) = &self.port {
 181            format!("{}:{}", host, port)
 182        } else {
 183            host
 184        }
 185    }
 186
 187    // Uniquely identifies dev server projects on a remote host. Needs to be
 188    // stable for the same dev server project.
 189    pub fn remote_server_identifier(&self) -> String {
 190        let mut identifier = format!("dev-server-{:?}", self.host);
 191        if let Some(username) = self.username.as_ref() {
 192            identifier.push('-');
 193            identifier.push_str(&username);
 194        }
 195        identifier
 196    }
 197}
 198
 199#[derive(Copy, Clone, Debug)]
 200pub struct SshPlatform {
 201    pub os: &'static str,
 202    pub arch: &'static str,
 203}
 204
 205impl SshPlatform {
 206    pub fn triple(&self) -> Option<String> {
 207        Some(format!(
 208            "{}-{}",
 209            self.arch,
 210            match self.os {
 211                "linux" => "unknown-linux-gnu",
 212                "macos" => "apple-darwin",
 213                _ => return None,
 214            }
 215        ))
 216    }
 217}
 218
 219pub enum ServerBinary {
 220    LocalBinary(PathBuf),
 221    ReleaseUrl { url: String, body: String },
 222}
 223
 224pub trait SshClientDelegate: Send + Sync {
 225    fn ask_password(
 226        &self,
 227        prompt: String,
 228        cx: &mut AsyncAppContext,
 229    ) -> oneshot::Receiver<Result<String>>;
 230    fn remote_server_binary_path(
 231        &self,
 232        platform: SshPlatform,
 233        cx: &mut AsyncAppContext,
 234    ) -> Result<PathBuf>;
 235    fn get_server_binary(
 236        &self,
 237        platform: SshPlatform,
 238        cx: &mut AsyncAppContext,
 239    ) -> oneshot::Receiver<Result<(ServerBinary, SemanticVersion)>>;
 240    fn set_status(&self, status: Option<&str>, cx: &mut AsyncAppContext);
 241}
 242
 243impl SshSocket {
 244    fn ssh_command<S: AsRef<OsStr>>(&self, program: S) -> process::Command {
 245        let mut command = process::Command::new("ssh");
 246        self.ssh_options(&mut command)
 247            .arg(self.connection_options.ssh_url())
 248            .arg(program);
 249        command
 250    }
 251
 252    fn ssh_options<'a>(&self, command: &'a mut process::Command) -> &'a mut process::Command {
 253        command
 254            .stdin(Stdio::piped())
 255            .stdout(Stdio::piped())
 256            .stderr(Stdio::piped())
 257            .args(["-o", "ControlMaster=no", "-o"])
 258            .arg(format!("ControlPath={}", self.socket_path.display()))
 259    }
 260
 261    fn ssh_args(&self) -> Vec<String> {
 262        vec![
 263            "-o".to_string(),
 264            "ControlMaster=no".to_string(),
 265            "-o".to_string(),
 266            format!("ControlPath={}", self.socket_path.display()),
 267            self.connection_options.ssh_url(),
 268        ]
 269    }
 270}
 271
 272async fn run_cmd(command: &mut process::Command) -> Result<String> {
 273    let output = command.output().await?;
 274    if output.status.success() {
 275        Ok(String::from_utf8_lossy(&output.stdout).to_string())
 276    } else {
 277        Err(anyhow!(
 278            "failed to run command: {}",
 279            String::from_utf8_lossy(&output.stderr)
 280        ))
 281    }
 282}
 283
 284const MAX_MISSED_HEARTBEATS: usize = 5;
 285const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
 286const HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(5);
 287
 288const MAX_RECONNECT_ATTEMPTS: usize = 3;
 289
 290enum State {
 291    Connecting,
 292    Connected {
 293        ssh_connection: Box<dyn SshRemoteProcess>,
 294        delegate: Arc<dyn SshClientDelegate>,
 295
 296        multiplex_task: Task<Result<()>>,
 297        heartbeat_task: Task<Result<()>>,
 298    },
 299    HeartbeatMissed {
 300        missed_heartbeats: usize,
 301
 302        ssh_connection: Box<dyn SshRemoteProcess>,
 303        delegate: Arc<dyn SshClientDelegate>,
 304
 305        multiplex_task: Task<Result<()>>,
 306        heartbeat_task: Task<Result<()>>,
 307    },
 308    Reconnecting,
 309    ReconnectFailed {
 310        ssh_connection: Box<dyn SshRemoteProcess>,
 311        delegate: Arc<dyn SshClientDelegate>,
 312
 313        error: anyhow::Error,
 314        attempts: usize,
 315    },
 316    ReconnectExhausted,
 317    ServerNotRunning,
 318}
 319
 320impl fmt::Display for State {
 321    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
 322        match self {
 323            Self::Connecting => write!(f, "connecting"),
 324            Self::Connected { .. } => write!(f, "connected"),
 325            Self::Reconnecting => write!(f, "reconnecting"),
 326            Self::ReconnectFailed { .. } => write!(f, "reconnect failed"),
 327            Self::ReconnectExhausted => write!(f, "reconnect exhausted"),
 328            Self::HeartbeatMissed { .. } => write!(f, "heartbeat missed"),
 329            Self::ServerNotRunning { .. } => write!(f, "server not running"),
 330        }
 331    }
 332}
 333
 334impl State {
 335    fn ssh_connection(&self) -> Option<&dyn SshRemoteProcess> {
 336        match self {
 337            Self::Connected { ssh_connection, .. } => Some(ssh_connection.as_ref()),
 338            Self::HeartbeatMissed { ssh_connection, .. } => Some(ssh_connection.as_ref()),
 339            Self::ReconnectFailed { ssh_connection, .. } => Some(ssh_connection.as_ref()),
 340            _ => None,
 341        }
 342    }
 343
 344    fn can_reconnect(&self) -> bool {
 345        match self {
 346            Self::Connected { .. }
 347            | Self::HeartbeatMissed { .. }
 348            | Self::ReconnectFailed { .. } => true,
 349            State::Connecting
 350            | State::Reconnecting
 351            | State::ReconnectExhausted
 352            | State::ServerNotRunning => false,
 353        }
 354    }
 355
 356    fn is_reconnect_failed(&self) -> bool {
 357        matches!(self, Self::ReconnectFailed { .. })
 358    }
 359
 360    fn is_reconnect_exhausted(&self) -> bool {
 361        matches!(self, Self::ReconnectExhausted { .. })
 362    }
 363
 364    fn is_server_not_running(&self) -> bool {
 365        matches!(self, Self::ServerNotRunning)
 366    }
 367
 368    fn is_reconnecting(&self) -> bool {
 369        matches!(self, Self::Reconnecting { .. })
 370    }
 371
 372    fn heartbeat_recovered(self) -> Self {
 373        match self {
 374            Self::HeartbeatMissed {
 375                ssh_connection,
 376                delegate,
 377                multiplex_task,
 378                heartbeat_task,
 379                ..
 380            } => Self::Connected {
 381                ssh_connection,
 382                delegate,
 383                multiplex_task,
 384                heartbeat_task,
 385            },
 386            _ => self,
 387        }
 388    }
 389
 390    fn heartbeat_missed(self) -> Self {
 391        match self {
 392            Self::Connected {
 393                ssh_connection,
 394                delegate,
 395                multiplex_task,
 396                heartbeat_task,
 397            } => Self::HeartbeatMissed {
 398                missed_heartbeats: 1,
 399                ssh_connection,
 400                delegate,
 401                multiplex_task,
 402                heartbeat_task,
 403            },
 404            Self::HeartbeatMissed {
 405                missed_heartbeats,
 406                ssh_connection,
 407                delegate,
 408                multiplex_task,
 409                heartbeat_task,
 410            } => Self::HeartbeatMissed {
 411                missed_heartbeats: missed_heartbeats + 1,
 412                ssh_connection,
 413                delegate,
 414                multiplex_task,
 415                heartbeat_task,
 416            },
 417            _ => self,
 418        }
 419    }
 420}
 421
 422/// The state of the ssh connection.
 423#[derive(Clone, Copy, Debug, PartialEq, Eq)]
 424pub enum ConnectionState {
 425    Connecting,
 426    Connected,
 427    HeartbeatMissed,
 428    Reconnecting,
 429    Disconnected,
 430}
 431
 432impl From<&State> for ConnectionState {
 433    fn from(value: &State) -> Self {
 434        match value {
 435            State::Connecting => Self::Connecting,
 436            State::Connected { .. } => Self::Connected,
 437            State::Reconnecting | State::ReconnectFailed { .. } => Self::Reconnecting,
 438            State::HeartbeatMissed { .. } => Self::HeartbeatMissed,
 439            State::ReconnectExhausted => Self::Disconnected,
 440            State::ServerNotRunning => Self::Disconnected,
 441        }
 442    }
 443}
 444
 445pub struct SshRemoteClient {
 446    client: Arc<ChannelClient>,
 447    unique_identifier: String,
 448    connection_options: SshConnectionOptions,
 449    state: Arc<Mutex<Option<State>>>,
 450}
 451
 452#[derive(Debug)]
 453pub enum SshRemoteEvent {
 454    Disconnected,
 455}
 456
 457impl EventEmitter<SshRemoteEvent> for SshRemoteClient {}
 458
 459impl SshRemoteClient {
 460    pub fn new(
 461        unique_identifier: String,
 462        connection_options: SshConnectionOptions,
 463        cancellation: oneshot::Receiver<()>,
 464        delegate: Arc<dyn SshClientDelegate>,
 465        cx: &AppContext,
 466    ) -> Task<Result<Option<Model<Self>>>> {
 467        cx.spawn(|mut cx| async move {
 468            let success = Box::pin(async move {
 469                let (outgoing_tx, outgoing_rx) = mpsc::unbounded::<Envelope>();
 470                let (incoming_tx, incoming_rx) = mpsc::unbounded::<Envelope>();
 471                let (connection_activity_tx, connection_activity_rx) = mpsc::channel::<()>(1);
 472
 473                let client =
 474                    cx.update(|cx| ChannelClient::new(incoming_rx, outgoing_tx, cx, "client"))?;
 475                let this = cx.new_model(|_| Self {
 476                    client: client.clone(),
 477                    unique_identifier: unique_identifier.clone(),
 478                    connection_options: connection_options.clone(),
 479                    state: Arc::new(Mutex::new(Some(State::Connecting))),
 480                })?;
 481
 482                let (ssh_connection, io_task) = Self::establish_connection(
 483                    unique_identifier,
 484                    false,
 485                    connection_options,
 486                    incoming_tx,
 487                    outgoing_rx,
 488                    connection_activity_tx,
 489                    delegate.clone(),
 490                    &mut cx,
 491                )
 492                .await?;
 493
 494                let multiplex_task = Self::monitor(this.downgrade(), io_task, &cx);
 495
 496                if let Err(error) = client.ping(HEARTBEAT_TIMEOUT).await {
 497                    log::error!("failed to establish connection: {}", error);
 498                    return Err(error);
 499                }
 500
 501                let heartbeat_task =
 502                    Self::heartbeat(this.downgrade(), connection_activity_rx, &mut cx);
 503
 504                this.update(&mut cx, |this, _| {
 505                    *this.state.lock() = Some(State::Connected {
 506                        ssh_connection,
 507                        delegate,
 508                        multiplex_task,
 509                        heartbeat_task,
 510                    });
 511                })?;
 512
 513                Ok(Some(this))
 514            });
 515
 516            select! {
 517                _ = cancellation.fuse() => {
 518                    Ok(None)
 519                }
 520                result = success.fuse() =>  result
 521            }
 522        })
 523    }
 524
 525    pub fn shutdown_processes<T: RequestMessage>(
 526        &self,
 527        shutdown_request: Option<T>,
 528    ) -> Option<impl Future<Output = ()>> {
 529        let state = self.state.lock().take()?;
 530        log::info!("shutting down ssh processes");
 531
 532        let State::Connected {
 533            multiplex_task,
 534            heartbeat_task,
 535            ssh_connection,
 536            delegate,
 537        } = state
 538        else {
 539            return None;
 540        };
 541
 542        let client = self.client.clone();
 543
 544        Some(async move {
 545            if let Some(shutdown_request) = shutdown_request {
 546                client.send(shutdown_request).log_err();
 547                // We wait 50ms instead of waiting for a response, because
 548                // waiting for a response would require us to wait on the main thread
 549                // which we want to avoid in an `on_app_quit` callback.
 550                smol::Timer::after(Duration::from_millis(50)).await;
 551            }
 552
 553            // Drop `multiplex_task` because it owns our ssh_proxy_process, which is a
 554            // child of master_process.
 555            drop(multiplex_task);
 556            // Now drop the rest of state, which kills master process.
 557            drop(heartbeat_task);
 558            drop(ssh_connection);
 559            drop(delegate);
 560        })
 561    }
 562
 563    fn reconnect(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
 564        let mut lock = self.state.lock();
 565
 566        let can_reconnect = lock
 567            .as_ref()
 568            .map(|state| state.can_reconnect())
 569            .unwrap_or(false);
 570        if !can_reconnect {
 571            let error = if let Some(state) = lock.as_ref() {
 572                format!("invalid state, cannot reconnect while in state {state}")
 573            } else {
 574                "no state set".to_string()
 575            };
 576            log::info!("aborting reconnect, because not in state that allows reconnecting");
 577            return Err(anyhow!(error));
 578        }
 579
 580        let state = lock.take().unwrap();
 581        let (attempts, mut ssh_connection, delegate) = match state {
 582            State::Connected {
 583                ssh_connection,
 584                delegate,
 585                multiplex_task,
 586                heartbeat_task,
 587            }
 588            | State::HeartbeatMissed {
 589                ssh_connection,
 590                delegate,
 591                multiplex_task,
 592                heartbeat_task,
 593                ..
 594            } => {
 595                drop(multiplex_task);
 596                drop(heartbeat_task);
 597                (0, ssh_connection, delegate)
 598            }
 599            State::ReconnectFailed {
 600                attempts,
 601                ssh_connection,
 602                delegate,
 603                ..
 604            } => (attempts, ssh_connection, delegate),
 605            State::Connecting
 606            | State::Reconnecting
 607            | State::ReconnectExhausted
 608            | State::ServerNotRunning => unreachable!(),
 609        };
 610
 611        let attempts = attempts + 1;
 612        if attempts > MAX_RECONNECT_ATTEMPTS {
 613            log::error!(
 614                "Failed to reconnect to after {} attempts, giving up",
 615                MAX_RECONNECT_ATTEMPTS
 616            );
 617            drop(lock);
 618            self.set_state(State::ReconnectExhausted, cx);
 619            return Ok(());
 620        }
 621        drop(lock);
 622
 623        self.set_state(State::Reconnecting, cx);
 624
 625        log::info!("Trying to reconnect to ssh server... Attempt {}", attempts);
 626
 627        let identifier = self.unique_identifier.clone();
 628        let client = self.client.clone();
 629        let reconnect_task = cx.spawn(|this, mut cx| async move {
 630            macro_rules! failed {
 631                ($error:expr, $attempts:expr, $ssh_connection:expr, $delegate:expr) => {
 632                    return State::ReconnectFailed {
 633                        error: anyhow!($error),
 634                        attempts: $attempts,
 635                        ssh_connection: $ssh_connection,
 636                        delegate: $delegate,
 637                    };
 638                };
 639            }
 640
 641            if let Err(error) = ssh_connection
 642                .kill()
 643                .await
 644                .context("Failed to kill ssh process")
 645            {
 646                failed!(error, attempts, ssh_connection, delegate);
 647            };
 648
 649            let connection_options = ssh_connection.connection_options();
 650
 651            let (outgoing_tx, outgoing_rx) = mpsc::unbounded::<Envelope>();
 652            let (incoming_tx, incoming_rx) = mpsc::unbounded::<Envelope>();
 653            let (connection_activity_tx, connection_activity_rx) = mpsc::channel::<()>(1);
 654
 655            let (ssh_connection, io_task) = match Self::establish_connection(
 656                identifier,
 657                true,
 658                connection_options,
 659                incoming_tx,
 660                outgoing_rx,
 661                connection_activity_tx,
 662                delegate.clone(),
 663                &mut cx,
 664            )
 665            .await
 666            {
 667                Ok((ssh_connection, ssh_process)) => (ssh_connection, ssh_process),
 668                Err(error) => {
 669                    failed!(error, attempts, ssh_connection, delegate);
 670                }
 671            };
 672
 673            let multiplex_task = Self::monitor(this.clone(), io_task, &cx);
 674            client.reconnect(incoming_rx, outgoing_tx, &cx);
 675
 676            if let Err(error) = client.resync(HEARTBEAT_TIMEOUT).await {
 677                failed!(error, attempts, ssh_connection, delegate);
 678            };
 679
 680            State::Connected {
 681                ssh_connection,
 682                delegate,
 683                multiplex_task,
 684                heartbeat_task: Self::heartbeat(this.clone(), connection_activity_rx, &mut cx),
 685            }
 686        });
 687
 688        cx.spawn(|this, mut cx| async move {
 689            let new_state = reconnect_task.await;
 690            this.update(&mut cx, |this, cx| {
 691                this.try_set_state(cx, |old_state| {
 692                    if old_state.is_reconnecting() {
 693                        match &new_state {
 694                            State::Connecting
 695                            | State::Reconnecting { .. }
 696                            | State::HeartbeatMissed { .. }
 697                            | State::ServerNotRunning => {}
 698                            State::Connected { .. } => {
 699                                log::info!("Successfully reconnected");
 700                            }
 701                            State::ReconnectFailed {
 702                                error, attempts, ..
 703                            } => {
 704                                log::error!(
 705                                    "Reconnect attempt {} failed: {:?}. Starting new attempt...",
 706                                    attempts,
 707                                    error
 708                                );
 709                            }
 710                            State::ReconnectExhausted => {
 711                                log::error!("Reconnect attempt failed and all attempts exhausted");
 712                            }
 713                        }
 714                        Some(new_state)
 715                    } else {
 716                        None
 717                    }
 718                });
 719
 720                if this.state_is(State::is_reconnect_failed) {
 721                    this.reconnect(cx)
 722                } else if this.state_is(State::is_reconnect_exhausted) {
 723                    Ok(())
 724                } else {
 725                    log::debug!("State has transition from Reconnecting into new state while attempting reconnect.");
 726                    Ok(())
 727                }
 728            })
 729        })
 730        .detach_and_log_err(cx);
 731
 732        Ok(())
 733    }
 734
 735    fn heartbeat(
 736        this: WeakModel<Self>,
 737        mut connection_activity_rx: mpsc::Receiver<()>,
 738        cx: &mut AsyncAppContext,
 739    ) -> Task<Result<()>> {
 740        let Ok(client) = this.update(cx, |this, _| this.client.clone()) else {
 741            return Task::ready(Err(anyhow!("SshRemoteClient lost")));
 742        };
 743
 744        cx.spawn(|mut cx| {
 745            let this = this.clone();
 746            async move {
 747                let mut missed_heartbeats = 0;
 748
 749                let keepalive_timer = cx.background_executor().timer(HEARTBEAT_INTERVAL).fuse();
 750                futures::pin_mut!(keepalive_timer);
 751
 752                loop {
 753                    select_biased! {
 754                        result = connection_activity_rx.next().fuse() => {
 755                            if result.is_none() {
 756                                log::warn!("ssh heartbeat: connection activity channel has been dropped. stopping.");
 757                                return Ok(());
 758                            }
 759
 760                            if missed_heartbeats != 0 {
 761                                missed_heartbeats = 0;
 762                                this.update(&mut cx, |this, mut cx| {
 763                                    this.handle_heartbeat_result(missed_heartbeats, &mut cx)
 764                                })?;
 765                            }
 766                        }
 767                        _ = keepalive_timer => {
 768                            log::debug!("Sending heartbeat to server...");
 769
 770                            let result = select_biased! {
 771                                _ = connection_activity_rx.next().fuse() => {
 772                                    Ok(())
 773                                }
 774                                ping_result = client.ping(HEARTBEAT_TIMEOUT).fuse() => {
 775                                    ping_result
 776                                }
 777                            };
 778
 779                            if result.is_err() {
 780                                missed_heartbeats += 1;
 781                                log::warn!(
 782                                    "No heartbeat from server after {:?}. Missed heartbeat {} out of {}.",
 783                                    HEARTBEAT_TIMEOUT,
 784                                    missed_heartbeats,
 785                                    MAX_MISSED_HEARTBEATS
 786                                );
 787                            } else if missed_heartbeats != 0 {
 788                                missed_heartbeats = 0;
 789                            } else {
 790                                continue;
 791                            }
 792
 793                            let result = this.update(&mut cx, |this, mut cx| {
 794                                this.handle_heartbeat_result(missed_heartbeats, &mut cx)
 795                            })?;
 796                            if result.is_break() {
 797                                return Ok(());
 798                            }
 799                        }
 800                    }
 801
 802                    keepalive_timer.set(cx.background_executor().timer(HEARTBEAT_INTERVAL).fuse());
 803                }
 804            }
 805        })
 806    }
 807
 808    fn handle_heartbeat_result(
 809        &mut self,
 810        missed_heartbeats: usize,
 811        cx: &mut ModelContext<Self>,
 812    ) -> ControlFlow<()> {
 813        let state = self.state.lock().take().unwrap();
 814        let next_state = if missed_heartbeats > 0 {
 815            state.heartbeat_missed()
 816        } else {
 817            state.heartbeat_recovered()
 818        };
 819
 820        self.set_state(next_state, cx);
 821
 822        if missed_heartbeats >= MAX_MISSED_HEARTBEATS {
 823            log::error!(
 824                "Missed last {} heartbeats. Reconnecting...",
 825                missed_heartbeats
 826            );
 827
 828            self.reconnect(cx)
 829                .context("failed to start reconnect process after missing heartbeats")
 830                .log_err();
 831            ControlFlow::Break(())
 832        } else {
 833            ControlFlow::Continue(())
 834        }
 835    }
 836
 837    fn multiplex(
 838        mut ssh_proxy_process: Child,
 839        incoming_tx: UnboundedSender<Envelope>,
 840        mut outgoing_rx: UnboundedReceiver<Envelope>,
 841        mut connection_activity_tx: Sender<()>,
 842        cx: &AsyncAppContext,
 843    ) -> Task<Result<i32>> {
 844        let mut child_stderr = ssh_proxy_process.stderr.take().unwrap();
 845        let mut child_stdout = ssh_proxy_process.stdout.take().unwrap();
 846        let mut child_stdin = ssh_proxy_process.stdin.take().unwrap();
 847
 848        let mut stdin_buffer = Vec::new();
 849        let mut stdout_buffer = Vec::new();
 850        let mut stderr_buffer = Vec::new();
 851        let mut stderr_offset = 0;
 852
 853        let stdin_task = cx.background_executor().spawn(async move {
 854            while let Some(outgoing) = outgoing_rx.next().await {
 855                write_message(&mut child_stdin, &mut stdin_buffer, outgoing).await?;
 856            }
 857            anyhow::Ok(())
 858        });
 859
 860        let stdout_task = cx.background_executor().spawn({
 861            let mut connection_activity_tx = connection_activity_tx.clone();
 862            async move {
 863                loop {
 864                    stdout_buffer.resize(MESSAGE_LEN_SIZE, 0);
 865                    let len = child_stdout.read(&mut stdout_buffer).await?;
 866
 867                    if len == 0 {
 868                        return anyhow::Ok(());
 869                    }
 870
 871                    if len < MESSAGE_LEN_SIZE {
 872                        child_stdout.read_exact(&mut stdout_buffer[len..]).await?;
 873                    }
 874
 875                    let message_len = message_len_from_buffer(&stdout_buffer);
 876                    let envelope =
 877                        read_message_with_len(&mut child_stdout, &mut stdout_buffer, message_len)
 878                            .await?;
 879                    connection_activity_tx.try_send(()).ok();
 880                    incoming_tx.unbounded_send(envelope).ok();
 881                }
 882            }
 883        });
 884
 885        let stderr_task: Task<anyhow::Result<()>> = cx.background_executor().spawn(async move {
 886            loop {
 887                stderr_buffer.resize(stderr_offset + 1024, 0);
 888
 889                let len = child_stderr
 890                    .read(&mut stderr_buffer[stderr_offset..])
 891                    .await?;
 892                if len == 0 {
 893                    return anyhow::Ok(());
 894                }
 895
 896                stderr_offset += len;
 897                let mut start_ix = 0;
 898                while let Some(ix) = stderr_buffer[start_ix..stderr_offset]
 899                    .iter()
 900                    .position(|b| b == &b'\n')
 901                {
 902                    let line_ix = start_ix + ix;
 903                    let content = &stderr_buffer[start_ix..line_ix];
 904                    start_ix = line_ix + 1;
 905                    if let Ok(record) = serde_json::from_slice::<LogRecord>(content) {
 906                        record.log(log::logger())
 907                    } else {
 908                        eprintln!("(remote) {}", String::from_utf8_lossy(content));
 909                    }
 910                }
 911                stderr_buffer.drain(0..start_ix);
 912                stderr_offset -= start_ix;
 913
 914                connection_activity_tx.try_send(()).ok();
 915            }
 916        });
 917
 918        cx.spawn(|_| async move {
 919            let result = futures::select! {
 920                result = stdin_task.fuse() => {
 921                    result.context("stdin")
 922                }
 923                result = stdout_task.fuse() => {
 924                    result.context("stdout")
 925                }
 926                result = stderr_task.fuse() => {
 927                    result.context("stderr")
 928                }
 929            };
 930
 931            let status = ssh_proxy_process.status().await?.code().unwrap_or(1);
 932            match result {
 933                Ok(_) => Ok(status),
 934                Err(error) => Err(error),
 935            }
 936        })
 937    }
 938
 939    fn monitor(
 940        this: WeakModel<Self>,
 941        io_task: Task<Result<i32>>,
 942        cx: &AsyncAppContext,
 943    ) -> Task<Result<()>> {
 944        cx.spawn(|mut cx| async move {
 945            let result = io_task.await;
 946
 947            match result {
 948                Ok(exit_code) => {
 949                    if let Some(error) = ProxyLaunchError::from_exit_code(exit_code) {
 950                        match error {
 951                            ProxyLaunchError::ServerNotRunning => {
 952                                log::error!("failed to reconnect because server is not running");
 953                                this.update(&mut cx, |this, cx| {
 954                                    this.set_state(State::ServerNotRunning, cx);
 955                                })?;
 956                            }
 957                        }
 958                    } else if exit_code > 0 {
 959                        log::error!("proxy process terminated unexpectedly");
 960                        this.update(&mut cx, |this, cx| {
 961                            this.reconnect(cx).ok();
 962                        })?;
 963                    }
 964                }
 965                Err(error) => {
 966                    log::warn!("ssh io task died with error: {:?}. reconnecting...", error);
 967                    this.update(&mut cx, |this, cx| {
 968                        this.reconnect(cx).ok();
 969                    })?;
 970                }
 971            }
 972
 973            Ok(())
 974        })
 975    }
 976
 977    fn state_is(&self, check: impl FnOnce(&State) -> bool) -> bool {
 978        self.state.lock().as_ref().map_or(false, check)
 979    }
 980
 981    fn try_set_state(
 982        &self,
 983        cx: &mut ModelContext<Self>,
 984        map: impl FnOnce(&State) -> Option<State>,
 985    ) {
 986        let mut lock = self.state.lock();
 987        let new_state = lock.as_ref().and_then(map);
 988
 989        if let Some(new_state) = new_state {
 990            lock.replace(new_state);
 991            cx.notify();
 992        }
 993    }
 994
 995    fn set_state(&self, state: State, cx: &mut ModelContext<Self>) {
 996        log::info!("setting state to '{}'", &state);
 997
 998        let is_reconnect_exhausted = state.is_reconnect_exhausted();
 999        let is_server_not_running = state.is_server_not_running();
1000        self.state.lock().replace(state);
1001
1002        if is_reconnect_exhausted || is_server_not_running {
1003            cx.emit(SshRemoteEvent::Disconnected);
1004        }
1005        cx.notify();
1006    }
1007
1008    #[allow(clippy::too_many_arguments)]
1009    async fn establish_connection(
1010        unique_identifier: String,
1011        reconnect: bool,
1012        connection_options: SshConnectionOptions,
1013        incoming_tx: UnboundedSender<Envelope>,
1014        outgoing_rx: UnboundedReceiver<Envelope>,
1015        connection_activity_tx: Sender<()>,
1016        delegate: Arc<dyn SshClientDelegate>,
1017        cx: &mut AsyncAppContext,
1018    ) -> Result<(Box<dyn SshRemoteProcess>, Task<Result<i32>>)> {
1019        #[cfg(any(test, feature = "test-support"))]
1020        if let Some(fake) = fake::SshRemoteConnection::new(&connection_options) {
1021            let io_task = fake::SshRemoteConnection::multiplex(
1022                fake.connection_options(),
1023                incoming_tx,
1024                outgoing_rx,
1025                connection_activity_tx,
1026                cx,
1027            )
1028            .await;
1029            return Ok((fake, io_task));
1030        }
1031
1032        let ssh_connection =
1033            SshRemoteConnection::new(connection_options, delegate.clone(), cx).await?;
1034
1035        let platform = ssh_connection.query_platform().await?;
1036        let remote_binary_path = delegate.remote_server_binary_path(platform, cx)?;
1037        if !reconnect {
1038            ssh_connection
1039                .ensure_server_binary(&delegate, &remote_binary_path, platform, cx)
1040                .await?;
1041        }
1042
1043        let socket = ssh_connection.socket.clone();
1044        run_cmd(socket.ssh_command(&remote_binary_path).arg("version")).await?;
1045
1046        delegate.set_status(Some("Starting proxy"), cx);
1047
1048        let mut start_proxy_command = format!(
1049            "RUST_LOG={} RUST_BACKTRACE={} {:?} proxy --identifier {}",
1050            std::env::var("RUST_LOG").unwrap_or_default(),
1051            std::env::var("RUST_BACKTRACE").unwrap_or_default(),
1052            remote_binary_path,
1053            unique_identifier,
1054        );
1055        if reconnect {
1056            start_proxy_command.push_str(" --reconnect");
1057        }
1058
1059        let ssh_proxy_process = socket
1060            .ssh_command(start_proxy_command)
1061            // IMPORTANT: we kill this process when we drop the task that uses it.
1062            .kill_on_drop(true)
1063            .spawn()
1064            .context("failed to spawn remote server")?;
1065
1066        let io_task = Self::multiplex(
1067            ssh_proxy_process,
1068            incoming_tx,
1069            outgoing_rx,
1070            connection_activity_tx,
1071            &cx,
1072        );
1073
1074        Ok((Box::new(ssh_connection), io_task))
1075    }
1076
1077    pub fn subscribe_to_entity<E: 'static>(&self, remote_id: u64, entity: &Model<E>) {
1078        self.client.subscribe_to_entity(remote_id, entity);
1079    }
1080
1081    pub fn ssh_args(&self) -> Option<Vec<String>> {
1082        self.state
1083            .lock()
1084            .as_ref()
1085            .and_then(|state| state.ssh_connection())
1086            .map(|ssh_connection| ssh_connection.ssh_args())
1087    }
1088
1089    pub fn proto_client(&self) -> AnyProtoClient {
1090        self.client.clone().into()
1091    }
1092
1093    pub fn connection_string(&self) -> String {
1094        self.connection_options.connection_string()
1095    }
1096
1097    pub fn connection_options(&self) -> SshConnectionOptions {
1098        self.connection_options.clone()
1099    }
1100
1101    pub fn connection_state(&self) -> ConnectionState {
1102        self.state
1103            .lock()
1104            .as_ref()
1105            .map(ConnectionState::from)
1106            .unwrap_or(ConnectionState::Disconnected)
1107    }
1108
1109    pub fn is_disconnected(&self) -> bool {
1110        self.connection_state() == ConnectionState::Disconnected
1111    }
1112
1113    #[cfg(any(test, feature = "test-support"))]
1114    pub fn simulate_disconnect(&self, client_cx: &mut AppContext) -> Task<()> {
1115        let port = self.connection_options().port.unwrap();
1116        client_cx.spawn(|cx| async move {
1117            let (channel, server_cx) = cx
1118                .update_global(|c: &mut fake::ServerConnections, _| c.get(port))
1119                .unwrap();
1120
1121            let (outgoing_tx, _) = mpsc::unbounded::<Envelope>();
1122            let (_, incoming_rx) = mpsc::unbounded::<Envelope>();
1123            channel.reconnect(incoming_rx, outgoing_tx, &server_cx);
1124        })
1125    }
1126
1127    #[cfg(any(test, feature = "test-support"))]
1128    pub fn fake_server(
1129        client_cx: &mut gpui::TestAppContext,
1130        server_cx: &mut gpui::TestAppContext,
1131    ) -> (u16, Arc<ChannelClient>) {
1132        use gpui::BorrowAppContext;
1133        let (outgoing_tx, _) = mpsc::unbounded::<Envelope>();
1134        let (_, incoming_rx) = mpsc::unbounded::<Envelope>();
1135        let server_client =
1136            server_cx.update(|cx| ChannelClient::new(incoming_rx, outgoing_tx, cx, "fake-server"));
1137        let port = client_cx.update(|cx| {
1138            cx.update_default_global(|c: &mut fake::ServerConnections, _| {
1139                c.push(server_client.clone(), server_cx.to_async())
1140            })
1141        });
1142        (port, server_client)
1143    }
1144
1145    #[cfg(any(test, feature = "test-support"))]
1146    pub async fn fake_client(port: u16, client_cx: &mut gpui::TestAppContext) -> Model<Self> {
1147        let (_tx, rx) = oneshot::channel();
1148        client_cx
1149            .update(|cx| {
1150                Self::new(
1151                    "fake".to_string(),
1152                    SshConnectionOptions {
1153                        host: "<fake>".to_string(),
1154                        port: Some(port),
1155                        ..Default::default()
1156                    },
1157                    rx,
1158                    Arc::new(fake::Delegate),
1159                    cx,
1160                )
1161            })
1162            .await
1163            .unwrap()
1164            .unwrap()
1165    }
1166}
1167
1168impl From<SshRemoteClient> for AnyProtoClient {
1169    fn from(client: SshRemoteClient) -> Self {
1170        AnyProtoClient::new(client.client.clone())
1171    }
1172}
1173
1174#[async_trait]
1175trait SshRemoteProcess: Send + Sync {
1176    async fn kill(&mut self) -> Result<()>;
1177    fn ssh_args(&self) -> Vec<String>;
1178    fn connection_options(&self) -> SshConnectionOptions;
1179}
1180
1181struct SshRemoteConnection {
1182    socket: SshSocket,
1183    master_process: process::Child,
1184    _temp_dir: TempDir,
1185}
1186
1187impl Drop for SshRemoteConnection {
1188    fn drop(&mut self) {
1189        if let Err(error) = self.master_process.kill() {
1190            log::error!("failed to kill SSH master process: {}", error);
1191        }
1192    }
1193}
1194
1195#[async_trait]
1196impl SshRemoteProcess for SshRemoteConnection {
1197    async fn kill(&mut self) -> Result<()> {
1198        self.master_process.kill()?;
1199
1200        self.master_process.status().await?;
1201
1202        Ok(())
1203    }
1204
1205    fn ssh_args(&self) -> Vec<String> {
1206        self.socket.ssh_args()
1207    }
1208
1209    fn connection_options(&self) -> SshConnectionOptions {
1210        self.socket.connection_options.clone()
1211    }
1212}
1213
1214impl SshRemoteConnection {
1215    #[cfg(not(unix))]
1216    async fn new(
1217        _connection_options: SshConnectionOptions,
1218        _delegate: Arc<dyn SshClientDelegate>,
1219        _cx: &mut AsyncAppContext,
1220    ) -> Result<Self> {
1221        Err(anyhow!("ssh is not supported on this platform"))
1222    }
1223
1224    #[cfg(unix)]
1225    async fn new(
1226        connection_options: SshConnectionOptions,
1227        delegate: Arc<dyn SshClientDelegate>,
1228        cx: &mut AsyncAppContext,
1229    ) -> Result<Self> {
1230        use futures::AsyncWriteExt as _;
1231        use futures::{io::BufReader, AsyncBufReadExt as _};
1232        use smol::{fs::unix::PermissionsExt as _, net::unix::UnixListener};
1233        use util::ResultExt as _;
1234
1235        delegate.set_status(Some("Connecting"), cx);
1236
1237        let url = connection_options.ssh_url();
1238        let temp_dir = tempfile::Builder::new()
1239            .prefix("zed-ssh-session")
1240            .tempdir()?;
1241
1242        // Create a domain socket listener to handle requests from the askpass program.
1243        let askpass_socket = temp_dir.path().join("askpass.sock");
1244        let (askpass_opened_tx, askpass_opened_rx) = oneshot::channel::<()>();
1245        let listener =
1246            UnixListener::bind(&askpass_socket).context("failed to create askpass socket")?;
1247
1248        let askpass_task = cx.spawn({
1249            let delegate = delegate.clone();
1250            |mut cx| async move {
1251                let mut askpass_opened_tx = Some(askpass_opened_tx);
1252
1253                while let Ok((mut stream, _)) = listener.accept().await {
1254                    if let Some(askpass_opened_tx) = askpass_opened_tx.take() {
1255                        askpass_opened_tx.send(()).ok();
1256                    }
1257                    let mut buffer = Vec::new();
1258                    let mut reader = BufReader::new(&mut stream);
1259                    if reader.read_until(b'\0', &mut buffer).await.is_err() {
1260                        buffer.clear();
1261                    }
1262                    let password_prompt = String::from_utf8_lossy(&buffer);
1263                    if let Some(password) = delegate
1264                        .ask_password(password_prompt.to_string(), &mut cx)
1265                        .await
1266                        .context("failed to get ssh password")
1267                        .and_then(|p| p)
1268                        .log_err()
1269                    {
1270                        stream.write_all(password.as_bytes()).await.log_err();
1271                    }
1272                }
1273            }
1274        });
1275
1276        // Create an askpass script that communicates back to this process.
1277        let askpass_script = format!(
1278            "{shebang}\n{print_args} | nc -U {askpass_socket} 2> /dev/null \n",
1279            askpass_socket = askpass_socket.display(),
1280            print_args = "printf '%s\\0' \"$@\"",
1281            shebang = "#!/bin/sh",
1282        );
1283        let askpass_script_path = temp_dir.path().join("askpass.sh");
1284        fs::write(&askpass_script_path, askpass_script).await?;
1285        fs::set_permissions(&askpass_script_path, std::fs::Permissions::from_mode(0o755)).await?;
1286
1287        // Start the master SSH process, which does not do anything except for establish
1288        // the connection and keep it open, allowing other ssh commands to reuse it
1289        // via a control socket.
1290        let socket_path = temp_dir.path().join("ssh.sock");
1291        let mut master_process = process::Command::new("ssh")
1292            .stdin(Stdio::null())
1293            .stdout(Stdio::piped())
1294            .stderr(Stdio::piped())
1295            .env("SSH_ASKPASS_REQUIRE", "force")
1296            .env("SSH_ASKPASS", &askpass_script_path)
1297            .args(connection_options.additional_args().unwrap_or(&Vec::new()))
1298            .args([
1299                "-N",
1300                "-o",
1301                "ControlPersist=no",
1302                "-o",
1303                "ControlMaster=yes",
1304                "-o",
1305            ])
1306            .arg(format!("ControlPath={}", socket_path.display()))
1307            .arg(&url)
1308            .spawn()?;
1309
1310        // Wait for this ssh process to close its stdout, indicating that authentication
1311        // has completed.
1312        let stdout = master_process.stdout.as_mut().unwrap();
1313        let mut output = Vec::new();
1314        let connection_timeout = Duration::from_secs(10);
1315
1316        let result = select_biased! {
1317            _ = askpass_opened_rx.fuse() => {
1318                // If the askpass script has opened, that means the user is typing
1319                // their password, in which case we don't want to timeout anymore,
1320                // since we know a connection has been established.
1321                stdout.read_to_end(&mut output).await?;
1322                Ok(())
1323            }
1324            result = stdout.read_to_end(&mut output).fuse() => {
1325                result?;
1326                Ok(())
1327            }
1328            _ = futures::FutureExt::fuse(smol::Timer::after(connection_timeout)) => {
1329                Err(anyhow!("Exceeded {:?} timeout trying to connect to host", connection_timeout))
1330            }
1331        };
1332
1333        if let Err(e) = result {
1334            return Err(e.context("Failed to connect to host"));
1335        }
1336
1337        drop(askpass_task);
1338
1339        if master_process.try_status()?.is_some() {
1340            output.clear();
1341            let mut stderr = master_process.stderr.take().unwrap();
1342            stderr.read_to_end(&mut output).await?;
1343
1344            let error_message = format!(
1345                "failed to connect: {}",
1346                String::from_utf8_lossy(&output).trim()
1347            );
1348            Err(anyhow!(error_message))?;
1349        }
1350
1351        Ok(Self {
1352            socket: SshSocket {
1353                connection_options,
1354                socket_path,
1355            },
1356            master_process,
1357            _temp_dir: temp_dir,
1358        })
1359    }
1360
1361    async fn ensure_server_binary(
1362        &self,
1363        delegate: &Arc<dyn SshClientDelegate>,
1364        dst_path: &Path,
1365        platform: SshPlatform,
1366        cx: &mut AsyncAppContext,
1367    ) -> Result<()> {
1368        let lock_file = dst_path.with_extension("lock");
1369        let timestamp = SystemTime::now()
1370            .duration_since(UNIX_EPOCH)
1371            .unwrap()
1372            .as_secs();
1373        let lock_content = timestamp.to_string();
1374
1375        let lock_stale_age = Duration::from_secs(10 * 60);
1376        let max_wait_time = Duration::from_secs(10 * 60);
1377        let check_interval = Duration::from_secs(5);
1378        let start_time = Instant::now();
1379
1380        loop {
1381            let lock_acquired = self.create_lock_file(&lock_file, &lock_content).await?;
1382            if lock_acquired {
1383                let result = self
1384                    .update_server_binary_if_needed(delegate, dst_path, platform, cx)
1385                    .await;
1386
1387                self.remove_lock_file(&lock_file).await.ok();
1388
1389                return result;
1390            } else {
1391                if let Ok(is_stale) = self.is_lock_stale(&lock_file, &lock_stale_age).await {
1392                    if is_stale {
1393                        self.remove_lock_file(&lock_file).await?;
1394                        continue;
1395                    } else {
1396                        if start_time.elapsed() > max_wait_time {
1397                            return Err(anyhow!("Timeout waiting for lock to be released"));
1398                        }
1399                        log::info!(
1400                            "Found lockfile: {:?}. Will check again in {:?}",
1401                            lock_file,
1402                            check_interval
1403                        );
1404                        delegate.set_status(
1405                            Some("Waiting for another Zed instance to finish uploading binary"),
1406                            cx,
1407                        );
1408                        smol::Timer::after(check_interval).await;
1409                        continue;
1410                    }
1411                } else {
1412                    // Unable to check lock, assume it's valid and wait
1413                    if start_time.elapsed() > max_wait_time {
1414                        return Err(anyhow!("Timeout waiting for lock to be released"));
1415                    }
1416                    smol::Timer::after(check_interval).await;
1417                    continue;
1418                }
1419            }
1420        }
1421    }
1422
1423    async fn create_lock_file(&self, lock_file: &Path, content: &str) -> Result<bool> {
1424        let parent_dir = lock_file
1425            .parent()
1426            .ok_or_else(|| anyhow!("Lock file path has no parent directory"))?;
1427
1428        // Be mindful of the escaping here: we need to make sure that we have quotes
1429        // inside the string, so that `sh -c` gets a quoted string passed to it.
1430        let script = format!(
1431            "\"mkdir -p '{0}' &&  [ ! -f '{1}' ] && echo '{2}' > '{1}' && echo 'created' || echo 'exists'\"",
1432            parent_dir.display(),
1433            lock_file.display(),
1434            content
1435        );
1436
1437        let output = run_cmd(self.socket.ssh_command("sh").arg("-c").arg(&script))
1438            .await
1439            .with_context(|| format!("failed to create a lock file at {:?}", lock_file))?;
1440
1441        Ok(output.trim() == "created")
1442    }
1443
1444    async fn is_lock_stale(&self, lock_file: &Path, max_age: &Duration) -> Result<bool> {
1445        let threshold = max_age.as_secs();
1446
1447        // Be mindful of the escaping here: we need to make sure that we have quotes
1448        // inside the string, so that `sh -c` gets a quoted string passed to it.
1449        let script = format!(
1450            "\"[ -f '{0}' ] && [ $(( $(date +%s) - $(date -r '{0}' +%s) )) -gt {1} ] && echo 'stale' ||  echo 'recent'\"",
1451            lock_file.display(),
1452            threshold
1453        );
1454
1455        let output = run_cmd(self.socket.ssh_command("sh").arg("-c").arg(script))
1456            .await
1457            .with_context(|| {
1458                format!("failed to check whether lock file {:?} is stale", lock_file)
1459            })?;
1460
1461        Ok(output.trim() == "stale")
1462    }
1463
1464    async fn remove_lock_file(&self, lock_file: &Path) -> Result<()> {
1465        run_cmd(self.socket.ssh_command("rm").arg("-f").arg(lock_file))
1466            .await
1467            .context("failed to remove lock file")?;
1468        Ok(())
1469    }
1470
1471    async fn update_server_binary_if_needed(
1472        &self,
1473        delegate: &Arc<dyn SshClientDelegate>,
1474        dst_path: &Path,
1475        platform: SshPlatform,
1476        cx: &mut AsyncAppContext,
1477    ) -> Result<()> {
1478        if std::env::var("ZED_USE_CACHED_REMOTE_SERVER").is_ok() {
1479            if let Ok(installed_version) =
1480                run_cmd(self.socket.ssh_command(dst_path).arg("version")).await
1481            {
1482                log::info!("using cached server binary version {}", installed_version);
1483                return Ok(());
1484            }
1485        }
1486
1487        let (binary, version) = delegate.get_server_binary(platform, cx).await??;
1488
1489        let mut server_binary_exists = false;
1490        if !server_binary_exists && cfg!(not(debug_assertions)) {
1491            if let Ok(installed_version) =
1492                run_cmd(self.socket.ssh_command(dst_path).arg("version")).await
1493            {
1494                if installed_version.trim() == version.to_string() {
1495                    server_binary_exists = true;
1496                }
1497                log::info!("checked remote server binary for version. latest version: {}. remote server version: {}", version.to_string(), installed_version.trim());
1498            }
1499        }
1500
1501        if server_binary_exists {
1502            log::info!("remote development server already present",);
1503            return Ok(());
1504        }
1505
1506        match binary {
1507            ServerBinary::LocalBinary(src_path) => {
1508                self.upload_local_server_binary(&src_path, dst_path, delegate, cx)
1509                    .await
1510            }
1511            ServerBinary::ReleaseUrl { url, body } => {
1512                self.download_binary_on_server(&url, &body, dst_path, delegate, cx)
1513                    .await
1514            }
1515        }
1516    }
1517
1518    async fn download_binary_on_server(
1519        &self,
1520        url: &str,
1521        body: &str,
1522        dst_path: &Path,
1523        delegate: &Arc<dyn SshClientDelegate>,
1524        cx: &mut AsyncAppContext,
1525    ) -> Result<()> {
1526        let mut dst_path_gz = dst_path.to_path_buf();
1527        dst_path_gz.set_extension("gz");
1528
1529        if let Some(parent) = dst_path.parent() {
1530            run_cmd(self.socket.ssh_command("mkdir").arg("-p").arg(parent)).await?;
1531        }
1532
1533        delegate.set_status(Some("Downloading remote development server on host"), cx);
1534
1535        let script = format!(
1536            r#"
1537            if command -v wget >/dev/null 2>&1; then
1538                wget --max-redirect=5 --method=GET --header="Content-Type: application/json" --body-data='{}' '{}' -O '{}' && echo "wget"
1539            elif command -v curl >/dev/null 2>&1; then
1540                curl -L -X GET -H "Content-Type: application/json" -d '{}' '{}' -o '{}' && echo "curl"
1541            else
1542                echo "Neither curl nor wget is available" >&2
1543                exit 1
1544            fi
1545            "#,
1546            body.replace("'", r#"\'"#),
1547            url,
1548            dst_path_gz.display(),
1549            body.replace("'", r#"\'"#),
1550            url,
1551            dst_path_gz.display(),
1552        );
1553
1554        let output = run_cmd(self.socket.ssh_command("bash").arg("-c").arg(script))
1555            .await
1556            .context("Failed to download server binary")?;
1557
1558        if !output.contains("curl") && !output.contains("wget") {
1559            return Err(anyhow!("Failed to download server binary: {}", output));
1560        }
1561
1562        self.extract_server_binary(dst_path, &dst_path_gz, delegate, cx)
1563            .await
1564    }
1565
1566    async fn upload_local_server_binary(
1567        &self,
1568        src_path: &Path,
1569        dst_path: &Path,
1570        delegate: &Arc<dyn SshClientDelegate>,
1571        cx: &mut AsyncAppContext,
1572    ) -> Result<()> {
1573        let mut dst_path_gz = dst_path.to_path_buf();
1574        dst_path_gz.set_extension("gz");
1575
1576        if let Some(parent) = dst_path.parent() {
1577            run_cmd(self.socket.ssh_command("mkdir").arg("-p").arg(parent)).await?;
1578        }
1579
1580        let src_stat = fs::metadata(&src_path).await?;
1581        let size = src_stat.len();
1582
1583        let t0 = Instant::now();
1584        delegate.set_status(Some("Uploading remote development server"), cx);
1585        log::info!("uploading remote development server ({}kb)", size / 1024);
1586        self.upload_file(&src_path, &dst_path_gz)
1587            .await
1588            .context("failed to upload server binary")?;
1589        log::info!("uploaded remote development server in {:?}", t0.elapsed());
1590
1591        self.extract_server_binary(dst_path, &dst_path_gz, delegate, cx)
1592            .await
1593    }
1594
1595    async fn extract_server_binary(
1596        &self,
1597        dst_path: &Path,
1598        dst_path_gz: &Path,
1599        delegate: &Arc<dyn SshClientDelegate>,
1600        cx: &mut AsyncAppContext,
1601    ) -> Result<()> {
1602        delegate.set_status(Some("Extracting remote development server"), cx);
1603        run_cmd(
1604            self.socket
1605                .ssh_command("gunzip")
1606                .arg("--force")
1607                .arg(&dst_path_gz),
1608        )
1609        .await?;
1610
1611        let server_mode = 0o755;
1612        delegate.set_status(Some("Marking remote development server executable"), cx);
1613        run_cmd(
1614            self.socket
1615                .ssh_command("chmod")
1616                .arg(format!("{:o}", server_mode))
1617                .arg(dst_path),
1618        )
1619        .await?;
1620
1621        Ok(())
1622    }
1623
1624    async fn query_platform(&self) -> Result<SshPlatform> {
1625        let os = run_cmd(self.socket.ssh_command("uname").arg("-s")).await?;
1626        let arch = run_cmd(self.socket.ssh_command("uname").arg("-m")).await?;
1627
1628        let os = match os.trim() {
1629            "Darwin" => "macos",
1630            "Linux" => "linux",
1631            _ => Err(anyhow!("unknown uname os {os:?}"))?,
1632        };
1633        let arch = if arch.starts_with("arm") || arch.starts_with("aarch64") {
1634            "aarch64"
1635        } else if arch.starts_with("x86") || arch.starts_with("i686") {
1636            "x86_64"
1637        } else {
1638            Err(anyhow!("unknown uname architecture {arch:?}"))?
1639        };
1640
1641        Ok(SshPlatform { os, arch })
1642    }
1643
1644    async fn upload_file(&self, src_path: &Path, dest_path: &Path) -> Result<()> {
1645        let mut command = process::Command::new("scp");
1646        let output = self
1647            .socket
1648            .ssh_options(&mut command)
1649            .args(
1650                self.socket
1651                    .connection_options
1652                    .port
1653                    .map(|port| vec!["-P".to_string(), port.to_string()])
1654                    .unwrap_or_default(),
1655            )
1656            .arg(src_path)
1657            .arg(format!(
1658                "{}:{}",
1659                self.socket.connection_options.scp_url(),
1660                dest_path.display()
1661            ))
1662            .output()
1663            .await?;
1664
1665        if output.status.success() {
1666            Ok(())
1667        } else {
1668            Err(anyhow!(
1669                "failed to upload file {} -> {}: {}",
1670                src_path.display(),
1671                dest_path.display(),
1672                String::from_utf8_lossy(&output.stderr)
1673            ))
1674        }
1675    }
1676}
1677
1678type ResponseChannels = Mutex<HashMap<MessageId, oneshot::Sender<(Envelope, oneshot::Sender<()>)>>>;
1679
1680pub struct ChannelClient {
1681    next_message_id: AtomicU32,
1682    outgoing_tx: Mutex<mpsc::UnboundedSender<Envelope>>,
1683    buffer: Mutex<VecDeque<Envelope>>,
1684    response_channels: ResponseChannels,
1685    message_handlers: Mutex<ProtoMessageHandlerSet>,
1686    max_received: AtomicU32,
1687    name: &'static str,
1688    task: Mutex<Task<Result<()>>>,
1689}
1690
1691impl ChannelClient {
1692    pub fn new(
1693        incoming_rx: mpsc::UnboundedReceiver<Envelope>,
1694        outgoing_tx: mpsc::UnboundedSender<Envelope>,
1695        cx: &AppContext,
1696        name: &'static str,
1697    ) -> Arc<Self> {
1698        Arc::new_cyclic(|this| Self {
1699            outgoing_tx: Mutex::new(outgoing_tx),
1700            next_message_id: AtomicU32::new(0),
1701            max_received: AtomicU32::new(0),
1702            response_channels: ResponseChannels::default(),
1703            message_handlers: Default::default(),
1704            buffer: Mutex::new(VecDeque::new()),
1705            name,
1706            task: Mutex::new(Self::start_handling_messages(
1707                this.clone(),
1708                incoming_rx,
1709                &cx.to_async(),
1710            )),
1711        })
1712    }
1713
1714    fn start_handling_messages(
1715        this: Weak<Self>,
1716        mut incoming_rx: mpsc::UnboundedReceiver<Envelope>,
1717        cx: &AsyncAppContext,
1718    ) -> Task<Result<()>> {
1719        cx.spawn(|cx| {
1720            async move {
1721                let peer_id = PeerId { owner_id: 0, id: 0 };
1722                while let Some(incoming) = incoming_rx.next().await {
1723                    let Some(this) = this.upgrade() else {
1724                        return anyhow::Ok(());
1725                    };
1726                    if let Some(ack_id) = incoming.ack_id {
1727                        let mut buffer = this.buffer.lock();
1728                        while buffer.front().is_some_and(|msg| msg.id <= ack_id) {
1729                            buffer.pop_front();
1730                        }
1731                    }
1732                    if let Some(proto::envelope::Payload::FlushBufferedMessages(_)) =
1733                        &incoming.payload
1734                    {
1735                        log::debug!("{}:ssh message received. name:FlushBufferedMessages", this.name);
1736                        {
1737                            let buffer = this.buffer.lock();
1738                            for envelope in buffer.iter() {
1739                                this.outgoing_tx.lock().unbounded_send(envelope.clone()).ok();
1740                            }
1741                        }
1742                        let mut envelope = proto::Ack{}.into_envelope(0, Some(incoming.id), None);
1743                        envelope.id = this.next_message_id.fetch_add(1, SeqCst);
1744                        this.outgoing_tx.lock().unbounded_send(envelope).ok();
1745                        continue;
1746                    }
1747
1748                    this.max_received.store(incoming.id, SeqCst);
1749
1750                    if let Some(request_id) = incoming.responding_to {
1751                        let request_id = MessageId(request_id);
1752                        let sender = this.response_channels.lock().remove(&request_id);
1753                        if let Some(sender) = sender {
1754                            let (tx, rx) = oneshot::channel();
1755                            if incoming.payload.is_some() {
1756                                sender.send((incoming, tx)).ok();
1757                            }
1758                            rx.await.ok();
1759                        }
1760                    } else if let Some(envelope) =
1761                        build_typed_envelope(peer_id, Instant::now(), incoming)
1762                    {
1763                        let type_name = envelope.payload_type_name();
1764                        if let Some(future) = ProtoMessageHandlerSet::handle_message(
1765                            &this.message_handlers,
1766                            envelope,
1767                            this.clone().into(),
1768                            cx.clone(),
1769                        ) {
1770                            log::debug!("{}:ssh message received. name:{type_name}", this.name);
1771                            cx.foreground_executor().spawn(async move {
1772                                match future.await {
1773                                    Ok(_) => {
1774                                        log::debug!("{}:ssh message handled. name:{type_name}", this.name);
1775                                    }
1776                                    Err(error) => {
1777                                        log::error!(
1778                                            "{}:error handling message. type:{type_name}, error:{error}", this.name,
1779                                        );
1780                                    }
1781                                }
1782                            }).detach()
1783                        } else {
1784                            log::error!("{}:unhandled ssh message name:{type_name}", this.name);
1785                        }
1786                    }
1787                }
1788                anyhow::Ok(())
1789            }
1790        })
1791    }
1792
1793    pub fn reconnect(
1794        self: &Arc<Self>,
1795        incoming_rx: UnboundedReceiver<Envelope>,
1796        outgoing_tx: UnboundedSender<Envelope>,
1797        cx: &AsyncAppContext,
1798    ) {
1799        *self.outgoing_tx.lock() = outgoing_tx;
1800        *self.task.lock() = Self::start_handling_messages(Arc::downgrade(self), incoming_rx, cx);
1801    }
1802
1803    pub fn subscribe_to_entity<E: 'static>(&self, remote_id: u64, entity: &Model<E>) {
1804        let id = (TypeId::of::<E>(), remote_id);
1805
1806        let mut message_handlers = self.message_handlers.lock();
1807        if message_handlers
1808            .entities_by_type_and_remote_id
1809            .contains_key(&id)
1810        {
1811            panic!("already subscribed to entity");
1812        }
1813
1814        message_handlers.entities_by_type_and_remote_id.insert(
1815            id,
1816            EntityMessageSubscriber::Entity {
1817                handle: entity.downgrade().into(),
1818            },
1819        );
1820    }
1821
1822    pub fn request<T: RequestMessage>(
1823        &self,
1824        payload: T,
1825    ) -> impl 'static + Future<Output = Result<T::Response>> {
1826        self.request_internal(payload, true)
1827    }
1828
1829    fn request_internal<T: RequestMessage>(
1830        &self,
1831        payload: T,
1832        use_buffer: bool,
1833    ) -> impl 'static + Future<Output = Result<T::Response>> {
1834        log::debug!("ssh request start. name:{}", T::NAME);
1835        let response =
1836            self.request_dynamic(payload.into_envelope(0, None, None), T::NAME, use_buffer);
1837        async move {
1838            let response = response.await?;
1839            log::debug!("ssh request finish. name:{}", T::NAME);
1840            T::Response::from_envelope(response)
1841                .ok_or_else(|| anyhow!("received a response of the wrong type"))
1842        }
1843    }
1844
1845    pub async fn resync(&self, timeout: Duration) -> Result<()> {
1846        smol::future::or(
1847            async {
1848                self.request_internal(proto::FlushBufferedMessages {}, false)
1849                    .await?;
1850
1851                for envelope in self.buffer.lock().iter() {
1852                    self.outgoing_tx
1853                        .lock()
1854                        .unbounded_send(envelope.clone())
1855                        .ok();
1856                }
1857                Ok(())
1858            },
1859            async {
1860                smol::Timer::after(timeout).await;
1861                Err(anyhow!("Timeout detected"))
1862            },
1863        )
1864        .await
1865    }
1866
1867    pub async fn ping(&self, timeout: Duration) -> Result<()> {
1868        smol::future::or(
1869            async {
1870                self.request(proto::Ping {}).await?;
1871                Ok(())
1872            },
1873            async {
1874                smol::Timer::after(timeout).await;
1875                Err(anyhow!("Timeout detected"))
1876            },
1877        )
1878        .await
1879    }
1880
1881    pub fn send<T: EnvelopedMessage>(&self, payload: T) -> Result<()> {
1882        log::debug!("ssh send name:{}", T::NAME);
1883        self.send_dynamic(payload.into_envelope(0, None, None))
1884    }
1885
1886    fn request_dynamic(
1887        &self,
1888        mut envelope: proto::Envelope,
1889        type_name: &'static str,
1890        use_buffer: bool,
1891    ) -> impl 'static + Future<Output = Result<proto::Envelope>> {
1892        envelope.id = self.next_message_id.fetch_add(1, SeqCst);
1893        let (tx, rx) = oneshot::channel();
1894        let mut response_channels_lock = self.response_channels.lock();
1895        response_channels_lock.insert(MessageId(envelope.id), tx);
1896        drop(response_channels_lock);
1897
1898        let result = if use_buffer {
1899            self.send_buffered(envelope)
1900        } else {
1901            self.send_unbuffered(envelope)
1902        };
1903        async move {
1904            if let Err(error) = &result {
1905                log::error!("failed to send message: {}", error);
1906                return Err(anyhow!("failed to send message: {}", error));
1907            }
1908
1909            let response = rx.await.context("connection lost")?.0;
1910            if let Some(proto::envelope::Payload::Error(error)) = &response.payload {
1911                return Err(RpcError::from_proto(error, type_name));
1912            }
1913            Ok(response)
1914        }
1915    }
1916
1917    pub fn send_dynamic(&self, mut envelope: proto::Envelope) -> Result<()> {
1918        envelope.id = self.next_message_id.fetch_add(1, SeqCst);
1919        self.send_buffered(envelope)
1920    }
1921
1922    fn send_buffered(&self, mut envelope: proto::Envelope) -> Result<()> {
1923        envelope.ack_id = Some(self.max_received.load(SeqCst));
1924        self.buffer.lock().push_back(envelope.clone());
1925        // ignore errors on send (happen while we're reconnecting)
1926        // assume that the global "disconnected" overlay is sufficient.
1927        self.outgoing_tx.lock().unbounded_send(envelope).ok();
1928        Ok(())
1929    }
1930
1931    fn send_unbuffered(&self, mut envelope: proto::Envelope) -> Result<()> {
1932        envelope.ack_id = Some(self.max_received.load(SeqCst));
1933        self.outgoing_tx.lock().unbounded_send(envelope).ok();
1934        Ok(())
1935    }
1936}
1937
1938impl ProtoClient for ChannelClient {
1939    fn request(
1940        &self,
1941        envelope: proto::Envelope,
1942        request_type: &'static str,
1943    ) -> BoxFuture<'static, Result<proto::Envelope>> {
1944        self.request_dynamic(envelope, request_type, true).boxed()
1945    }
1946
1947    fn send(&self, envelope: proto::Envelope, _message_type: &'static str) -> Result<()> {
1948        self.send_dynamic(envelope)
1949    }
1950
1951    fn send_response(&self, envelope: Envelope, _message_type: &'static str) -> anyhow::Result<()> {
1952        self.send_dynamic(envelope)
1953    }
1954
1955    fn message_handler_set(&self) -> &Mutex<ProtoMessageHandlerSet> {
1956        &self.message_handlers
1957    }
1958
1959    fn is_via_collab(&self) -> bool {
1960        false
1961    }
1962}
1963
1964#[cfg(any(test, feature = "test-support"))]
1965mod fake {
1966    use std::{path::PathBuf, sync::Arc};
1967
1968    use anyhow::Result;
1969    use async_trait::async_trait;
1970    use futures::{
1971        channel::{
1972            mpsc::{self, Sender},
1973            oneshot,
1974        },
1975        select_biased, FutureExt, SinkExt, StreamExt,
1976    };
1977    use gpui::{AsyncAppContext, BorrowAppContext, Global, SemanticVersion, Task};
1978    use rpc::proto::Envelope;
1979
1980    use super::{
1981        ChannelClient, ServerBinary, SshClientDelegate, SshConnectionOptions, SshPlatform,
1982        SshRemoteProcess,
1983    };
1984
1985    pub(super) struct SshRemoteConnection {
1986        connection_options: SshConnectionOptions,
1987    }
1988
1989    impl SshRemoteConnection {
1990        pub(super) fn new(
1991            connection_options: &SshConnectionOptions,
1992        ) -> Option<Box<dyn SshRemoteProcess>> {
1993            if connection_options.host == "<fake>" {
1994                return Some(Box::new(Self {
1995                    connection_options: connection_options.clone(),
1996                }));
1997            }
1998            return None;
1999        }
2000        pub(super) async fn multiplex(
2001            connection_options: SshConnectionOptions,
2002            mut client_incoming_tx: mpsc::UnboundedSender<Envelope>,
2003            mut client_outgoing_rx: mpsc::UnboundedReceiver<Envelope>,
2004            mut connection_activity_tx: Sender<()>,
2005            cx: &mut AsyncAppContext,
2006        ) -> Task<Result<i32>> {
2007            let (mut server_incoming_tx, server_incoming_rx) = mpsc::unbounded::<Envelope>();
2008            let (server_outgoing_tx, mut server_outgoing_rx) = mpsc::unbounded::<Envelope>();
2009
2010            let (channel, server_cx) = cx
2011                .update(|cx| {
2012                    cx.update_global(|conns: &mut ServerConnections, _| {
2013                        conns.get(connection_options.port.unwrap())
2014                    })
2015                })
2016                .unwrap();
2017            channel.reconnect(server_incoming_rx, server_outgoing_tx, &server_cx);
2018
2019            // send to proxy_tx to get to the server.
2020            // receive from
2021
2022            cx.background_executor().spawn(async move {
2023                loop {
2024                    select_biased! {
2025                        server_to_client = server_outgoing_rx.next().fuse() => {
2026                            let Some(server_to_client) = server_to_client else {
2027                                return Ok(1)
2028                            };
2029                            connection_activity_tx.try_send(()).ok();
2030                            client_incoming_tx.send(server_to_client).await.ok();
2031                        }
2032                        client_to_server = client_outgoing_rx.next().fuse() => {
2033                            let Some(client_to_server) = client_to_server else {
2034                                return Ok(1)
2035                            };
2036                            server_incoming_tx.send(client_to_server).await.ok();
2037                        }
2038                    }
2039                }
2040            })
2041        }
2042    }
2043
2044    #[async_trait]
2045    impl SshRemoteProcess for SshRemoteConnection {
2046        async fn kill(&mut self) -> Result<()> {
2047            Ok(())
2048        }
2049
2050        fn ssh_args(&self) -> Vec<String> {
2051            Vec::new()
2052        }
2053
2054        fn connection_options(&self) -> SshConnectionOptions {
2055            self.connection_options.clone()
2056        }
2057    }
2058
2059    #[derive(Default)]
2060    pub(super) struct ServerConnections(Vec<(Arc<ChannelClient>, AsyncAppContext)>);
2061    impl Global for ServerConnections {}
2062
2063    impl ServerConnections {
2064        pub(super) fn push(&mut self, server: Arc<ChannelClient>, cx: AsyncAppContext) -> u16 {
2065            self.0.push((server.clone(), cx));
2066            self.0.len() as u16 - 1
2067        }
2068
2069        pub(super) fn get(&mut self, port: u16) -> (Arc<ChannelClient>, AsyncAppContext) {
2070            self.0
2071                .get(port as usize)
2072                .expect("no fake server for port")
2073                .clone()
2074        }
2075    }
2076
2077    pub(super) struct Delegate;
2078
2079    impl SshClientDelegate for Delegate {
2080        fn ask_password(
2081            &self,
2082            _: String,
2083            _: &mut AsyncAppContext,
2084        ) -> oneshot::Receiver<Result<String>> {
2085            unreachable!()
2086        }
2087        fn remote_server_binary_path(
2088            &self,
2089            _: SshPlatform,
2090            _: &mut AsyncAppContext,
2091        ) -> Result<PathBuf> {
2092            unreachable!()
2093        }
2094        fn get_server_binary(
2095            &self,
2096            _: SshPlatform,
2097            _: &mut AsyncAppContext,
2098        ) -> oneshot::Receiver<Result<(ServerBinary, SemanticVersion)>> {
2099            unreachable!()
2100        }
2101
2102        fn set_status(&self, _: Option<&str>, _: &mut AsyncAppContext) {
2103            unreachable!()
2104        }
2105    }
2106}