remote_client.rs

   1use crate::{
   2    SshConnectionOptions,
   3    protocol::MessageId,
   4    proxy::ProxyLaunchError,
   5    transport::{
   6        ssh::SshRemoteConnection,
   7        wsl::{WslConnectionOptions, WslRemoteConnection},
   8    },
   9};
  10use anyhow::{Context as _, Result, anyhow};
  11use askpass::EncryptedPassword;
  12use async_trait::async_trait;
  13use collections::HashMap;
  14use futures::{
  15    Future, FutureExt as _, StreamExt as _,
  16    channel::{
  17        mpsc::{self, Sender, UnboundedReceiver, UnboundedSender},
  18        oneshot,
  19    },
  20    future::{BoxFuture, Shared},
  21    select, select_biased,
  22};
  23use gpui::{
  24    App, AppContext as _, AsyncApp, BackgroundExecutor, BorrowAppContext, Context, Entity,
  25    EventEmitter, Global, SemanticVersion, Task, WeakEntity,
  26};
  27use parking_lot::Mutex;
  28
  29use release_channel::ReleaseChannel;
  30use rpc::{
  31    AnyProtoClient, ErrorExt, ProtoClient, ProtoMessageHandlerSet, RpcError,
  32    proto::{self, Envelope, EnvelopedMessage, PeerId, RequestMessage, build_typed_envelope},
  33};
  34use std::{
  35    collections::VecDeque,
  36    fmt,
  37    ops::ControlFlow,
  38    path::PathBuf,
  39    sync::{
  40        Arc, Weak,
  41        atomic::{AtomicU32, AtomicU64, Ordering::SeqCst},
  42    },
  43    time::{Duration, Instant},
  44};
  45use util::{
  46    ResultExt,
  47    paths::{PathStyle, RemotePathBuf},
  48};
  49
  50#[derive(Copy, Clone, Debug)]
  51pub struct RemotePlatform {
  52    pub os: &'static str,
  53    pub arch: &'static str,
  54}
  55
  56#[derive(Clone, Debug)]
  57pub struct CommandTemplate {
  58    pub program: String,
  59    pub args: Vec<String>,
  60    pub env: HashMap<String, String>,
  61}
  62
  63pub trait RemoteClientDelegate: Send + Sync {
  64    fn ask_password(
  65        &self,
  66        prompt: String,
  67        tx: oneshot::Sender<EncryptedPassword>,
  68        cx: &mut AsyncApp,
  69    );
  70    fn get_download_params(
  71        &self,
  72        platform: RemotePlatform,
  73        release_channel: ReleaseChannel,
  74        version: Option<SemanticVersion>,
  75        cx: &mut AsyncApp,
  76    ) -> Task<Result<Option<(String, String)>>>;
  77    fn download_server_binary_locally(
  78        &self,
  79        platform: RemotePlatform,
  80        release_channel: ReleaseChannel,
  81        version: Option<SemanticVersion>,
  82        cx: &mut AsyncApp,
  83    ) -> Task<Result<PathBuf>>;
  84    fn set_status(&self, status: Option<&str>, cx: &mut AsyncApp);
  85}
  86
  87const MAX_MISSED_HEARTBEATS: usize = 5;
  88const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
  89const HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(5);
  90
  91const MAX_RECONNECT_ATTEMPTS: usize = 3;
  92
  93enum State {
  94    Connecting,
  95    Connected {
  96        ssh_connection: Arc<dyn RemoteConnection>,
  97        delegate: Arc<dyn RemoteClientDelegate>,
  98
  99        multiplex_task: Task<Result<()>>,
 100        heartbeat_task: Task<Result<()>>,
 101    },
 102    HeartbeatMissed {
 103        missed_heartbeats: usize,
 104
 105        ssh_connection: Arc<dyn RemoteConnection>,
 106        delegate: Arc<dyn RemoteClientDelegate>,
 107
 108        multiplex_task: Task<Result<()>>,
 109        heartbeat_task: Task<Result<()>>,
 110    },
 111    Reconnecting,
 112    ReconnectFailed {
 113        ssh_connection: Arc<dyn RemoteConnection>,
 114        delegate: Arc<dyn RemoteClientDelegate>,
 115
 116        error: anyhow::Error,
 117        attempts: usize,
 118    },
 119    ReconnectExhausted,
 120    ServerNotRunning,
 121}
 122
 123impl fmt::Display for State {
 124    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
 125        match self {
 126            Self::Connecting => write!(f, "connecting"),
 127            Self::Connected { .. } => write!(f, "connected"),
 128            Self::Reconnecting => write!(f, "reconnecting"),
 129            Self::ReconnectFailed { .. } => write!(f, "reconnect failed"),
 130            Self::ReconnectExhausted => write!(f, "reconnect exhausted"),
 131            Self::HeartbeatMissed { .. } => write!(f, "heartbeat missed"),
 132            Self::ServerNotRunning { .. } => write!(f, "server not running"),
 133        }
 134    }
 135}
 136
 137impl State {
 138    fn remote_connection(&self) -> Option<Arc<dyn RemoteConnection>> {
 139        match self {
 140            Self::Connected { ssh_connection, .. } => Some(ssh_connection.clone()),
 141            Self::HeartbeatMissed { ssh_connection, .. } => Some(ssh_connection.clone()),
 142            Self::ReconnectFailed { ssh_connection, .. } => Some(ssh_connection.clone()),
 143            _ => None,
 144        }
 145    }
 146
 147    fn can_reconnect(&self) -> bool {
 148        match self {
 149            Self::Connected { .. }
 150            | Self::HeartbeatMissed { .. }
 151            | Self::ReconnectFailed { .. } => true,
 152            State::Connecting
 153            | State::Reconnecting
 154            | State::ReconnectExhausted
 155            | State::ServerNotRunning => false,
 156        }
 157    }
 158
 159    fn is_reconnect_failed(&self) -> bool {
 160        matches!(self, Self::ReconnectFailed { .. })
 161    }
 162
 163    fn is_reconnect_exhausted(&self) -> bool {
 164        matches!(self, Self::ReconnectExhausted { .. })
 165    }
 166
 167    fn is_server_not_running(&self) -> bool {
 168        matches!(self, Self::ServerNotRunning)
 169    }
 170
 171    fn is_reconnecting(&self) -> bool {
 172        matches!(self, Self::Reconnecting { .. })
 173    }
 174
 175    fn heartbeat_recovered(self) -> Self {
 176        match self {
 177            Self::HeartbeatMissed {
 178                ssh_connection,
 179                delegate,
 180                multiplex_task,
 181                heartbeat_task,
 182                ..
 183            } => Self::Connected {
 184                ssh_connection,
 185                delegate,
 186                multiplex_task,
 187                heartbeat_task,
 188            },
 189            _ => self,
 190        }
 191    }
 192
 193    fn heartbeat_missed(self) -> Self {
 194        match self {
 195            Self::Connected {
 196                ssh_connection,
 197                delegate,
 198                multiplex_task,
 199                heartbeat_task,
 200            } => Self::HeartbeatMissed {
 201                missed_heartbeats: 1,
 202                ssh_connection,
 203                delegate,
 204                multiplex_task,
 205                heartbeat_task,
 206            },
 207            Self::HeartbeatMissed {
 208                missed_heartbeats,
 209                ssh_connection,
 210                delegate,
 211                multiplex_task,
 212                heartbeat_task,
 213            } => Self::HeartbeatMissed {
 214                missed_heartbeats: missed_heartbeats + 1,
 215                ssh_connection,
 216                delegate,
 217                multiplex_task,
 218                heartbeat_task,
 219            },
 220            _ => self,
 221        }
 222    }
 223}
 224
 225/// The state of the ssh connection.
 226#[derive(Clone, Copy, Debug, PartialEq, Eq)]
 227pub enum ConnectionState {
 228    Connecting,
 229    Connected,
 230    HeartbeatMissed,
 231    Reconnecting,
 232    Disconnected,
 233}
 234
 235impl From<&State> for ConnectionState {
 236    fn from(value: &State) -> Self {
 237        match value {
 238            State::Connecting => Self::Connecting,
 239            State::Connected { .. } => Self::Connected,
 240            State::Reconnecting | State::ReconnectFailed { .. } => Self::Reconnecting,
 241            State::HeartbeatMissed { .. } => Self::HeartbeatMissed,
 242            State::ReconnectExhausted => Self::Disconnected,
 243            State::ServerNotRunning => Self::Disconnected,
 244        }
 245    }
 246}
 247
 248pub struct RemoteClient {
 249    client: Arc<ChannelClient>,
 250    unique_identifier: String,
 251    connection_options: RemoteConnectionOptions,
 252    path_style: PathStyle,
 253    state: Option<State>,
 254}
 255
 256#[derive(Debug)]
 257pub enum RemoteClientEvent {
 258    Disconnected,
 259}
 260
 261impl EventEmitter<RemoteClientEvent> for RemoteClient {}
 262
 263// Identifies the socket on the remote server so that reconnects
 264// can re-join the same project.
 265pub enum ConnectionIdentifier {
 266    Setup(u64),
 267    Workspace(i64),
 268}
 269
 270static NEXT_ID: AtomicU64 = AtomicU64::new(1);
 271
 272impl ConnectionIdentifier {
 273    pub fn setup() -> Self {
 274        Self::Setup(NEXT_ID.fetch_add(1, SeqCst))
 275    }
 276
 277    // This string gets used in a socket name, and so must be relatively short.
 278    // The total length of:
 279    //   /home/{username}/.local/share/zed/server_state/{name}/stdout.sock
 280    // Must be less than about 100 characters
 281    //   https://unix.stackexchange.com/questions/367008/why-is-socket-path-length-limited-to-a-hundred-chars
 282    // So our strings should be at most 20 characters or so.
 283    fn to_string(&self, cx: &App) -> String {
 284        let identifier_prefix = match ReleaseChannel::global(cx) {
 285            ReleaseChannel::Stable => "".to_string(),
 286            release_channel => format!("{}-", release_channel.dev_name()),
 287        };
 288        match self {
 289            Self::Setup(setup_id) => format!("{identifier_prefix}setup-{setup_id}"),
 290            Self::Workspace(workspace_id) => {
 291                format!("{identifier_prefix}workspace-{workspace_id}",)
 292            }
 293        }
 294    }
 295}
 296
 297impl RemoteClient {
 298    pub fn ssh(
 299        unique_identifier: ConnectionIdentifier,
 300        connection_options: SshConnectionOptions,
 301        cancellation: oneshot::Receiver<()>,
 302        delegate: Arc<dyn RemoteClientDelegate>,
 303        cx: &mut App,
 304    ) -> Task<Result<Option<Entity<Self>>>> {
 305        Self::new(
 306            unique_identifier,
 307            RemoteConnectionOptions::Ssh(connection_options),
 308            cancellation,
 309            delegate,
 310            cx,
 311        )
 312    }
 313
 314    pub fn new(
 315        unique_identifier: ConnectionIdentifier,
 316        connection_options: RemoteConnectionOptions,
 317        cancellation: oneshot::Receiver<()>,
 318        delegate: Arc<dyn RemoteClientDelegate>,
 319        cx: &mut App,
 320    ) -> Task<Result<Option<Entity<Self>>>> {
 321        let unique_identifier = unique_identifier.to_string(cx);
 322        cx.spawn(async move |cx| {
 323            let success = Box::pin(async move {
 324                let (outgoing_tx, outgoing_rx) = mpsc::unbounded::<Envelope>();
 325                let (incoming_tx, incoming_rx) = mpsc::unbounded::<Envelope>();
 326                let (connection_activity_tx, connection_activity_rx) = mpsc::channel::<()>(1);
 327
 328                let client =
 329                    cx.update(|cx| ChannelClient::new(incoming_rx, outgoing_tx, cx, "client"))?;
 330
 331                let ssh_connection = cx
 332                    .update(|cx| {
 333                        cx.update_default_global(|pool: &mut ConnectionPool, cx| {
 334                            pool.connect(connection_options.clone(), &delegate, cx)
 335                        })
 336                    })?
 337                    .await
 338                    .map_err(|e| e.cloned())?;
 339
 340                let path_style = ssh_connection.path_style();
 341                let this = cx.new(|_| Self {
 342                    client: client.clone(),
 343                    unique_identifier: unique_identifier.clone(),
 344                    connection_options,
 345                    path_style,
 346                    state: Some(State::Connecting),
 347                })?;
 348
 349                let io_task = ssh_connection.start_proxy(
 350                    unique_identifier,
 351                    false,
 352                    incoming_tx,
 353                    outgoing_rx,
 354                    connection_activity_tx,
 355                    delegate.clone(),
 356                    cx,
 357                );
 358
 359                let multiplex_task = Self::monitor(this.downgrade(), io_task, cx);
 360
 361                let timeout = cx.background_executor().timer(HEARTBEAT_TIMEOUT).fuse();
 362                futures::pin_mut!(timeout);
 363
 364                select_biased! {
 365                    ready = client.wait_for_remote_started() => {
 366                        if ready.is_none() {
 367                            let error = anyhow::anyhow!("remote client exited before becoming ready");
 368                            log::error!("failed to establish connection: {}", error);
 369                            return Err(error);
 370                        }
 371                    },
 372                    _ = timeout => {
 373                        let error = anyhow::anyhow!("remote client did not become ready within the timeout");
 374                        log::error!("failed to establish connection: {}", error);
 375                        return Err(error);
 376                    }
 377                }
 378
 379                if let Err(error) = client.ping(HEARTBEAT_TIMEOUT).await {
 380                    log::error!("failed to establish connection: {}", error);
 381                    return Err(error);
 382                }
 383
 384                let heartbeat_task = Self::heartbeat(this.downgrade(), connection_activity_rx, cx);
 385
 386                this.update(cx, |this, _| {
 387                    this.state = Some(State::Connected {
 388                        ssh_connection,
 389                        delegate,
 390                        multiplex_task,
 391                        heartbeat_task,
 392                    });
 393                })?;
 394
 395                Ok(Some(this))
 396            });
 397
 398            select! {
 399                _ = cancellation.fuse() => {
 400                    Ok(None)
 401                }
 402                result = success.fuse() =>  result
 403            }
 404        })
 405    }
 406
 407    pub fn proto_client_from_channels(
 408        incoming_rx: mpsc::UnboundedReceiver<Envelope>,
 409        outgoing_tx: mpsc::UnboundedSender<Envelope>,
 410        cx: &App,
 411        name: &'static str,
 412    ) -> AnyProtoClient {
 413        ChannelClient::new(incoming_rx, outgoing_tx, cx, name).into()
 414    }
 415
 416    pub fn shutdown_processes<T: RequestMessage>(
 417        &mut self,
 418        shutdown_request: Option<T>,
 419        executor: BackgroundExecutor,
 420    ) -> Option<impl Future<Output = ()> + use<T>> {
 421        let state = self.state.take()?;
 422        log::info!("shutting down ssh processes");
 423
 424        let State::Connected {
 425            multiplex_task,
 426            heartbeat_task,
 427            ssh_connection,
 428            delegate,
 429        } = state
 430        else {
 431            return None;
 432        };
 433
 434        let client = self.client.clone();
 435
 436        Some(async move {
 437            if let Some(shutdown_request) = shutdown_request {
 438                client.send(shutdown_request).log_err();
 439                // We wait 50ms instead of waiting for a response, because
 440                // waiting for a response would require us to wait on the main thread
 441                // which we want to avoid in an `on_app_quit` callback.
 442                executor.timer(Duration::from_millis(50)).await;
 443            }
 444
 445            // Drop `multiplex_task` because it owns our ssh_proxy_process, which is a
 446            // child of master_process.
 447            drop(multiplex_task);
 448            // Now drop the rest of state, which kills master process.
 449            drop(heartbeat_task);
 450            drop(ssh_connection);
 451            drop(delegate);
 452        })
 453    }
 454
 455    fn reconnect(&mut self, cx: &mut Context<Self>) -> Result<()> {
 456        let can_reconnect = self
 457            .state
 458            .as_ref()
 459            .map(|state| state.can_reconnect())
 460            .unwrap_or(false);
 461        if !can_reconnect {
 462            log::info!("aborting reconnect, because not in state that allows reconnecting");
 463            let error = if let Some(state) = self.state.as_ref() {
 464                format!("invalid state, cannot reconnect while in state {state}")
 465            } else {
 466                "no state set".to_string()
 467            };
 468            anyhow::bail!(error);
 469        }
 470
 471        let state = self.state.take().unwrap();
 472        let (attempts, remote_connection, delegate) = match state {
 473            State::Connected {
 474                ssh_connection,
 475                delegate,
 476                multiplex_task,
 477                heartbeat_task,
 478            }
 479            | State::HeartbeatMissed {
 480                ssh_connection,
 481                delegate,
 482                multiplex_task,
 483                heartbeat_task,
 484                ..
 485            } => {
 486                drop(multiplex_task);
 487                drop(heartbeat_task);
 488                (0, ssh_connection, delegate)
 489            }
 490            State::ReconnectFailed {
 491                attempts,
 492                ssh_connection,
 493                delegate,
 494                ..
 495            } => (attempts, ssh_connection, delegate),
 496            State::Connecting
 497            | State::Reconnecting
 498            | State::ReconnectExhausted
 499            | State::ServerNotRunning => unreachable!(),
 500        };
 501
 502        let attempts = attempts + 1;
 503        if attempts > MAX_RECONNECT_ATTEMPTS {
 504            log::error!(
 505                "Failed to reconnect to after {} attempts, giving up",
 506                MAX_RECONNECT_ATTEMPTS
 507            );
 508            self.set_state(State::ReconnectExhausted, cx);
 509            return Ok(());
 510        }
 511
 512        self.set_state(State::Reconnecting, cx);
 513
 514        log::info!("Trying to reconnect to ssh server... Attempt {}", attempts);
 515
 516        let unique_identifier = self.unique_identifier.clone();
 517        let client = self.client.clone();
 518        let reconnect_task = cx.spawn(async move |this, cx| {
 519            macro_rules! failed {
 520                ($error:expr, $attempts:expr, $ssh_connection:expr, $delegate:expr) => {
 521                    return State::ReconnectFailed {
 522                        error: anyhow!($error),
 523                        attempts: $attempts,
 524                        ssh_connection: $ssh_connection,
 525                        delegate: $delegate,
 526                    };
 527                };
 528            }
 529
 530            if let Err(error) = remote_connection
 531                .kill()
 532                .await
 533                .context("Failed to kill ssh process")
 534            {
 535                failed!(error, attempts, remote_connection, delegate);
 536            };
 537
 538            let connection_options = remote_connection.connection_options();
 539
 540            let (outgoing_tx, outgoing_rx) = mpsc::unbounded::<Envelope>();
 541            let (incoming_tx, incoming_rx) = mpsc::unbounded::<Envelope>();
 542            let (connection_activity_tx, connection_activity_rx) = mpsc::channel::<()>(1);
 543
 544            let (ssh_connection, io_task) = match async {
 545                let ssh_connection = cx
 546                    .update_global(|pool: &mut ConnectionPool, cx| {
 547                        pool.connect(connection_options, &delegate, cx)
 548                    })?
 549                    .await
 550                    .map_err(|error| error.cloned())?;
 551
 552                let io_task = ssh_connection.start_proxy(
 553                    unique_identifier,
 554                    true,
 555                    incoming_tx,
 556                    outgoing_rx,
 557                    connection_activity_tx,
 558                    delegate.clone(),
 559                    cx,
 560                );
 561                anyhow::Ok((ssh_connection, io_task))
 562            }
 563            .await
 564            {
 565                Ok((ssh_connection, io_task)) => (ssh_connection, io_task),
 566                Err(error) => {
 567                    failed!(error, attempts, remote_connection, delegate);
 568                }
 569            };
 570
 571            let multiplex_task = Self::monitor(this.clone(), io_task, cx);
 572            client.reconnect(incoming_rx, outgoing_tx, cx);
 573
 574            if let Err(error) = client.resync(HEARTBEAT_TIMEOUT).await {
 575                failed!(error, attempts, ssh_connection, delegate);
 576            };
 577
 578            State::Connected {
 579                ssh_connection,
 580                delegate,
 581                multiplex_task,
 582                heartbeat_task: Self::heartbeat(this.clone(), connection_activity_rx, cx),
 583            }
 584        });
 585
 586        cx.spawn(async move |this, cx| {
 587            let new_state = reconnect_task.await;
 588            this.update(cx, |this, cx| {
 589                this.try_set_state(cx, |old_state| {
 590                    if old_state.is_reconnecting() {
 591                        match &new_state {
 592                            State::Connecting
 593                            | State::Reconnecting
 594                            | State::HeartbeatMissed { .. }
 595                            | State::ServerNotRunning => {}
 596                            State::Connected { .. } => {
 597                                log::info!("Successfully reconnected");
 598                            }
 599                            State::ReconnectFailed {
 600                                error, attempts, ..
 601                            } => {
 602                                log::error!(
 603                                    "Reconnect attempt {} failed: {:?}. Starting new attempt...",
 604                                    attempts,
 605                                    error
 606                                );
 607                            }
 608                            State::ReconnectExhausted => {
 609                                log::error!("Reconnect attempt failed and all attempts exhausted");
 610                            }
 611                        }
 612                        Some(new_state)
 613                    } else {
 614                        None
 615                    }
 616                });
 617
 618                if this.state_is(State::is_reconnect_failed) {
 619                    this.reconnect(cx)
 620                } else if this.state_is(State::is_reconnect_exhausted) {
 621                    Ok(())
 622                } else {
 623                    log::debug!("State has transition from Reconnecting into new state while attempting reconnect.");
 624                    Ok(())
 625                }
 626            })
 627        })
 628        .detach_and_log_err(cx);
 629
 630        Ok(())
 631    }
 632
 633    fn heartbeat(
 634        this: WeakEntity<Self>,
 635        mut connection_activity_rx: mpsc::Receiver<()>,
 636        cx: &mut AsyncApp,
 637    ) -> Task<Result<()>> {
 638        let Ok(client) = this.read_with(cx, |this, _| this.client.clone()) else {
 639            return Task::ready(Err(anyhow!("SshRemoteClient lost")));
 640        };
 641
 642        cx.spawn(async move |cx| {
 643            let mut missed_heartbeats = 0;
 644
 645            let keepalive_timer = cx.background_executor().timer(HEARTBEAT_INTERVAL).fuse();
 646            futures::pin_mut!(keepalive_timer);
 647
 648            loop {
 649                select_biased! {
 650                    result = connection_activity_rx.next().fuse() => {
 651                        if result.is_none() {
 652                            log::warn!("ssh heartbeat: connection activity channel has been dropped. stopping.");
 653                            return Ok(());
 654                        }
 655
 656                        if missed_heartbeats != 0 {
 657                            missed_heartbeats = 0;
 658                            let _ =this.update(cx, |this, cx| {
 659                                this.handle_heartbeat_result(missed_heartbeats, cx)
 660                            })?;
 661                        }
 662                    }
 663                    _ = keepalive_timer => {
 664                        log::debug!("Sending heartbeat to server...");
 665
 666                        let result = select_biased! {
 667                            _ = connection_activity_rx.next().fuse() => {
 668                                Ok(())
 669                            }
 670                            ping_result = client.ping(HEARTBEAT_TIMEOUT).fuse() => {
 671                                ping_result
 672                            }
 673                        };
 674
 675                        if result.is_err() {
 676                            missed_heartbeats += 1;
 677                            log::warn!(
 678                                "No heartbeat from server after {:?}. Missed heartbeat {} out of {}.",
 679                                HEARTBEAT_TIMEOUT,
 680                                missed_heartbeats,
 681                                MAX_MISSED_HEARTBEATS
 682                            );
 683                        } else if missed_heartbeats != 0 {
 684                            missed_heartbeats = 0;
 685                        } else {
 686                            continue;
 687                        }
 688
 689                        let result = this.update(cx, |this, cx| {
 690                            this.handle_heartbeat_result(missed_heartbeats, cx)
 691                        })?;
 692                        if result.is_break() {
 693                            return Ok(());
 694                        }
 695                    }
 696                }
 697
 698                keepalive_timer.set(cx.background_executor().timer(HEARTBEAT_INTERVAL).fuse());
 699            }
 700        })
 701    }
 702
 703    fn handle_heartbeat_result(
 704        &mut self,
 705        missed_heartbeats: usize,
 706        cx: &mut Context<Self>,
 707    ) -> ControlFlow<()> {
 708        let state = self.state.take().unwrap();
 709        let next_state = if missed_heartbeats > 0 {
 710            state.heartbeat_missed()
 711        } else {
 712            state.heartbeat_recovered()
 713        };
 714
 715        self.set_state(next_state, cx);
 716
 717        if missed_heartbeats >= MAX_MISSED_HEARTBEATS {
 718            log::error!(
 719                "Missed last {} heartbeats. Reconnecting...",
 720                missed_heartbeats
 721            );
 722
 723            self.reconnect(cx)
 724                .context("failed to start reconnect process after missing heartbeats")
 725                .log_err();
 726            ControlFlow::Break(())
 727        } else {
 728            ControlFlow::Continue(())
 729        }
 730    }
 731
 732    fn monitor(
 733        this: WeakEntity<Self>,
 734        io_task: Task<Result<i32>>,
 735        cx: &AsyncApp,
 736    ) -> Task<Result<()>> {
 737        cx.spawn(async move |cx| {
 738            let result = io_task.await;
 739
 740            match result {
 741                Ok(exit_code) => {
 742                    if let Some(error) = ProxyLaunchError::from_exit_code(exit_code) {
 743                        match error {
 744                            ProxyLaunchError::ServerNotRunning => {
 745                                log::error!("failed to reconnect because server is not running");
 746                                this.update(cx, |this, cx| {
 747                                    this.set_state(State::ServerNotRunning, cx);
 748                                })?;
 749                            }
 750                        }
 751                    } else if exit_code > 0 {
 752                        log::error!("proxy process terminated unexpectedly");
 753                        this.update(cx, |this, cx| {
 754                            this.reconnect(cx).ok();
 755                        })?;
 756                    }
 757                }
 758                Err(error) => {
 759                    log::warn!("ssh io task died with error: {:?}. reconnecting...", error);
 760                    this.update(cx, |this, cx| {
 761                        this.reconnect(cx).ok();
 762                    })?;
 763                }
 764            }
 765
 766            Ok(())
 767        })
 768    }
 769
 770    fn state_is(&self, check: impl FnOnce(&State) -> bool) -> bool {
 771        self.state.as_ref().is_some_and(check)
 772    }
 773
 774    fn try_set_state(&mut self, cx: &mut Context<Self>, map: impl FnOnce(&State) -> Option<State>) {
 775        let new_state = self.state.as_ref().and_then(map);
 776        if let Some(new_state) = new_state {
 777            self.state.replace(new_state);
 778            cx.notify();
 779        }
 780    }
 781
 782    fn set_state(&mut self, state: State, cx: &mut Context<Self>) {
 783        log::info!("setting state to '{}'", &state);
 784
 785        let is_reconnect_exhausted = state.is_reconnect_exhausted();
 786        let is_server_not_running = state.is_server_not_running();
 787        self.state.replace(state);
 788
 789        if is_reconnect_exhausted || is_server_not_running {
 790            cx.emit(RemoteClientEvent::Disconnected);
 791        }
 792        cx.notify();
 793    }
 794
 795    pub fn shell(&self) -> Option<String> {
 796        Some(self.remote_connection()?.shell())
 797    }
 798
 799    pub fn default_system_shell(&self) -> Option<String> {
 800        Some(self.remote_connection()?.default_system_shell())
 801    }
 802
 803    pub fn shares_network_interface(&self) -> bool {
 804        self.remote_connection()
 805            .map_or(false, |connection| connection.shares_network_interface())
 806    }
 807
 808    pub fn build_command(
 809        &self,
 810        program: Option<String>,
 811        args: &[String],
 812        env: &HashMap<String, String>,
 813        working_dir: Option<String>,
 814        port_forward: Option<(u16, String, u16)>,
 815    ) -> Result<CommandTemplate> {
 816        let Some(connection) = self.remote_connection() else {
 817            return Err(anyhow!("no ssh connection"));
 818        };
 819        connection.build_command(program, args, env, working_dir, port_forward)
 820    }
 821
 822    pub fn upload_directory(
 823        &self,
 824        src_path: PathBuf,
 825        dest_path: RemotePathBuf,
 826        cx: &App,
 827    ) -> Task<Result<()>> {
 828        let Some(connection) = self.remote_connection() else {
 829            return Task::ready(Err(anyhow!("no ssh connection")));
 830        };
 831        connection.upload_directory(src_path, dest_path, cx)
 832    }
 833
 834    pub fn proto_client(&self) -> AnyProtoClient {
 835        self.client.clone().into()
 836    }
 837
 838    pub fn connection_options(&self) -> RemoteConnectionOptions {
 839        self.connection_options.clone()
 840    }
 841
 842    pub fn connection_state(&self) -> ConnectionState {
 843        self.state
 844            .as_ref()
 845            .map(ConnectionState::from)
 846            .unwrap_or(ConnectionState::Disconnected)
 847    }
 848
 849    pub fn is_disconnected(&self) -> bool {
 850        self.connection_state() == ConnectionState::Disconnected
 851    }
 852
 853    pub fn path_style(&self) -> PathStyle {
 854        self.path_style
 855    }
 856
 857    #[cfg(any(test, feature = "test-support"))]
 858    pub fn simulate_disconnect(&self, client_cx: &mut App) -> Task<()> {
 859        let opts = self.connection_options();
 860        client_cx.spawn(async move |cx| {
 861            let connection = cx
 862                .update_global(|c: &mut ConnectionPool, _| {
 863                    if let Some(ConnectionPoolEntry::Connecting(c)) = c.connections.get(&opts) {
 864                        c.clone()
 865                    } else {
 866                        panic!("missing test connection")
 867                    }
 868                })
 869                .unwrap()
 870                .await
 871                .unwrap();
 872
 873            connection.simulate_disconnect(cx);
 874        })
 875    }
 876
 877    #[cfg(any(test, feature = "test-support"))]
 878    pub fn fake_server(
 879        client_cx: &mut gpui::TestAppContext,
 880        server_cx: &mut gpui::TestAppContext,
 881    ) -> (RemoteConnectionOptions, AnyProtoClient) {
 882        let port = client_cx
 883            .update(|cx| cx.default_global::<ConnectionPool>().connections.len() as u16 + 1);
 884        let opts = RemoteConnectionOptions::Ssh(SshConnectionOptions {
 885            host: "<fake>".to_string(),
 886            port: Some(port),
 887            ..Default::default()
 888        });
 889        let (outgoing_tx, _) = mpsc::unbounded::<Envelope>();
 890        let (_, incoming_rx) = mpsc::unbounded::<Envelope>();
 891        let server_client =
 892            server_cx.update(|cx| ChannelClient::new(incoming_rx, outgoing_tx, cx, "fake-server"));
 893        let connection: Arc<dyn RemoteConnection> = Arc::new(fake::FakeRemoteConnection {
 894            connection_options: opts.clone(),
 895            server_cx: fake::SendableCx::new(server_cx),
 896            server_channel: server_client.clone(),
 897        });
 898
 899        client_cx.update(|cx| {
 900            cx.update_default_global(|c: &mut ConnectionPool, cx| {
 901                c.connections.insert(
 902                    opts.clone(),
 903                    ConnectionPoolEntry::Connecting(
 904                        cx.background_spawn({
 905                            let connection = connection.clone();
 906                            async move { Ok(connection.clone()) }
 907                        })
 908                        .shared(),
 909                    ),
 910                );
 911            })
 912        });
 913
 914        (opts, server_client.into())
 915    }
 916
 917    #[cfg(any(test, feature = "test-support"))]
 918    pub async fn fake_client(
 919        opts: RemoteConnectionOptions,
 920        client_cx: &mut gpui::TestAppContext,
 921    ) -> Entity<Self> {
 922        let (_tx, rx) = oneshot::channel();
 923        client_cx
 924            .update(|cx| {
 925                Self::new(
 926                    ConnectionIdentifier::setup(),
 927                    opts,
 928                    rx,
 929                    Arc::new(fake::Delegate),
 930                    cx,
 931                )
 932            })
 933            .await
 934            .unwrap()
 935            .unwrap()
 936    }
 937
 938    fn remote_connection(&self) -> Option<Arc<dyn RemoteConnection>> {
 939        self.state
 940            .as_ref()
 941            .and_then(|state| state.remote_connection())
 942    }
 943}
 944
 945enum ConnectionPoolEntry {
 946    Connecting(Shared<Task<Result<Arc<dyn RemoteConnection>, Arc<anyhow::Error>>>>),
 947    Connected(Weak<dyn RemoteConnection>),
 948}
 949
 950#[derive(Default)]
 951struct ConnectionPool {
 952    connections: HashMap<RemoteConnectionOptions, ConnectionPoolEntry>,
 953}
 954
 955impl Global for ConnectionPool {}
 956
 957impl ConnectionPool {
 958    pub fn connect(
 959        &mut self,
 960        opts: RemoteConnectionOptions,
 961        delegate: &Arc<dyn RemoteClientDelegate>,
 962        cx: &mut App,
 963    ) -> Shared<Task<Result<Arc<dyn RemoteConnection>, Arc<anyhow::Error>>>> {
 964        let connection = self.connections.get(&opts);
 965        match connection {
 966            Some(ConnectionPoolEntry::Connecting(task)) => {
 967                let delegate = delegate.clone();
 968                cx.spawn(async move |cx| {
 969                    delegate.set_status(Some("Waiting for existing connection attempt"), cx);
 970                })
 971                .detach();
 972                return task.clone();
 973            }
 974            Some(ConnectionPoolEntry::Connected(ssh)) => {
 975                if let Some(ssh) = ssh.upgrade()
 976                    && !ssh.has_been_killed()
 977                {
 978                    return Task::ready(Ok(ssh)).shared();
 979                }
 980                self.connections.remove(&opts);
 981            }
 982            None => {}
 983        }
 984
 985        let task = cx
 986            .spawn({
 987                let opts = opts.clone();
 988                let delegate = delegate.clone();
 989                async move |cx| {
 990                    let connection = match opts.clone() {
 991                        RemoteConnectionOptions::Ssh(opts) => {
 992                            SshRemoteConnection::new(opts, delegate, cx)
 993                                .await
 994                                .map(|connection| Arc::new(connection) as Arc<dyn RemoteConnection>)
 995                        }
 996                        RemoteConnectionOptions::Wsl(opts) => {
 997                            WslRemoteConnection::new(opts, delegate, cx)
 998                                .await
 999                                .map(|connection| Arc::new(connection) as Arc<dyn RemoteConnection>)
1000                        }
1001                    };
1002
1003                    cx.update_global(|pool: &mut Self, _| {
1004                        debug_assert!(matches!(
1005                            pool.connections.get(&opts),
1006                            Some(ConnectionPoolEntry::Connecting(_))
1007                        ));
1008                        match connection {
1009                            Ok(connection) => {
1010                                pool.connections.insert(
1011                                    opts.clone(),
1012                                    ConnectionPoolEntry::Connected(Arc::downgrade(&connection)),
1013                                );
1014                                Ok(connection)
1015                            }
1016                            Err(error) => {
1017                                pool.connections.remove(&opts);
1018                                Err(Arc::new(error))
1019                            }
1020                        }
1021                    })?
1022                }
1023            })
1024            .shared();
1025
1026        self.connections
1027            .insert(opts.clone(), ConnectionPoolEntry::Connecting(task.clone()));
1028        task
1029    }
1030}
1031
1032#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1033pub enum RemoteConnectionOptions {
1034    Ssh(SshConnectionOptions),
1035    Wsl(WslConnectionOptions),
1036}
1037
1038impl RemoteConnectionOptions {
1039    pub fn display_name(&self) -> String {
1040        match self {
1041            RemoteConnectionOptions::Ssh(opts) => opts.host.clone(),
1042            RemoteConnectionOptions::Wsl(opts) => opts.distro_name.clone(),
1043        }
1044    }
1045}
1046
1047impl From<SshConnectionOptions> for RemoteConnectionOptions {
1048    fn from(opts: SshConnectionOptions) -> Self {
1049        RemoteConnectionOptions::Ssh(opts)
1050    }
1051}
1052
1053impl From<WslConnectionOptions> for RemoteConnectionOptions {
1054    fn from(opts: WslConnectionOptions) -> Self {
1055        RemoteConnectionOptions::Wsl(opts)
1056    }
1057}
1058
1059#[async_trait(?Send)]
1060pub(crate) trait RemoteConnection: Send + Sync {
1061    fn start_proxy(
1062        &self,
1063        unique_identifier: String,
1064        reconnect: bool,
1065        incoming_tx: UnboundedSender<Envelope>,
1066        outgoing_rx: UnboundedReceiver<Envelope>,
1067        connection_activity_tx: Sender<()>,
1068        delegate: Arc<dyn RemoteClientDelegate>,
1069        cx: &mut AsyncApp,
1070    ) -> Task<Result<i32>>;
1071    fn upload_directory(
1072        &self,
1073        src_path: PathBuf,
1074        dest_path: RemotePathBuf,
1075        cx: &App,
1076    ) -> Task<Result<()>>;
1077    async fn kill(&self) -> Result<()>;
1078    fn has_been_killed(&self) -> bool;
1079    fn shares_network_interface(&self) -> bool {
1080        false
1081    }
1082    fn build_command(
1083        &self,
1084        program: Option<String>,
1085        args: &[String],
1086        env: &HashMap<String, String>,
1087        working_dir: Option<String>,
1088        port_forward: Option<(u16, String, u16)>,
1089    ) -> Result<CommandTemplate>;
1090    fn connection_options(&self) -> RemoteConnectionOptions;
1091    fn path_style(&self) -> PathStyle;
1092    fn shell(&self) -> String;
1093    fn default_system_shell(&self) -> String;
1094
1095    #[cfg(any(test, feature = "test-support"))]
1096    fn simulate_disconnect(&self, _: &AsyncApp) {}
1097}
1098
1099type ResponseChannels = Mutex<HashMap<MessageId, oneshot::Sender<(Envelope, oneshot::Sender<()>)>>>;
1100
1101struct Signal<T> {
1102    tx: Mutex<Option<oneshot::Sender<T>>>,
1103    rx: Shared<Task<Option<T>>>,
1104}
1105
1106impl<T: Send + Clone + 'static> Signal<T> {
1107    pub fn new(cx: &App) -> Self {
1108        let (tx, rx) = oneshot::channel();
1109
1110        let task = cx
1111            .background_executor()
1112            .spawn(async move { rx.await.ok() })
1113            .shared();
1114
1115        Self {
1116            tx: Mutex::new(Some(tx)),
1117            rx: task,
1118        }
1119    }
1120
1121    fn set(&self, value: T) {
1122        if let Some(tx) = self.tx.lock().take() {
1123            let _ = tx.send(value);
1124        }
1125    }
1126
1127    fn wait(&self) -> Shared<Task<Option<T>>> {
1128        self.rx.clone()
1129    }
1130}
1131
1132struct ChannelClient {
1133    next_message_id: AtomicU32,
1134    outgoing_tx: Mutex<mpsc::UnboundedSender<Envelope>>,
1135    buffer: Mutex<VecDeque<Envelope>>,
1136    response_channels: ResponseChannels,
1137    message_handlers: Mutex<ProtoMessageHandlerSet>,
1138    max_received: AtomicU32,
1139    name: &'static str,
1140    task: Mutex<Task<Result<()>>>,
1141    remote_started: Signal<()>,
1142}
1143
1144impl ChannelClient {
1145    fn new(
1146        incoming_rx: mpsc::UnboundedReceiver<Envelope>,
1147        outgoing_tx: mpsc::UnboundedSender<Envelope>,
1148        cx: &App,
1149        name: &'static str,
1150    ) -> Arc<Self> {
1151        Arc::new_cyclic(|this| Self {
1152            outgoing_tx: Mutex::new(outgoing_tx),
1153            next_message_id: AtomicU32::new(0),
1154            max_received: AtomicU32::new(0),
1155            response_channels: ResponseChannels::default(),
1156            message_handlers: Default::default(),
1157            buffer: Mutex::new(VecDeque::new()),
1158            name,
1159            task: Mutex::new(Self::start_handling_messages(
1160                this.clone(),
1161                incoming_rx,
1162                &cx.to_async(),
1163            )),
1164            remote_started: Signal::new(cx),
1165        })
1166    }
1167
1168    fn wait_for_remote_started(&self) -> Shared<Task<Option<()>>> {
1169        self.remote_started.wait()
1170    }
1171
1172    fn start_handling_messages(
1173        this: Weak<Self>,
1174        mut incoming_rx: mpsc::UnboundedReceiver<Envelope>,
1175        cx: &AsyncApp,
1176    ) -> Task<Result<()>> {
1177        cx.spawn(async move |cx| {
1178            if let Some(this) = this.upgrade() {
1179                let envelope = proto::RemoteStarted {}.into_envelope(0, None, None);
1180                this.outgoing_tx.lock().unbounded_send(envelope).ok();
1181            };
1182
1183            let peer_id = PeerId { owner_id: 0, id: 0 };
1184            while let Some(incoming) = incoming_rx.next().await {
1185                let Some(this) = this.upgrade() else {
1186                    return anyhow::Ok(());
1187                };
1188                if let Some(ack_id) = incoming.ack_id {
1189                    let mut buffer = this.buffer.lock();
1190                    while buffer.front().is_some_and(|msg| msg.id <= ack_id) {
1191                        buffer.pop_front();
1192                    }
1193                }
1194                if let Some(proto::envelope::Payload::FlushBufferedMessages(_)) = &incoming.payload
1195                {
1196                    log::debug!(
1197                        "{}:ssh message received. name:FlushBufferedMessages",
1198                        this.name
1199                    );
1200                    {
1201                        let buffer = this.buffer.lock();
1202                        for envelope in buffer.iter() {
1203                            this.outgoing_tx
1204                                .lock()
1205                                .unbounded_send(envelope.clone())
1206                                .ok();
1207                        }
1208                    }
1209                    let mut envelope = proto::Ack {}.into_envelope(0, Some(incoming.id), None);
1210                    envelope.id = this.next_message_id.fetch_add(1, SeqCst);
1211                    this.outgoing_tx.lock().unbounded_send(envelope).ok();
1212                    continue;
1213                }
1214
1215                if let Some(proto::envelope::Payload::RemoteStarted(_)) = &incoming.payload {
1216                    this.remote_started.set(());
1217                    let mut envelope = proto::Ack {}.into_envelope(0, Some(incoming.id), None);
1218                    envelope.id = this.next_message_id.fetch_add(1, SeqCst);
1219                    this.outgoing_tx.lock().unbounded_send(envelope).ok();
1220                    continue;
1221                }
1222
1223                this.max_received.store(incoming.id, SeqCst);
1224
1225                if let Some(request_id) = incoming.responding_to {
1226                    let request_id = MessageId(request_id);
1227                    let sender = this.response_channels.lock().remove(&request_id);
1228                    if let Some(sender) = sender {
1229                        let (tx, rx) = oneshot::channel();
1230                        if incoming.payload.is_some() {
1231                            sender.send((incoming, tx)).ok();
1232                        }
1233                        rx.await.ok();
1234                    }
1235                } else if let Some(envelope) =
1236                    build_typed_envelope(peer_id, Instant::now(), incoming)
1237                {
1238                    let type_name = envelope.payload_type_name();
1239                    let message_id = envelope.message_id();
1240                    if let Some(future) = ProtoMessageHandlerSet::handle_message(
1241                        &this.message_handlers,
1242                        envelope,
1243                        this.clone().into(),
1244                        cx.clone(),
1245                    ) {
1246                        log::debug!("{}:ssh message received. name:{type_name}", this.name);
1247                        cx.foreground_executor()
1248                            .spawn(async move {
1249                                match future.await {
1250                                    Ok(_) => {
1251                                        log::debug!(
1252                                            "{}:ssh message handled. name:{type_name}",
1253                                            this.name
1254                                        );
1255                                    }
1256                                    Err(error) => {
1257                                        log::error!(
1258                                            "{}:error handling message. type:{}, error:{:#}",
1259                                            this.name,
1260                                            type_name,
1261                                            format!("{error:#}").lines().fold(
1262                                                String::new(),
1263                                                |mut message, line| {
1264                                                    if !message.is_empty() {
1265                                                        message.push(' ');
1266                                                    }
1267                                                    message.push_str(line);
1268                                                    message
1269                                                }
1270                                            )
1271                                        );
1272                                    }
1273                                }
1274                            })
1275                            .detach()
1276                    } else {
1277                        log::error!("{}:unhandled ssh message name:{type_name}", this.name);
1278                        if let Err(e) = AnyProtoClient::from(this.clone()).send_response(
1279                            message_id,
1280                            anyhow::anyhow!("no handler registered for {type_name}").to_proto(),
1281                        ) {
1282                            log::error!(
1283                                "{}:error sending error response for {type_name}:{e:#}",
1284                                this.name
1285                            );
1286                        }
1287                    }
1288                }
1289            }
1290            anyhow::Ok(())
1291        })
1292    }
1293
1294    fn reconnect(
1295        self: &Arc<Self>,
1296        incoming_rx: UnboundedReceiver<Envelope>,
1297        outgoing_tx: UnboundedSender<Envelope>,
1298        cx: &AsyncApp,
1299    ) {
1300        *self.outgoing_tx.lock() = outgoing_tx;
1301        *self.task.lock() = Self::start_handling_messages(Arc::downgrade(self), incoming_rx, cx);
1302    }
1303
1304    fn request<T: RequestMessage>(
1305        &self,
1306        payload: T,
1307    ) -> impl 'static + Future<Output = Result<T::Response>> {
1308        self.request_internal(payload, true)
1309    }
1310
1311    fn request_internal<T: RequestMessage>(
1312        &self,
1313        payload: T,
1314        use_buffer: bool,
1315    ) -> impl 'static + Future<Output = Result<T::Response>> {
1316        log::debug!("ssh request start. name:{}", T::NAME);
1317        let response =
1318            self.request_dynamic(payload.into_envelope(0, None, None), T::NAME, use_buffer);
1319        async move {
1320            let response = response.await?;
1321            log::debug!("ssh request finish. name:{}", T::NAME);
1322            T::Response::from_envelope(response).context("received a response of the wrong type")
1323        }
1324    }
1325
1326    async fn resync(&self, timeout: Duration) -> Result<()> {
1327        smol::future::or(
1328            async {
1329                self.request_internal(proto::FlushBufferedMessages {}, false)
1330                    .await?;
1331
1332                for envelope in self.buffer.lock().iter() {
1333                    self.outgoing_tx
1334                        .lock()
1335                        .unbounded_send(envelope.clone())
1336                        .ok();
1337                }
1338                Ok(())
1339            },
1340            async {
1341                smol::Timer::after(timeout).await;
1342                anyhow::bail!("Timed out resyncing remote client")
1343            },
1344        )
1345        .await
1346    }
1347
1348    async fn ping(&self, timeout: Duration) -> Result<()> {
1349        smol::future::or(
1350            async {
1351                self.request(proto::Ping {}).await?;
1352                Ok(())
1353            },
1354            async {
1355                smol::Timer::after(timeout).await;
1356                anyhow::bail!("Timed out pinging remote client")
1357            },
1358        )
1359        .await
1360    }
1361
1362    fn send<T: EnvelopedMessage>(&self, payload: T) -> Result<()> {
1363        log::debug!("ssh send name:{}", T::NAME);
1364        self.send_dynamic(payload.into_envelope(0, None, None))
1365    }
1366
1367    fn request_dynamic(
1368        &self,
1369        mut envelope: proto::Envelope,
1370        type_name: &'static str,
1371        use_buffer: bool,
1372    ) -> impl 'static + Future<Output = Result<proto::Envelope>> {
1373        envelope.id = self.next_message_id.fetch_add(1, SeqCst);
1374        let (tx, rx) = oneshot::channel();
1375        let mut response_channels_lock = self.response_channels.lock();
1376        response_channels_lock.insert(MessageId(envelope.id), tx);
1377        drop(response_channels_lock);
1378
1379        let result = if use_buffer {
1380            self.send_buffered(envelope)
1381        } else {
1382            self.send_unbuffered(envelope)
1383        };
1384        async move {
1385            if let Err(error) = &result {
1386                log::error!("failed to send message: {error}");
1387                anyhow::bail!("failed to send message: {error}");
1388            }
1389
1390            let response = rx.await.context("connection lost")?.0;
1391            if let Some(proto::envelope::Payload::Error(error)) = &response.payload {
1392                return Err(RpcError::from_proto(error, type_name));
1393            }
1394            Ok(response)
1395        }
1396    }
1397
1398    pub fn send_dynamic(&self, mut envelope: proto::Envelope) -> Result<()> {
1399        envelope.id = self.next_message_id.fetch_add(1, SeqCst);
1400        self.send_buffered(envelope)
1401    }
1402
1403    fn send_buffered(&self, mut envelope: proto::Envelope) -> Result<()> {
1404        envelope.ack_id = Some(self.max_received.load(SeqCst));
1405        self.buffer.lock().push_back(envelope.clone());
1406        // ignore errors on send (happen while we're reconnecting)
1407        // assume that the global "disconnected" overlay is sufficient.
1408        self.outgoing_tx.lock().unbounded_send(envelope).ok();
1409        Ok(())
1410    }
1411
1412    fn send_unbuffered(&self, mut envelope: proto::Envelope) -> Result<()> {
1413        envelope.ack_id = Some(self.max_received.load(SeqCst));
1414        self.outgoing_tx.lock().unbounded_send(envelope).ok();
1415        Ok(())
1416    }
1417}
1418
1419impl ProtoClient for ChannelClient {
1420    fn request(
1421        &self,
1422        envelope: proto::Envelope,
1423        request_type: &'static str,
1424    ) -> BoxFuture<'static, Result<proto::Envelope>> {
1425        self.request_dynamic(envelope, request_type, true).boxed()
1426    }
1427
1428    fn send(&self, envelope: proto::Envelope, _message_type: &'static str) -> Result<()> {
1429        self.send_dynamic(envelope)
1430    }
1431
1432    fn send_response(&self, envelope: Envelope, _message_type: &'static str) -> anyhow::Result<()> {
1433        self.send_dynamic(envelope)
1434    }
1435
1436    fn message_handler_set(&self) -> &Mutex<ProtoMessageHandlerSet> {
1437        &self.message_handlers
1438    }
1439
1440    fn is_via_collab(&self) -> bool {
1441        false
1442    }
1443}
1444
1445#[cfg(any(test, feature = "test-support"))]
1446mod fake {
1447    use super::{ChannelClient, RemoteClientDelegate, RemoteConnection, RemotePlatform};
1448    use crate::remote_client::{CommandTemplate, RemoteConnectionOptions};
1449    use anyhow::Result;
1450    use askpass::EncryptedPassword;
1451    use async_trait::async_trait;
1452    use collections::HashMap;
1453    use futures::{
1454        FutureExt, SinkExt, StreamExt,
1455        channel::{
1456            mpsc::{self, Sender},
1457            oneshot,
1458        },
1459        select_biased,
1460    };
1461    use gpui::{App, AppContext as _, AsyncApp, SemanticVersion, Task, TestAppContext};
1462    use release_channel::ReleaseChannel;
1463    use rpc::proto::Envelope;
1464    use std::{path::PathBuf, sync::Arc};
1465    use util::paths::{PathStyle, RemotePathBuf};
1466
1467    pub(super) struct FakeRemoteConnection {
1468        pub(super) connection_options: RemoteConnectionOptions,
1469        pub(super) server_channel: Arc<ChannelClient>,
1470        pub(super) server_cx: SendableCx,
1471    }
1472
1473    pub(super) struct SendableCx(AsyncApp);
1474    impl SendableCx {
1475        // SAFETY: When run in test mode, GPUI is always single threaded.
1476        pub(super) fn new(cx: &TestAppContext) -> Self {
1477            Self(cx.to_async())
1478        }
1479
1480        // SAFETY: Enforce that we're on the main thread by requiring a valid AsyncApp
1481        fn get(&self, _: &AsyncApp) -> AsyncApp {
1482            self.0.clone()
1483        }
1484    }
1485
1486    // SAFETY: There is no way to access a SendableCx from a different thread, see [`SendableCx::new`] and [`SendableCx::get`]
1487    unsafe impl Send for SendableCx {}
1488    unsafe impl Sync for SendableCx {}
1489
1490    #[async_trait(?Send)]
1491    impl RemoteConnection for FakeRemoteConnection {
1492        async fn kill(&self) -> Result<()> {
1493            Ok(())
1494        }
1495
1496        fn has_been_killed(&self) -> bool {
1497            false
1498        }
1499
1500        fn build_command(
1501            &self,
1502            program: Option<String>,
1503            args: &[String],
1504            env: &HashMap<String, String>,
1505            _: Option<String>,
1506            _: Option<(u16, String, u16)>,
1507        ) -> Result<CommandTemplate> {
1508            let ssh_program = program.unwrap_or_else(|| "sh".to_string());
1509            let mut ssh_args = Vec::new();
1510            ssh_args.push(ssh_program);
1511            ssh_args.extend(args.iter().cloned());
1512            Ok(CommandTemplate {
1513                program: "ssh".into(),
1514                args: ssh_args,
1515                env: env.clone(),
1516            })
1517        }
1518
1519        fn upload_directory(
1520            &self,
1521            _src_path: PathBuf,
1522            _dest_path: RemotePathBuf,
1523            _cx: &App,
1524        ) -> Task<Result<()>> {
1525            unreachable!()
1526        }
1527
1528        fn connection_options(&self) -> RemoteConnectionOptions {
1529            self.connection_options.clone()
1530        }
1531
1532        fn simulate_disconnect(&self, cx: &AsyncApp) {
1533            let (outgoing_tx, _) = mpsc::unbounded::<Envelope>();
1534            let (_, incoming_rx) = mpsc::unbounded::<Envelope>();
1535            self.server_channel
1536                .reconnect(incoming_rx, outgoing_tx, &self.server_cx.get(cx));
1537        }
1538
1539        fn start_proxy(
1540            &self,
1541            _unique_identifier: String,
1542            _reconnect: bool,
1543            mut client_incoming_tx: mpsc::UnboundedSender<Envelope>,
1544            mut client_outgoing_rx: mpsc::UnboundedReceiver<Envelope>,
1545            mut connection_activity_tx: Sender<()>,
1546            _delegate: Arc<dyn RemoteClientDelegate>,
1547            cx: &mut AsyncApp,
1548        ) -> Task<Result<i32>> {
1549            let (mut server_incoming_tx, server_incoming_rx) = mpsc::unbounded::<Envelope>();
1550            let (server_outgoing_tx, mut server_outgoing_rx) = mpsc::unbounded::<Envelope>();
1551
1552            self.server_channel.reconnect(
1553                server_incoming_rx,
1554                server_outgoing_tx,
1555                &self.server_cx.get(cx),
1556            );
1557
1558            cx.background_spawn(async move {
1559                loop {
1560                    select_biased! {
1561                        server_to_client = server_outgoing_rx.next().fuse() => {
1562                            let Some(server_to_client) = server_to_client else {
1563                                return Ok(1)
1564                            };
1565                            connection_activity_tx.try_send(()).ok();
1566                            client_incoming_tx.send(server_to_client).await.ok();
1567                        }
1568                        client_to_server = client_outgoing_rx.next().fuse() => {
1569                            let Some(client_to_server) = client_to_server else {
1570                                return Ok(1)
1571                            };
1572                            server_incoming_tx.send(client_to_server).await.ok();
1573                        }
1574                    }
1575                }
1576            })
1577        }
1578
1579        fn path_style(&self) -> PathStyle {
1580            PathStyle::local()
1581        }
1582
1583        fn shell(&self) -> String {
1584            "sh".to_owned()
1585        }
1586
1587        fn default_system_shell(&self) -> String {
1588            "sh".to_owned()
1589        }
1590    }
1591
1592    pub(super) struct Delegate;
1593
1594    impl RemoteClientDelegate for Delegate {
1595        fn ask_password(&self, _: String, _: oneshot::Sender<EncryptedPassword>, _: &mut AsyncApp) {
1596            unreachable!()
1597        }
1598
1599        fn download_server_binary_locally(
1600            &self,
1601            _: RemotePlatform,
1602            _: ReleaseChannel,
1603            _: Option<SemanticVersion>,
1604            _: &mut AsyncApp,
1605        ) -> Task<Result<PathBuf>> {
1606            unreachable!()
1607        }
1608
1609        fn get_download_params(
1610            &self,
1611            _platform: RemotePlatform,
1612            _release_channel: ReleaseChannel,
1613            _version: Option<SemanticVersion>,
1614            _cx: &mut AsyncApp,
1615        ) -> Task<Result<Option<(String, String)>>> {
1616            unreachable!()
1617        }
1618
1619        fn set_status(&self, _: Option<&str>, _: &mut AsyncApp) {}
1620    }
1621}