remote_client.rs

   1use crate::{
   2    SshConnectionOptions,
   3    protocol::MessageId,
   4    proxy::ProxyLaunchError,
   5    transport::{
   6        docker::{DockerConnectionOptions, DockerExecConnection},
   7        ssh::SshRemoteConnection,
   8        wsl::{WslConnectionOptions, WslRemoteConnection},
   9    },
  10};
  11use anyhow::{Context as _, Result, anyhow};
  12use askpass::EncryptedPassword;
  13use async_trait::async_trait;
  14use collections::HashMap;
  15use futures::{
  16    Future, FutureExt as _, StreamExt as _,
  17    channel::{
  18        mpsc::{self, Sender, UnboundedReceiver, UnboundedSender},
  19        oneshot,
  20    },
  21    future::{BoxFuture, Shared},
  22    select, select_biased,
  23};
  24use gpui::{
  25    App, AppContext as _, AsyncApp, BackgroundExecutor, BorrowAppContext, Context, Entity,
  26    EventEmitter, FutureExt, Global, Task, WeakEntity,
  27};
  28use parking_lot::Mutex;
  29
  30use release_channel::ReleaseChannel;
  31use rpc::{
  32    AnyProtoClient, ErrorExt, ProtoClient, ProtoMessageHandlerSet, RpcError,
  33    proto::{self, Envelope, EnvelopedMessage, PeerId, RequestMessage, build_typed_envelope},
  34};
  35use semver::Version;
  36use std::{
  37    collections::VecDeque,
  38    fmt,
  39    ops::ControlFlow,
  40    path::PathBuf,
  41    sync::{
  42        Arc, Weak,
  43        atomic::{AtomicU32, AtomicU64, Ordering::SeqCst},
  44    },
  45    time::{Duration, Instant},
  46};
  47use util::{
  48    ResultExt,
  49    paths::{PathStyle, RemotePathBuf},
  50};
  51
  52#[derive(Copy, Clone, Debug, PartialEq, Eq)]
  53pub enum RemoteOs {
  54    Linux,
  55    MacOs,
  56    Windows,
  57}
  58
  59impl RemoteOs {
  60    pub fn as_str(&self) -> &'static str {
  61        match self {
  62            RemoteOs::Linux => "linux",
  63            RemoteOs::MacOs => "macos",
  64            RemoteOs::Windows => "windows",
  65        }
  66    }
  67
  68    pub fn is_windows(&self) -> bool {
  69        matches!(self, RemoteOs::Windows)
  70    }
  71}
  72
  73impl std::fmt::Display for RemoteOs {
  74    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
  75        f.write_str(self.as_str())
  76    }
  77}
  78
  79#[derive(Copy, Clone, Debug, PartialEq, Eq)]
  80pub enum RemoteArch {
  81    X86_64,
  82    Aarch64,
  83}
  84
  85impl RemoteArch {
  86    pub fn as_str(&self) -> &'static str {
  87        match self {
  88            RemoteArch::X86_64 => "x86_64",
  89            RemoteArch::Aarch64 => "aarch64",
  90        }
  91    }
  92}
  93
  94impl std::fmt::Display for RemoteArch {
  95    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
  96        f.write_str(self.as_str())
  97    }
  98}
  99
/// Operating system and CPU architecture pair describing a remote platform.
///
/// Passed to `RemoteClientDelegate::get_download_url` and
/// `RemoteClientDelegate::download_server_binary_locally`.
#[derive(Copy, Clone, Debug)]
pub struct RemotePlatform {
    /// Operating system of the platform.
    pub os: RemoteOs,
    /// CPU architecture of the platform.
    pub arch: RemoteArch,
}
 105
/// A command described by its program, arguments and environment.
///
/// This is the value produced by `RemoteClient::build_command`.
#[derive(Clone, Debug)]
pub struct CommandTemplate {
    /// Program to run.
    pub program: String,
    /// Arguments for `program`.
    pub args: Vec<String>,
    /// Environment variables, keyed by variable name.
    pub env: HashMap<String, String>,
}
 112
/// Callbacks through which the remote client talks back to the embedding
/// application.
///
/// NOTE(review): the per-method descriptions are derived from the signatures
/// and from the call sites visible in this file; confirm the exact contract
/// against the implementors.
pub trait RemoteClientDelegate: Send + Sync {
    /// Requests a password. `prompt` is the prompt text and `tx` is the channel
    /// on which the encrypted answer is delivered — presumably the implementor
    /// presents `prompt` to the user; verify against implementors.
    fn ask_password(
        &self,
        prompt: String,
        tx: oneshot::Sender<EncryptedPassword>,
        cx: &mut AsyncApp,
    );
    /// Returns a task resolving to an optional download URL for the given
    /// `platform`, `release_channel` and `version` — presumably the URL of the
    /// remote server binary, `None` when there is none; TODO confirm.
    fn get_download_url(
        &self,
        platform: RemotePlatform,
        release_channel: ReleaseChannel,
        version: Option<Version>,
        cx: &mut AsyncApp,
    ) -> Task<Result<Option<String>>>;
    /// Returns a task resolving to a local filesystem path for the given
    /// `platform`, `release_channel` and `version` — going by the name, the
    /// path of a server binary downloaded to this machine; TODO confirm.
    fn download_server_binary_locally(
        &self,
        platform: RemotePlatform,
        release_channel: ReleaseChannel,
        version: Option<Version>,
        cx: &mut AsyncApp,
    ) -> Task<Result<PathBuf>>;
    /// Reports a status message. `RemoteClient::reconnect` passes the formatted
    /// error of a failed attempt here; `None` presumably clears the status.
    fn set_status(&self, status: Option<&str>, cx: &mut AsyncApp);
}
 136
/// Number of consecutive missed heartbeats at which
/// `RemoteClient::handle_heartbeat_result` starts a reconnect.
const MAX_MISSED_HEARTBEATS: usize = 5;
/// Delay of the keepalive timer in `RemoteClient::heartbeat`.
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
/// Timeout passed to `client.ping(..)` (and to `client.resync(..)` on reconnect).
const HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(5);
/// How long `RemoteClient::new` waits for the remote to report that it has
/// started: 5 seconds in builds with debug assertions, 60 seconds otherwise.
const INITIAL_CONNECTION_TIMEOUT: Duration =
    Duration::from_secs(if cfg!(debug_assertions) { 5 } else { 60 });

/// Number of reconnect attempts after which `RemoteClient::reconnect` gives up
/// and switches to `State::ReconnectExhausted`.
const MAX_RECONNECT_ATTEMPTS: usize = 3;
 144
/// Internal connection state machine of `RemoteClient`.
///
/// The variants that carry `multiplex_task` / `heartbeat_task` own those
/// tasks; `RemoteClient::reconnect` drops them before starting a new attempt.
enum State {
    /// Initial state set by `RemoteClient::new` until the connection is ready.
    Connecting,
    /// The connection is established and being monitored.
    Connected {
        remote_connection: Arc<dyn RemoteConnection>,
        delegate: Arc<dyn RemoteClientDelegate>,

        // Task returned by `RemoteClient::monitor`.
        multiplex_task: Task<Result<()>>,
        // Task returned by `RemoteClient::heartbeat`.
        heartbeat_task: Task<Result<()>>,
    },
    /// Like `Connected`, but at least one heartbeat was missed
    /// (see `State::heartbeat_missed` / `State::heartbeat_recovered`).
    HeartbeatMissed {
        // Count maintained by `State::heartbeat_missed`: starts at 1 and is
        // incremented on every further call.
        missed_heartbeats: usize,

        remote_connection: Arc<dyn RemoteConnection>,
        delegate: Arc<dyn RemoteClientDelegate>,

        multiplex_task: Task<Result<()>>,
        heartbeat_task: Task<Result<()>>,
    },
    /// A reconnect attempt started by `RemoteClient::reconnect` is in flight.
    Reconnecting,
    /// The last reconnect attempt failed; another attempt may be started
    /// from this state.
    ReconnectFailed {
        remote_connection: Arc<dyn RemoteConnection>,
        delegate: Arc<dyn RemoteClientDelegate>,

        // Error of the failed attempt.
        error: anyhow::Error,
        // Number of attempts made so far, including the failed one.
        attempts: usize,
    },
    /// More than `MAX_RECONNECT_ATTEMPTS` attempts were made; terminal.
    ReconnectExhausted,
    /// Set by `RemoteClient::monitor` when the proxy's exit code maps to
    /// `ProxyLaunchError::ServerNotRunning`.
    ServerNotRunning,
}
 174
 175impl fmt::Display for State {
 176    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
 177        match self {
 178            Self::Connecting => write!(f, "connecting"),
 179            Self::Connected { .. } => write!(f, "connected"),
 180            Self::Reconnecting => write!(f, "reconnecting"),
 181            Self::ReconnectFailed { .. } => write!(f, "reconnect failed"),
 182            Self::ReconnectExhausted => write!(f, "reconnect exhausted"),
 183            Self::HeartbeatMissed { .. } => write!(f, "heartbeat missed"),
 184            Self::ServerNotRunning { .. } => write!(f, "server not running"),
 185        }
 186    }
 187}
 188
 189impl State {
 190    fn remote_connection(&self) -> Option<Arc<dyn RemoteConnection>> {
 191        match self {
 192            Self::Connected {
 193                remote_connection, ..
 194            } => Some(remote_connection.clone()),
 195            Self::HeartbeatMissed {
 196                remote_connection, ..
 197            } => Some(remote_connection.clone()),
 198            Self::ReconnectFailed {
 199                remote_connection, ..
 200            } => Some(remote_connection.clone()),
 201            _ => None,
 202        }
 203    }
 204
 205    fn can_reconnect(&self) -> bool {
 206        match self {
 207            Self::Connected { .. }
 208            | Self::HeartbeatMissed { .. }
 209            | Self::ReconnectFailed { .. } => true,
 210            State::Connecting
 211            | State::Reconnecting
 212            | State::ReconnectExhausted
 213            | State::ServerNotRunning => false,
 214        }
 215    }
 216
 217    fn is_reconnect_failed(&self) -> bool {
 218        matches!(self, Self::ReconnectFailed { .. })
 219    }
 220
 221    fn is_reconnect_exhausted(&self) -> bool {
 222        matches!(self, Self::ReconnectExhausted { .. })
 223    }
 224
 225    fn is_server_not_running(&self) -> bool {
 226        matches!(self, Self::ServerNotRunning)
 227    }
 228
 229    fn is_reconnecting(&self) -> bool {
 230        matches!(self, Self::Reconnecting { .. })
 231    }
 232
 233    fn heartbeat_recovered(self) -> Self {
 234        match self {
 235            Self::HeartbeatMissed {
 236                remote_connection,
 237                delegate,
 238                multiplex_task,
 239                heartbeat_task,
 240                ..
 241            } => Self::Connected {
 242                remote_connection: remote_connection,
 243                delegate,
 244                multiplex_task,
 245                heartbeat_task,
 246            },
 247            _ => self,
 248        }
 249    }
 250
 251    fn heartbeat_missed(self) -> Self {
 252        match self {
 253            Self::Connected {
 254                remote_connection,
 255                delegate,
 256                multiplex_task,
 257                heartbeat_task,
 258            } => Self::HeartbeatMissed {
 259                missed_heartbeats: 1,
 260                remote_connection,
 261                delegate,
 262                multiplex_task,
 263                heartbeat_task,
 264            },
 265            Self::HeartbeatMissed {
 266                missed_heartbeats,
 267                remote_connection,
 268                delegate,
 269                multiplex_task,
 270                heartbeat_task,
 271            } => Self::HeartbeatMissed {
 272                missed_heartbeats: missed_heartbeats + 1,
 273                remote_connection,
 274                delegate,
 275                multiplex_task,
 276                heartbeat_task,
 277            },
 278            _ => self,
 279        }
 280    }
 281}
 282
/// The state of the remote connection as exposed to the rest of the
/// application.
///
/// This is a coarser view of the internal `State`; see the
/// `From<&State>` implementation for the exact mapping.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ConnectionState {
    /// Mapped from `State::Connecting`.
    Connecting,
    /// Mapped from `State::Connected`.
    Connected,
    /// Mapped from `State::HeartbeatMissed`.
    HeartbeatMissed,
    /// Mapped from `State::Reconnecting` and `State::ReconnectFailed`.
    Reconnecting,
    /// Mapped from `State::ReconnectExhausted` and `State::ServerNotRunning`.
    Disconnected,
}
 292
 293impl From<&State> for ConnectionState {
 294    fn from(value: &State) -> Self {
 295        match value {
 296            State::Connecting => Self::Connecting,
 297            State::Connected { .. } => Self::Connected,
 298            State::Reconnecting | State::ReconnectFailed { .. } => Self::Reconnecting,
 299            State::HeartbeatMissed { .. } => Self::HeartbeatMissed,
 300            State::ReconnectExhausted => Self::Disconnected,
 301            State::ServerNotRunning => Self::Disconnected,
 302        }
 303    }
 304}
 305
/// Client for a remote server reached through a `RemoteConnection`.
///
/// Tracks the connection's `State` and drives heartbeats (`heartbeat`),
/// proxy monitoring (`monitor`) and reconnects (`reconnect`).
pub struct RemoteClient {
    // RPC client; the same instance is kept across reconnects
    // (`reconnect` calls `client.reconnect(..)` with fresh channels).
    client: Arc<ChannelClient>,
    // String form of the `ConnectionIdentifier`, passed to `start_proxy` on
    // the initial connect and on every reconnect.
    unique_identifier: String,
    // Taken from `remote_connection.connection_options()` in `new`.
    connection_options: RemoteConnectionOptions,
    // Taken from `remote_connection.path_style()` in `new`.
    path_style: PathStyle,
    // Current state. `None` only while a transition has taken the old state
    // out, and after `shutdown_processes` has run.
    state: Option<State>,
}
 313
/// Events emitted by `RemoteClient`.
#[derive(Debug)]
pub enum RemoteClientEvent {
    /// Emitted by `RemoteClient::set_state` when the new state is
    /// `State::ReconnectExhausted` or `State::ServerNotRunning`.
    Disconnected,
}

// Lets `RemoteClient` emit `RemoteClientEvent`s via `cx.emit(..)`.
impl EventEmitter<RemoteClientEvent> for RemoteClient {}
 320
/// Identifies the socket on the remote server so that reconnects
/// can re-join the same project.
pub enum ConnectionIdentifier {
    /// Identifier numbered from the process-wide `NEXT_ID` counter
    /// (see `ConnectionIdentifier::setup`).
    Setup(u64),
    /// Identifier carrying a caller-provided id — presumably a workspace id,
    /// going by the variant name; verify against callers.
    Workspace(i64),
}
 327
// Process-wide counter used by `ConnectionIdentifier::setup`; starts at 1.
static NEXT_ID: AtomicU64 = AtomicU64::new(1);
 329
 330impl ConnectionIdentifier {
 331    pub fn setup() -> Self {
 332        Self::Setup(NEXT_ID.fetch_add(1, SeqCst))
 333    }
 334
 335    // This string gets used in a socket name, and so must be relatively short.
 336    // The total length of:
 337    //   /home/{username}/.local/share/zed/server_state/{name}/stdout.sock
 338    // Must be less than about 100 characters
 339    //   https://unix.stackexchange.com/questions/367008/why-is-socket-path-length-limited-to-a-hundred-chars
 340    // So our strings should be at most 20 characters or so.
 341    fn to_string(&self, cx: &App) -> String {
 342        let identifier_prefix = match ReleaseChannel::global(cx) {
 343            ReleaseChannel::Stable => "".to_string(),
 344            release_channel => format!("{}-", release_channel.dev_name()),
 345        };
 346        match self {
 347            Self::Setup(setup_id) => format!("{identifier_prefix}setup-{setup_id}"),
 348            Self::Workspace(workspace_id) => {
 349                format!("{identifier_prefix}workspace-{workspace_id}",)
 350            }
 351        }
 352    }
 353}
 354
 355pub async fn connect(
 356    connection_options: RemoteConnectionOptions,
 357    delegate: Arc<dyn RemoteClientDelegate>,
 358    cx: &mut AsyncApp,
 359) -> Result<Arc<dyn RemoteConnection>> {
 360    cx.update(|cx| {
 361        cx.update_default_global(|pool: &mut ConnectionPool, cx| {
 362            pool.connect(connection_options.clone(), delegate.clone(), cx)
 363        })
 364    })
 365    .await
 366    .map_err(|e| e.cloned())
 367}
 368
 369impl RemoteClient {
 370    pub fn new(
 371        unique_identifier: ConnectionIdentifier,
 372        remote_connection: Arc<dyn RemoteConnection>,
 373        cancellation: oneshot::Receiver<()>,
 374        delegate: Arc<dyn RemoteClientDelegate>,
 375        cx: &mut App,
 376    ) -> Task<Result<Option<Entity<Self>>>> {
 377        let unique_identifier = unique_identifier.to_string(cx);
 378        cx.spawn(async move |cx| {
 379            let success = Box::pin(async move {
 380                let (outgoing_tx, outgoing_rx) = mpsc::unbounded::<Envelope>();
 381                let (incoming_tx, incoming_rx) = mpsc::unbounded::<Envelope>();
 382                let (connection_activity_tx, connection_activity_rx) = mpsc::channel::<()>(1);
 383
 384                let client = cx.update(|cx| {
 385                    ChannelClient::new(
 386                        incoming_rx,
 387                        outgoing_tx,
 388                        cx,
 389                        "client",
 390                        remote_connection.has_wsl_interop(),
 391                    )
 392                });
 393
 394                let path_style = remote_connection.path_style();
 395                let this = cx.new(|_| Self {
 396                    client: client.clone(),
 397                    unique_identifier: unique_identifier.clone(),
 398                    connection_options: remote_connection.connection_options(),
 399                    path_style,
 400                    state: Some(State::Connecting),
 401                });
 402
 403                let io_task = remote_connection.start_proxy(
 404                    unique_identifier,
 405                    false,
 406                    incoming_tx,
 407                    outgoing_rx,
 408                    connection_activity_tx,
 409                    delegate.clone(),
 410                    cx,
 411                );
 412
 413                let ready = client
 414                    .wait_for_remote_started()
 415                    .with_timeout(INITIAL_CONNECTION_TIMEOUT, cx.background_executor())
 416                    .await;
 417                match ready {
 418                    Ok(Some(_)) => {}
 419                    Ok(None) => {
 420                        let mut error = "remote client exited before becoming ready".to_owned();
 421                        if let Some(status) = io_task.now_or_never() {
 422                            match status {
 423                                Ok(exit_code) => {
 424                                    error.push_str(&format!(", exit_code={exit_code:?}"))
 425                                }
 426                                Err(e) => error.push_str(&format!(", error={e:?}")),
 427                            }
 428                        }
 429                        let error = anyhow::anyhow!("{error}");
 430                        log::error!("failed to establish connection: {}", error);
 431                        return Err(error);
 432                    }
 433                    Err(_) => {
 434                        let mut error =
 435                            "remote client did not become ready within the timeout".to_owned();
 436                        if let Some(status) = io_task.now_or_never() {
 437                            match status {
 438                                Ok(exit_code) => {
 439                                    error.push_str(&format!(", exit_code={exit_code:?}"))
 440                                }
 441                                Err(e) => error.push_str(&format!(", error={e:?}")),
 442                            }
 443                        }
 444                        let error = anyhow::anyhow!("{error}");
 445                        log::error!("failed to establish connection: {}", error);
 446                        return Err(error);
 447                    }
 448                }
 449                let multiplex_task = Self::monitor(this.downgrade(), io_task, cx);
 450                if let Err(error) = client.ping(HEARTBEAT_TIMEOUT).await {
 451                    log::error!("failed to establish connection: {}", error);
 452                    return Err(error);
 453                }
 454
 455                let heartbeat_task = Self::heartbeat(this.downgrade(), connection_activity_rx, cx);
 456
 457                this.update(cx, |this, _| {
 458                    this.state = Some(State::Connected {
 459                        remote_connection,
 460                        delegate,
 461                        multiplex_task,
 462                        heartbeat_task,
 463                    });
 464                });
 465
 466                Ok(Some(this))
 467            });
 468
 469            select! {
 470                _ = cancellation.fuse() => {
 471                    Ok(None)
 472                }
 473                result = success.fuse() =>  result
 474            }
 475        })
 476    }
 477
 478    pub fn proto_client_from_channels(
 479        incoming_rx: mpsc::UnboundedReceiver<Envelope>,
 480        outgoing_tx: mpsc::UnboundedSender<Envelope>,
 481        cx: &App,
 482        name: &'static str,
 483        has_wsl_interop: bool,
 484    ) -> AnyProtoClient {
 485        ChannelClient::new(incoming_rx, outgoing_tx, cx, name, has_wsl_interop).into()
 486    }
 487
 488    pub fn shutdown_processes<T: RequestMessage>(
 489        &mut self,
 490        shutdown_request: Option<T>,
 491        executor: BackgroundExecutor,
 492    ) -> Option<impl Future<Output = ()> + use<T>> {
 493        let state = self.state.take()?;
 494        log::info!("shutting down ssh processes");
 495
 496        let State::Connected {
 497            multiplex_task,
 498            heartbeat_task,
 499            remote_connection,
 500            delegate,
 501        } = state
 502        else {
 503            return None;
 504        };
 505
 506        let client = self.client.clone();
 507
 508        Some(async move {
 509            if let Some(shutdown_request) = shutdown_request {
 510                client.send(shutdown_request).log_err();
 511                // We wait 50ms instead of waiting for a response, because
 512                // waiting for a response would require us to wait on the main thread
 513                // which we want to avoid in an `on_app_quit` callback.
 514                executor.timer(Duration::from_millis(50)).await;
 515            }
 516
 517            // Drop `multiplex_task` because it owns our remote_connection_proxy_process, which is a
 518            // child of master_process.
 519            drop(multiplex_task);
 520            // Now drop the rest of state, which kills master process.
 521            drop(heartbeat_task);
 522            drop(remote_connection);
 523            drop(delegate);
 524        })
 525    }
 526
 527    fn reconnect(&mut self, cx: &mut Context<Self>) -> Result<()> {
 528        let can_reconnect = self
 529            .state
 530            .as_ref()
 531            .map(|state| state.can_reconnect())
 532            .unwrap_or(false);
 533        if !can_reconnect {
 534            log::info!("aborting reconnect, because not in state that allows reconnecting");
 535            let error = if let Some(state) = self.state.as_ref() {
 536                format!("invalid state, cannot reconnect while in state {state}")
 537            } else {
 538                "no state set".to_string()
 539            };
 540            anyhow::bail!(error);
 541        }
 542
 543        let state = self.state.take().unwrap();
 544        let (attempts, remote_connection, delegate) = match state {
 545            State::Connected {
 546                remote_connection,
 547                delegate,
 548                multiplex_task,
 549                heartbeat_task,
 550            }
 551            | State::HeartbeatMissed {
 552                remote_connection,
 553                delegate,
 554                multiplex_task,
 555                heartbeat_task,
 556                ..
 557            } => {
 558                drop(multiplex_task);
 559                drop(heartbeat_task);
 560                (0, remote_connection, delegate)
 561            }
 562            State::ReconnectFailed {
 563                attempts,
 564                remote_connection,
 565                delegate,
 566                ..
 567            } => (attempts, remote_connection, delegate),
 568            State::Connecting
 569            | State::Reconnecting
 570            | State::ReconnectExhausted
 571            | State::ServerNotRunning => unreachable!(),
 572        };
 573
 574        let attempts = attempts + 1;
 575        if attempts > MAX_RECONNECT_ATTEMPTS {
 576            log::error!(
 577                "Failed to reconnect to after {} attempts, giving up",
 578                MAX_RECONNECT_ATTEMPTS
 579            );
 580            self.set_state(State::ReconnectExhausted, cx);
 581            return Ok(());
 582        }
 583
 584        self.set_state(State::Reconnecting, cx);
 585
 586        log::info!(
 587            "Trying to reconnect to remote server... Attempt {}",
 588            attempts
 589        );
 590
 591        let unique_identifier = self.unique_identifier.clone();
 592        let client = self.client.clone();
 593        let reconnect_task = cx.spawn(async move |this, cx| {
 594            macro_rules! failed {
 595                ($error:expr, $attempts:expr, $remote_connection:expr, $delegate:expr) => {
 596                    delegate.set_status(Some(&format!("{error:#}", error = $error)), cx);
 597                    return State::ReconnectFailed {
 598                        error: anyhow!($error),
 599                        attempts: $attempts,
 600                        remote_connection: $remote_connection,
 601                        delegate: $delegate,
 602                    };
 603                };
 604            }
 605
 606            if let Err(error) = remote_connection
 607                .kill()
 608                .await
 609                .context("Failed to kill remote_connection process")
 610            {
 611                failed!(error, attempts, remote_connection, delegate);
 612            };
 613
 614            let connection_options = remote_connection.connection_options();
 615
 616            let (outgoing_tx, outgoing_rx) = mpsc::unbounded::<Envelope>();
 617            let (incoming_tx, incoming_rx) = mpsc::unbounded::<Envelope>();
 618            let (connection_activity_tx, connection_activity_rx) = mpsc::channel::<()>(1);
 619
 620            let (remote_connection, io_task) = match async {
 621                let remote_connection = cx
 622                    .update_global(|pool: &mut ConnectionPool, cx| {
 623                        pool.connect(connection_options, delegate.clone(), cx)
 624                    })
 625                    .await
 626                    .map_err(|error| error.cloned())?;
 627
 628                let io_task = remote_connection.start_proxy(
 629                    unique_identifier,
 630                    true,
 631                    incoming_tx,
 632                    outgoing_rx,
 633                    connection_activity_tx,
 634                    delegate.clone(),
 635                    cx,
 636                );
 637                anyhow::Ok((remote_connection, io_task))
 638            }
 639            .await
 640            {
 641                Ok((remote_connection, io_task)) => (remote_connection, io_task),
 642                Err(error) => {
 643                    failed!(error, attempts, remote_connection, delegate);
 644                }
 645            };
 646
 647            let multiplex_task = Self::monitor(this.clone(), io_task, cx);
 648            client.reconnect(incoming_rx, outgoing_tx, cx);
 649
 650            if let Err(error) = client.resync(HEARTBEAT_TIMEOUT).await {
 651                failed!(error, attempts, remote_connection, delegate);
 652            };
 653
 654            State::Connected {
 655                remote_connection: remote_connection,
 656                delegate,
 657                multiplex_task,
 658                heartbeat_task: Self::heartbeat(this.clone(), connection_activity_rx, cx),
 659            }
 660        });
 661
 662        cx.spawn(async move |this, cx| {
 663            let new_state = reconnect_task.await;
 664            this.update(cx, |this, cx| {
 665                this.try_set_state(cx, |old_state| {
 666                    if old_state.is_reconnecting() {
 667                        match &new_state {
 668                            State::Connecting
 669                            | State::Reconnecting
 670                            | State::HeartbeatMissed { .. }
 671                            | State::ServerNotRunning => {}
 672                            State::Connected { .. } => {
 673                                log::info!("Successfully reconnected");
 674                            }
 675                            State::ReconnectFailed {
 676                                error, attempts, ..
 677                            } => {
 678                                log::error!(
 679                                    "Reconnect attempt {} failed: {:?}. Starting new attempt...",
 680                                    attempts,
 681                                    error
 682                                );
 683                            }
 684                            State::ReconnectExhausted => {
 685                                log::error!("Reconnect attempt failed and all attempts exhausted");
 686                            }
 687                        }
 688                        Some(new_state)
 689                    } else {
 690                        None
 691                    }
 692                });
 693
 694                if this.state_is(State::is_reconnect_failed) {
 695                    this.reconnect(cx)
 696                } else if this.state_is(State::is_reconnect_exhausted) {
 697                    Ok(())
 698                } else {
 699                    log::debug!("State has transition from Reconnecting into new state while attempting reconnect.");
 700                    Ok(())
 701                }
 702            })
 703        })
 704        .detach_and_log_err(cx);
 705
 706        Ok(())
 707    }
 708
 709    fn heartbeat(
 710        this: WeakEntity<Self>,
 711        mut connection_activity_rx: mpsc::Receiver<()>,
 712        cx: &mut AsyncApp,
 713    ) -> Task<Result<()>> {
 714        let Ok(client) = this.read_with(cx, |this, _| this.client.clone()) else {
 715            return Task::ready(Err(anyhow!("remote_connectionRemoteClient lost")));
 716        };
 717
 718        cx.spawn(async move |cx| {
 719            let mut missed_heartbeats = 0;
 720
 721            let keepalive_timer = cx.background_executor().timer(HEARTBEAT_INTERVAL).fuse();
 722            futures::pin_mut!(keepalive_timer);
 723
 724            loop {
 725                select_biased! {
 726                    result = connection_activity_rx.next().fuse() => {
 727                        if result.is_none() {
 728                            log::warn!("remote heartbeat: connection activity channel has been dropped. stopping.");
 729                            return Ok(());
 730                        }
 731
 732                        if missed_heartbeats != 0 {
 733                            missed_heartbeats = 0;
 734                            let _ =this.update(cx, |this, cx| {
 735                                this.handle_heartbeat_result(missed_heartbeats, cx)
 736                            })?;
 737                        }
 738                    }
 739                    _ = keepalive_timer => {
 740                        log::debug!("Sending heartbeat to server...");
 741
 742                        let result = select_biased! {
 743                            _ = connection_activity_rx.next().fuse() => {
 744                                Ok(())
 745                            }
 746                            ping_result = client.ping(HEARTBEAT_TIMEOUT).fuse() => {
 747                                ping_result
 748                            }
 749                        };
 750
 751                        if result.is_err() {
 752                            missed_heartbeats += 1;
 753                            log::warn!(
 754                                "No heartbeat from server after {:?}. Missed heartbeat {} out of {}.",
 755                                HEARTBEAT_TIMEOUT,
 756                                missed_heartbeats,
 757                                MAX_MISSED_HEARTBEATS
 758                            );
 759                        } else if missed_heartbeats != 0 {
 760                            missed_heartbeats = 0;
 761                        } else {
 762                            continue;
 763                        }
 764
 765                        let result = this.update(cx, |this, cx| {
 766                            this.handle_heartbeat_result(missed_heartbeats, cx)
 767                        })?;
 768                        if result.is_break() {
 769                            return Ok(());
 770                        }
 771                    }
 772                }
 773
 774                keepalive_timer.set(cx.background_executor().timer(HEARTBEAT_INTERVAL).fuse());
 775            }
 776        })
 777    }
 778
 779    fn handle_heartbeat_result(
 780        &mut self,
 781        missed_heartbeats: usize,
 782        cx: &mut Context<Self>,
 783    ) -> ControlFlow<()> {
 784        let state = self.state.take().unwrap();
 785        let next_state = if missed_heartbeats > 0 {
 786            state.heartbeat_missed()
 787        } else {
 788            state.heartbeat_recovered()
 789        };
 790
 791        self.set_state(next_state, cx);
 792
 793        if missed_heartbeats >= MAX_MISSED_HEARTBEATS {
 794            log::error!(
 795                "Missed last {} heartbeats. Reconnecting...",
 796                missed_heartbeats
 797            );
 798
 799            self.reconnect(cx)
 800                .context("failed to start reconnect process after missing heartbeats")
 801                .log_err();
 802            ControlFlow::Break(())
 803        } else {
 804            ControlFlow::Continue(())
 805        }
 806    }
 807
 808    fn monitor(
 809        this: WeakEntity<Self>,
 810        io_task: Task<Result<i32>>,
 811        cx: &AsyncApp,
 812    ) -> Task<Result<()>> {
 813        cx.spawn(async move |cx| {
 814            let result = io_task.await;
 815
 816            match result {
 817                Ok(exit_code) => {
 818                    if let Some(error) = ProxyLaunchError::from_exit_code(exit_code) {
 819                        match error {
 820                            ProxyLaunchError::ServerNotRunning => {
 821                                log::error!("failed to reconnect because server is not running");
 822                                this.update(cx, |this, cx| {
 823                                    this.set_state(State::ServerNotRunning, cx);
 824                                })?;
 825                            }
 826                        }
 827                    } else if exit_code > 0 {
 828                        log::error!("proxy process terminated unexpectedly");
 829                        this.update(cx, |this, cx| {
 830                            this.reconnect(cx).ok();
 831                        })?;
 832                    }
 833                }
 834                Err(error) => {
 835                    log::warn!(
 836                        "remote io task died with error: {:?}. reconnecting...",
 837                        error
 838                    );
 839                    this.update(cx, |this, cx| {
 840                        this.reconnect(cx).ok();
 841                    })?;
 842                }
 843            }
 844
 845            Ok(())
 846        })
 847    }
 848
 849    fn state_is(&self, check: impl FnOnce(&State) -> bool) -> bool {
 850        self.state.as_ref().is_some_and(check)
 851    }
 852
 853    fn try_set_state(&mut self, cx: &mut Context<Self>, map: impl FnOnce(&State) -> Option<State>) {
 854        let new_state = self.state.as_ref().and_then(map);
 855        if let Some(new_state) = new_state {
 856            self.state.replace(new_state);
 857            cx.notify();
 858        }
 859    }
 860
 861    fn set_state(&mut self, state: State, cx: &mut Context<Self>) {
 862        log::info!("setting state to '{}'", &state);
 863
 864        let is_reconnect_exhausted = state.is_reconnect_exhausted();
 865        let is_server_not_running = state.is_server_not_running();
 866        self.state.replace(state);
 867
 868        if is_reconnect_exhausted || is_server_not_running {
 869            cx.emit(RemoteClientEvent::Disconnected);
 870        }
 871        cx.notify();
 872    }
 873
 874    pub fn shell(&self) -> Option<String> {
 875        Some(self.remote_connection()?.shell())
 876    }
 877
 878    pub fn default_system_shell(&self) -> Option<String> {
 879        Some(self.remote_connection()?.default_system_shell())
 880    }
 881
 882    pub fn shares_network_interface(&self) -> bool {
 883        self.remote_connection()
 884            .map_or(false, |connection| connection.shares_network_interface())
 885    }
 886
 887    pub fn build_command(
 888        &self,
 889        program: Option<String>,
 890        args: &[String],
 891        env: &HashMap<String, String>,
 892        working_dir: Option<String>,
 893        port_forward: Option<(u16, String, u16)>,
 894    ) -> Result<CommandTemplate> {
 895        let Some(connection) = self.remote_connection() else {
 896            return Err(anyhow!("no remote connection"));
 897        };
 898        connection.build_command(program, args, env, working_dir, port_forward)
 899    }
 900
 901    pub fn build_forward_ports_command(
 902        &self,
 903        forwards: Vec<(u16, String, u16)>,
 904    ) -> Result<CommandTemplate> {
 905        let Some(connection) = self.remote_connection() else {
 906            return Err(anyhow!("no remote connection"));
 907        };
 908        connection.build_forward_ports_command(forwards)
 909    }
 910
 911    pub fn upload_directory(
 912        &self,
 913        src_path: PathBuf,
 914        dest_path: RemotePathBuf,
 915        cx: &App,
 916    ) -> Task<Result<()>> {
 917        let Some(connection) = self.remote_connection() else {
 918            return Task::ready(Err(anyhow!("no remote connection")));
 919        };
 920        connection.upload_directory(src_path, dest_path, cx)
 921    }
 922
 923    pub fn proto_client(&self) -> AnyProtoClient {
 924        self.client.clone().into()
 925    }
 926
    /// Returns a clone of the options this client was created with.
    pub fn connection_options(&self) -> RemoteConnectionOptions {
        self.connection_options.clone()
    }
 930
 931    pub fn connection(&self) -> Option<Arc<dyn RemoteConnection>> {
 932        if let State::Connected {
 933            remote_connection, ..
 934        } = self.state.as_ref()?
 935        {
 936            Some(remote_connection.clone())
 937        } else {
 938            None
 939        }
 940    }
 941
 942    pub fn connection_state(&self) -> ConnectionState {
 943        self.state
 944            .as_ref()
 945            .map(ConnectionState::from)
 946            .unwrap_or(ConnectionState::Disconnected)
 947    }
 948
 949    pub fn is_disconnected(&self) -> bool {
 950        self.connection_state() == ConnectionState::Disconnected
 951    }
 952
    /// Returns the path style stored on this client.
    pub fn path_style(&self) -> PathStyle {
        self.path_style
    }
 956
 957    #[cfg(any(test, feature = "test-support"))]
 958    pub fn simulate_disconnect(&self, client_cx: &mut App) -> Task<()> {
 959        let opts = self.connection_options();
 960        client_cx.spawn(async move |cx| {
 961            let connection = cx
 962                .update_global(|c: &mut ConnectionPool, _| {
 963                    if let Some(ConnectionPoolEntry::Connecting(c)) = c.connections.get(&opts) {
 964                        c.clone()
 965                    } else {
 966                        panic!("missing test connection")
 967                    }
 968                })
 969                .await
 970                .unwrap();
 971
 972            connection.simulate_disconnect(cx);
 973        })
 974    }
 975
 976    #[cfg(any(test, feature = "test-support"))]
 977    pub fn fake_server(
 978        client_cx: &mut gpui::TestAppContext,
 979        server_cx: &mut gpui::TestAppContext,
 980    ) -> (RemoteConnectionOptions, AnyProtoClient) {
 981        use crate::transport::ssh::SshConnectionHost;
 982
 983        let port = client_cx
 984            .update(|cx| cx.default_global::<ConnectionPool>().connections.len() as u16 + 1);
 985        let opts = RemoteConnectionOptions::Ssh(SshConnectionOptions {
 986            host: SshConnectionHost::from("<fake>".to_string()),
 987            port: Some(port),
 988            ..Default::default()
 989        });
 990        let (outgoing_tx, _) = mpsc::unbounded::<Envelope>();
 991        let (_, incoming_rx) = mpsc::unbounded::<Envelope>();
 992        let server_client = server_cx
 993            .update(|cx| ChannelClient::new(incoming_rx, outgoing_tx, cx, "fake-server", false));
 994        let connection: Arc<dyn RemoteConnection> = Arc::new(fake::FakeRemoteConnection {
 995            connection_options: opts.clone(),
 996            server_cx: fake::SendableCx::new(server_cx),
 997            server_channel: server_client.clone(),
 998        });
 999
1000        client_cx.update(|cx| {
1001            cx.update_default_global(|c: &mut ConnectionPool, cx| {
1002                c.connections.insert(
1003                    opts.clone(),
1004                    ConnectionPoolEntry::Connecting(
1005                        cx.background_spawn({
1006                            let connection = connection.clone();
1007                            async move { Ok(connection.clone()) }
1008                        })
1009                        .shared(),
1010                    ),
1011                );
1012            })
1013        });
1014
1015        (opts, server_client.into())
1016    }
1017
1018    #[cfg(any(test, feature = "test-support"))]
1019    pub async fn fake_client(
1020        opts: RemoteConnectionOptions,
1021        client_cx: &mut gpui::TestAppContext,
1022    ) -> Entity<Self> {
1023        let (_tx, rx) = oneshot::channel();
1024        let mut cx = client_cx.to_async();
1025        let connection = connect(opts, Arc::new(fake::Delegate), &mut cx)
1026            .await
1027            .unwrap();
1028        client_cx
1029            .update(|cx| {
1030                Self::new(
1031                    ConnectionIdentifier::setup(),
1032                    connection,
1033                    rx,
1034                    Arc::new(fake::Delegate),
1035                    cx,
1036                )
1037            })
1038            .await
1039            .unwrap()
1040            .unwrap()
1041    }
1042
1043    fn remote_connection(&self) -> Option<Arc<dyn RemoteConnection>> {
1044        self.state
1045            .as_ref()
1046            .and_then(|state| state.remote_connection())
1047    }
1048}
1049
/// An entry in the [`ConnectionPool`], keyed by connection options.
enum ConnectionPoolEntry {
    /// A connection attempt is in flight; the shared task can be awaited by every caller
    /// asking for the same options.
    Connecting(Shared<Task<Result<Arc<dyn RemoteConnection>, Arc<anyhow::Error>>>>),
    /// A connection was established. Only a weak reference is kept, so the pool does not
    /// keep the connection alive by itself.
    Connected(Weak<dyn RemoteConnection>),
}
1054
/// Global registry of in-flight and established remote connections, used to share a
/// connection between callers that request identical options.
#[derive(Default)]
struct ConnectionPool {
    /// Pool entries keyed by the options the connection was requested with.
    connections: HashMap<RemoteConnectionOptions, ConnectionPoolEntry>,
}
1059
// Allows the pool to be stored as a gpui global (see `update_global` / `default_global`).
impl Global for ConnectionPool {}
1061
1062impl ConnectionPool {
1063    pub fn connect(
1064        &mut self,
1065        opts: RemoteConnectionOptions,
1066        delegate: Arc<dyn RemoteClientDelegate>,
1067        cx: &mut App,
1068    ) -> Shared<Task<Result<Arc<dyn RemoteConnection>, Arc<anyhow::Error>>>> {
1069        let connection = self.connections.get(&opts);
1070        match connection {
1071            Some(ConnectionPoolEntry::Connecting(task)) => {
1072                delegate.set_status(
1073                    Some("Waiting for existing connection attempt"),
1074                    &mut cx.to_async(),
1075                );
1076                return task.clone();
1077            }
1078            Some(ConnectionPoolEntry::Connected(remote)) => {
1079                if let Some(remote) = remote.upgrade()
1080                    && !remote.has_been_killed()
1081                {
1082                    return Task::ready(Ok(remote)).shared();
1083                }
1084                self.connections.remove(&opts);
1085            }
1086            None => {}
1087        }
1088
1089        let task = cx
1090            .spawn({
1091                let opts = opts.clone();
1092                let delegate = delegate.clone();
1093                async move |cx| {
1094                    let connection = match opts.clone() {
1095                        RemoteConnectionOptions::Ssh(opts) => {
1096                            SshRemoteConnection::new(opts, delegate, cx)
1097                                .await
1098                                .map(|connection| Arc::new(connection) as Arc<dyn RemoteConnection>)
1099                        }
1100                        RemoteConnectionOptions::Wsl(opts) => {
1101                            WslRemoteConnection::new(opts, delegate, cx)
1102                                .await
1103                                .map(|connection| Arc::new(connection) as Arc<dyn RemoteConnection>)
1104                        }
1105                        RemoteConnectionOptions::Docker(opts) => {
1106                            DockerExecConnection::new(opts, delegate, cx)
1107                                .await
1108                                .map(|connection| Arc::new(connection) as Arc<dyn RemoteConnection>)
1109                        }
1110                    };
1111
1112                    cx.update_global(|pool: &mut Self, _| {
1113                        debug_assert!(matches!(
1114                            pool.connections.get(&opts),
1115                            Some(ConnectionPoolEntry::Connecting(_))
1116                        ));
1117                        match connection {
1118                            Ok(connection) => {
1119                                pool.connections.insert(
1120                                    opts.clone(),
1121                                    ConnectionPoolEntry::Connected(Arc::downgrade(&connection)),
1122                                );
1123                                Ok(connection)
1124                            }
1125                            Err(error) => {
1126                                pool.connections.remove(&opts);
1127                                Err(Arc::new(error))
1128                            }
1129                        }
1130                    })
1131                }
1132            })
1133            .shared();
1134
1135        self.connections
1136            .insert(opts.clone(), ConnectionPoolEntry::Connecting(task.clone()));
1137        task
1138    }
1139}
1140
/// Options describing how to reach a remote, one variant per supported transport.
///
/// The type is hashable and comparable because it is used as the key of the
/// connection pool.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum RemoteConnectionOptions {
    /// Connect over SSH.
    Ssh(SshConnectionOptions),
    /// Connect to a WSL distribution.
    Wsl(WslConnectionOptions),
    /// Connect to a Docker container.
    Docker(DockerConnectionOptions),
}
1147
1148impl RemoteConnectionOptions {
1149    pub fn display_name(&self) -> String {
1150        match self {
1151            RemoteConnectionOptions::Ssh(opts) => opts.host.to_string(),
1152            RemoteConnectionOptions::Wsl(opts) => opts.distro_name.clone(),
1153            RemoteConnectionOptions::Docker(opts) => opts.name.clone(),
1154        }
1155    }
1156}
1157
1158impl From<SshConnectionOptions> for RemoteConnectionOptions {
1159    fn from(opts: SshConnectionOptions) -> Self {
1160        RemoteConnectionOptions::Ssh(opts)
1161    }
1162}
1163
1164impl From<WslConnectionOptions> for RemoteConnectionOptions {
1165    fn from(opts: WslConnectionOptions) -> Self {
1166        RemoteConnectionOptions::Wsl(opts)
1167    }
1168}
1169
#[cfg(target_os = "windows")]
/// Open a WSL path (`\\wsl.localhost\<distro>\path`).
///
/// Only compiled on Windows.
#[derive(Debug, Clone, PartialEq, Eq, gpui::Action)]
#[action(namespace = workspace, no_json, no_register)]
pub struct OpenWslPath {
    /// Connection options for the WSL distro the paths belong to.
    pub distro: WslConnectionOptions,
    /// The paths to open.
    pub paths: Vec<PathBuf>,
}
1178
/// Transport-level abstraction over an established connection to a remote.
///
/// Implementations must be `Send + Sync`; they are shared as `Arc<dyn RemoteConnection>`.
#[async_trait(?Send)]
pub trait RemoteConnection: Send + Sync {
    /// Starts the proxy that exchanges [`Envelope`]s with the remote over this connection.
    ///
    /// The returned task resolves to an `i32`, which appears to be the proxy's exit code.
    /// NOTE(review): presumably messages from the remote are pushed into `incoming_tx`,
    /// messages to the remote are read from `outgoing_rx`, and `connection_activity_tx`
    /// is pinged on traffic — confirm against the transport implementations.
    fn start_proxy(
        &self,
        unique_identifier: String,
        reconnect: bool,
        incoming_tx: UnboundedSender<Envelope>,
        outgoing_rx: UnboundedReceiver<Envelope>,
        connection_activity_tx: Sender<()>,
        delegate: Arc<dyn RemoteClientDelegate>,
        cx: &mut AsyncApp,
    ) -> Task<Result<i32>>;
    /// Uploads the local directory `src_path` to `dest_path` on the remote.
    fn upload_directory(
        &self,
        src_path: PathBuf,
        dest_path: RemotePathBuf,
        cx: &App,
    ) -> Task<Result<()>>;
    /// Terminates the connection.
    async fn kill(&self) -> Result<()>;
    /// Reports whether the connection has been killed; the connection pool does not
    /// reuse connections for which this returns `true`.
    fn has_been_killed(&self) -> bool;
    /// Whether the remote shares a network interface with the local machine.
    /// Defaults to `false`.
    fn shares_network_interface(&self) -> bool {
        false
    }
    /// Builds the command template used to run `program` with `args` and `env` through
    /// this connection.
    ///
    /// NOTE(review): `port_forward` looks like `(local_port, host, remote_port)`, matching
    /// the tuple layout used by `build_forward_ports_command` — confirm.
    fn build_command(
        &self,
        program: Option<String>,
        args: &[String],
        env: &HashMap<String, String>,
        working_dir: Option<String>,
        port_forward: Option<(u16, String, u16)>,
    ) -> Result<CommandTemplate>;
    /// Builds the command template that forwards the given ports; each entry is a
    /// `(local_port, host, remote_port)` tuple.
    fn build_forward_ports_command(
        &self,
        forwards: Vec<(u16, String, u16)>,
    ) -> Result<CommandTemplate>;
    /// Returns the options this connection was created with.
    fn connection_options(&self) -> RemoteConnectionOptions;
    /// Returns the path style used by the remote.
    fn path_style(&self) -> PathStyle;
    /// Returns the shell to use on the remote.
    fn shell(&self) -> String;
    /// Returns the remote's default system shell.
    fn default_system_shell(&self) -> String;
    /// Reports whether WSL interop is available on this connection.
    fn has_wsl_interop(&self) -> bool;

    /// Test hook for simulating a dropped connection. The default does nothing.
    #[cfg(any(test, feature = "test-support"))]
    fn simulate_disconnect(&self, _: &AsyncApp) {}
}
1223
/// Pending requests, keyed by the id of the request envelope.
///
/// The stored sender delivers the response envelope together with a second sender. The
/// message loop waits on that second channel after forwarding a response, and resumes
/// once the requester sends on it or drops it.
type ResponseChannels = Mutex<HashMap<MessageId, oneshot::Sender<(Envelope, oneshot::Sender<()>)>>>;
1225
/// A one-shot value that can be set once and awaited by any number of waiters.
struct Signal<T> {
    /// Sending half; taken (left as `None`) by the first call to `set`.
    tx: Mutex<Option<oneshot::Sender<T>>>,
    /// Shared task resolving to `Some(value)` once set, or `None` if the sender is
    /// dropped without a value.
    rx: Shared<Task<Option<T>>>,
}
1230
1231impl<T: Send + Clone + 'static> Signal<T> {
1232    pub fn new(cx: &App) -> Self {
1233        let (tx, rx) = oneshot::channel();
1234
1235        let task = cx
1236            .background_executor()
1237            .spawn(async move { rx.await.ok() })
1238            .shared();
1239
1240        Self {
1241            tx: Mutex::new(Some(tx)),
1242            rx: task,
1243        }
1244    }
1245
1246    fn set(&self, value: T) {
1247        if let Some(tx) = self.tx.lock().take() {
1248            let _ = tx.send(value);
1249        }
1250    }
1251
1252    fn wait(&self) -> Shared<Task<Option<T>>> {
1253        self.rx.clone()
1254    }
1255}
1256
/// Proto client that exchanges [`Envelope`]s over a pair of in-process channels and
/// buffers outgoing messages until the peer acknowledges them.
struct ChannelClient {
    /// Counter used to assign ids to outgoing envelopes.
    next_message_id: AtomicU32,
    /// Sender for outgoing envelopes; replaced on `reconnect`.
    outgoing_tx: Mutex<mpsc::UnboundedSender<Envelope>>,
    /// Envelopes sent through `send_buffered` that the peer has not acknowledged yet.
    buffer: Mutex<VecDeque<Envelope>>,
    /// Requests waiting for a response, keyed by request id.
    response_channels: ResponseChannels,
    /// Handlers for incoming messages that are not responses.
    message_handlers: Mutex<ProtoMessageHandlerSet>,
    /// Id of the most recently processed incoming envelope; sent to the peer as `ack_id`.
    max_received: AtomicU32,
    /// Label used as a prefix in log messages.
    name: &'static str,
    /// The task running the incoming message loop; replaced on `reconnect`.
    task: Mutex<Task<Result<()>>>,
    /// Set once a `RemoteStarted` envelope has been received from the peer.
    remote_started: Signal<()>,
    /// Value reported by `ProtoClient::has_wsl_interop`.
    has_wsl_interop: bool,
}
1269
1270impl ChannelClient {
1271    fn new(
1272        incoming_rx: mpsc::UnboundedReceiver<Envelope>,
1273        outgoing_tx: mpsc::UnboundedSender<Envelope>,
1274        cx: &App,
1275        name: &'static str,
1276        has_wsl_interop: bool,
1277    ) -> Arc<Self> {
1278        Arc::new_cyclic(|this| Self {
1279            outgoing_tx: Mutex::new(outgoing_tx),
1280            next_message_id: AtomicU32::new(0),
1281            max_received: AtomicU32::new(0),
1282            response_channels: ResponseChannels::default(),
1283            message_handlers: Default::default(),
1284            buffer: Mutex::new(VecDeque::new()),
1285            name,
1286            task: Mutex::new(Self::start_handling_messages(
1287                this.clone(),
1288                incoming_rx,
1289                &cx.to_async(),
1290            )),
1291            remote_started: Signal::new(cx),
1292            has_wsl_interop,
1293        })
1294    }
1295
1296    fn wait_for_remote_started(&self) -> Shared<Task<Option<()>>> {
1297        self.remote_started.wait()
1298    }
1299
1300    fn start_handling_messages(
1301        this: Weak<Self>,
1302        mut incoming_rx: mpsc::UnboundedReceiver<Envelope>,
1303        cx: &AsyncApp,
1304    ) -> Task<Result<()>> {
1305        cx.spawn(async move |cx| {
1306            if let Some(this) = this.upgrade() {
1307                let envelope = proto::RemoteStarted {}.into_envelope(0, None, None);
1308                this.outgoing_tx.lock().unbounded_send(envelope).ok();
1309            };
1310
1311            let peer_id = PeerId { owner_id: 0, id: 0 };
1312            while let Some(incoming) = incoming_rx.next().await {
1313                let Some(this) = this.upgrade() else {
1314                    return anyhow::Ok(());
1315                };
1316                if let Some(ack_id) = incoming.ack_id {
1317                    let mut buffer = this.buffer.lock();
1318                    while buffer.front().is_some_and(|msg| msg.id <= ack_id) {
1319                        buffer.pop_front();
1320                    }
1321                }
1322                if let Some(proto::envelope::Payload::FlushBufferedMessages(_)) = &incoming.payload
1323                {
1324                    log::debug!(
1325                        "{}:remote message received. name:FlushBufferedMessages",
1326                        this.name
1327                    );
1328                    {
1329                        let buffer = this.buffer.lock();
1330                        for envelope in buffer.iter() {
1331                            this.outgoing_tx
1332                                .lock()
1333                                .unbounded_send(envelope.clone())
1334                                .ok();
1335                        }
1336                    }
1337                    let mut envelope = proto::Ack {}.into_envelope(0, Some(incoming.id), None);
1338                    envelope.id = this.next_message_id.fetch_add(1, SeqCst);
1339                    this.outgoing_tx.lock().unbounded_send(envelope).ok();
1340                    continue;
1341                }
1342
1343                if let Some(proto::envelope::Payload::RemoteStarted(_)) = &incoming.payload {
1344                    this.remote_started.set(());
1345                    let mut envelope = proto::Ack {}.into_envelope(0, Some(incoming.id), None);
1346                    envelope.id = this.next_message_id.fetch_add(1, SeqCst);
1347                    this.outgoing_tx.lock().unbounded_send(envelope).ok();
1348                    continue;
1349                }
1350
1351                this.max_received.store(incoming.id, SeqCst);
1352
1353                if let Some(request_id) = incoming.responding_to {
1354                    let request_id = MessageId(request_id);
1355                    let sender = this.response_channels.lock().remove(&request_id);
1356                    if let Some(sender) = sender {
1357                        let (tx, rx) = oneshot::channel();
1358                        if incoming.payload.is_some() {
1359                            sender.send((incoming, tx)).ok();
1360                        }
1361                        rx.await.ok();
1362                    }
1363                } else if let Some(envelope) =
1364                    build_typed_envelope(peer_id, Instant::now(), incoming)
1365                {
1366                    let type_name = envelope.payload_type_name();
1367                    let message_id = envelope.message_id();
1368                    if let Some(future) = ProtoMessageHandlerSet::handle_message(
1369                        &this.message_handlers,
1370                        envelope,
1371                        this.clone().into(),
1372                        cx.clone(),
1373                    ) {
1374                        log::debug!("{}:remote message received. name:{type_name}", this.name);
1375                        cx.foreground_executor()
1376                            .spawn(async move {
1377                                match future.await {
1378                                    Ok(_) => {
1379                                        log::debug!(
1380                                            "{}:remote message handled. name:{type_name}",
1381                                            this.name
1382                                        );
1383                                    }
1384                                    Err(error) => {
1385                                        log::error!(
1386                                            "{}:error handling message. type:{}, error:{:#}",
1387                                            this.name,
1388                                            type_name,
1389                                            format!("{error:#}").lines().fold(
1390                                                String::new(),
1391                                                |mut message, line| {
1392                                                    if !message.is_empty() {
1393                                                        message.push(' ');
1394                                                    }
1395                                                    message.push_str(line);
1396                                                    message
1397                                                }
1398                                            )
1399                                        );
1400                                    }
1401                                }
1402                            })
1403                            .detach()
1404                    } else {
1405                        log::error!("{}:unhandled remote message name:{type_name}", this.name);
1406                        if let Err(e) = AnyProtoClient::from(this.clone()).send_response(
1407                            message_id,
1408                            anyhow::anyhow!("no handler registered for {type_name}").to_proto(),
1409                        ) {
1410                            log::error!(
1411                                "{}:error sending error response for {type_name}:{e:#}",
1412                                this.name
1413                            );
1414                        }
1415                    }
1416                }
1417            }
1418            anyhow::Ok(())
1419        })
1420    }
1421
1422    fn reconnect(
1423        self: &Arc<Self>,
1424        incoming_rx: UnboundedReceiver<Envelope>,
1425        outgoing_tx: UnboundedSender<Envelope>,
1426        cx: &AsyncApp,
1427    ) {
1428        *self.outgoing_tx.lock() = outgoing_tx;
1429        *self.task.lock() = Self::start_handling_messages(Arc::downgrade(self), incoming_rx, cx);
1430    }
1431
    /// Sends `payload` as a buffered request and returns a future resolving to the typed
    /// response.
    fn request<T: RequestMessage>(
        &self,
        payload: T,
    ) -> impl 'static + Future<Output = Result<T::Response>> {
        // `true`: keep the request in the resend buffer until it is acknowledged.
        self.request_internal(payload, true)
    }
1438
1439    fn request_internal<T: RequestMessage>(
1440        &self,
1441        payload: T,
1442        use_buffer: bool,
1443    ) -> impl 'static + Future<Output = Result<T::Response>> {
1444        log::debug!("remote request start. name:{}", T::NAME);
1445        let response =
1446            self.request_dynamic(payload.into_envelope(0, None, None), T::NAME, use_buffer);
1447        async move {
1448            let response = response.await?;
1449            log::debug!("remote request finish. name:{}", T::NAME);
1450            T::Response::from_envelope(response).context("received a response of the wrong type")
1451        }
1452    }
1453
1454    async fn resync(&self, timeout: Duration) -> Result<()> {
1455        smol::future::or(
1456            async {
1457                self.request_internal(proto::FlushBufferedMessages {}, false)
1458                    .await?;
1459
1460                for envelope in self.buffer.lock().iter() {
1461                    self.outgoing_tx
1462                        .lock()
1463                        .unbounded_send(envelope.clone())
1464                        .ok();
1465                }
1466                Ok(())
1467            },
1468            async {
1469                smol::Timer::after(timeout).await;
1470                anyhow::bail!("Timed out resyncing remote client")
1471            },
1472        )
1473        .await
1474    }
1475
1476    async fn ping(&self, timeout: Duration) -> Result<()> {
1477        smol::future::or(
1478            async {
1479                self.request(proto::Ping {}).await?;
1480                Ok(())
1481            },
1482            async {
1483                smol::Timer::after(timeout).await;
1484                anyhow::bail!("Timed out pinging remote client")
1485            },
1486        )
1487        .await
1488    }
1489
1490    fn send<T: EnvelopedMessage>(&self, payload: T) -> Result<()> {
1491        log::debug!("remote send name:{}", T::NAME);
1492        self.send_dynamic(payload.into_envelope(0, None, None))
1493    }
1494
1495    fn request_dynamic(
1496        &self,
1497        mut envelope: proto::Envelope,
1498        type_name: &'static str,
1499        use_buffer: bool,
1500    ) -> impl 'static + Future<Output = Result<proto::Envelope>> {
1501        envelope.id = self.next_message_id.fetch_add(1, SeqCst);
1502        let (tx, rx) = oneshot::channel();
1503        let mut response_channels_lock = self.response_channels.lock();
1504        response_channels_lock.insert(MessageId(envelope.id), tx);
1505        drop(response_channels_lock);
1506
1507        let result = if use_buffer {
1508            self.send_buffered(envelope)
1509        } else {
1510            self.send_unbuffered(envelope)
1511        };
1512        async move {
1513            if let Err(error) = &result {
1514                log::error!("failed to send message: {error}");
1515                anyhow::bail!("failed to send message: {error}");
1516            }
1517
1518            let response = rx.await.context("connection lost")?.0;
1519            if let Some(proto::envelope::Payload::Error(error)) = &response.payload {
1520                return Err(RpcError::from_proto(error, type_name));
1521            }
1522            Ok(response)
1523        }
1524    }
1525
    /// Assigns the next message id to `envelope` and sends it through the resend buffer.
    pub fn send_dynamic(&self, mut envelope: proto::Envelope) -> Result<()> {
        envelope.id = self.next_message_id.fetch_add(1, SeqCst);
        self.send_buffered(envelope)
    }
1530
    /// Stamps `envelope` with the latest received id as its `ack_id`, stores a copy in the
    /// resend buffer and sends it.
    ///
    /// Always returns `Ok(())`: a failed channel send is ignored because the buffered copy
    /// can be re-sent later.
    fn send_buffered(&self, mut envelope: proto::Envelope) -> Result<()> {
        envelope.ack_id = Some(self.max_received.load(SeqCst));
        // Buffer first, so the message is kept even if the send below fails.
        self.buffer.lock().push_back(envelope.clone());
        // ignore errors on send (happen while we're reconnecting)
        // assume that the global "disconnected" overlay is sufficient.
        self.outgoing_tx.lock().unbounded_send(envelope).ok();
        Ok(())
    }
1539
    /// Stamps `envelope` with the latest received id as its `ack_id` and sends it without
    /// keeping a copy in the resend buffer.
    ///
    /// Always returns `Ok(())`; a failed channel send is ignored.
    fn send_unbuffered(&self, mut envelope: proto::Envelope) -> Result<()> {
        envelope.ack_id = Some(self.max_received.load(SeqCst));
        self.outgoing_tx.lock().unbounded_send(envelope).ok();
        Ok(())
    }
1545}
1546
1547impl ProtoClient for ChannelClient {
1548    fn request(
1549        &self,
1550        envelope: proto::Envelope,
1551        request_type: &'static str,
1552    ) -> BoxFuture<'static, Result<proto::Envelope>> {
1553        self.request_dynamic(envelope, request_type, true).boxed()
1554    }
1555
1556    fn send(&self, envelope: proto::Envelope, _message_type: &'static str) -> Result<()> {
1557        self.send_dynamic(envelope)
1558    }
1559
1560    fn send_response(&self, envelope: Envelope, _message_type: &'static str) -> anyhow::Result<()> {
1561        self.send_dynamic(envelope)
1562    }
1563
1564    fn message_handler_set(&self) -> &Mutex<ProtoMessageHandlerSet> {
1565        &self.message_handlers
1566    }
1567
1568    fn is_via_collab(&self) -> bool {
1569        false
1570    }
1571
1572    fn has_wsl_interop(&self) -> bool {
1573        self.has_wsl_interop
1574    }
1575}
1576
1577#[cfg(any(test, feature = "test-support"))]
1578mod fake {
1579    use super::{ChannelClient, RemoteClientDelegate, RemoteConnection, RemotePlatform};
1580    use crate::remote_client::{CommandTemplate, RemoteConnectionOptions};
1581    use anyhow::Result;
1582    use askpass::EncryptedPassword;
1583    use async_trait::async_trait;
1584    use collections::HashMap;
1585    use futures::{
1586        FutureExt, SinkExt, StreamExt,
1587        channel::{
1588            mpsc::{self, Sender},
1589            oneshot,
1590        },
1591        select_biased,
1592    };
1593    use gpui::{App, AppContext as _, AsyncApp, Task, TestAppContext};
1594    use release_channel::ReleaseChannel;
1595    use rpc::proto::Envelope;
1596    use semver::Version;
1597    use std::{path::PathBuf, sync::Arc};
1598    use util::paths::{PathStyle, RemotePathBuf};
1599
    /// In-process stand-in for a real transport, used by tests.
    pub(super) struct FakeRemoteConnection {
        /// Options returned from `connection_options`.
        pub(super) connection_options: RemoteConnectionOptions,
        /// The server-side channel client that `start_proxy` and `simulate_disconnect`
        /// reconnect to new channels.
        pub(super) server_channel: Arc<ChannelClient>,
        /// Async app context of the server side, wrapped so the connection can be `Send`.
        pub(super) server_cx: SendableCx,
    }
1605
    /// Wrapper that lets an [`AsyncApp`] be stored in a type that must be `Send + Sync`.
    pub(super) struct SendableCx(AsyncApp);
    impl SendableCx {
        /// Captures the async context of a test app.
        // SAFETY: When run in test mode, GPUI is always single threaded.
        pub(super) fn new(cx: &TestAppContext) -> Self {
            Self(cx.to_async())
        }

        /// Returns a clone of the wrapped context. The `AsyncApp` argument is unused; it
        /// only serves as proof that the caller already holds an app context.
        // SAFETY: Enforce that we're on the main thread by requiring a valid AsyncApp
        fn get(&self, _: &AsyncApp) -> AsyncApp {
            self.0.clone()
        }
    }
1618
    // SAFETY: There is no way to access a SendableCx from a different thread, see [`SendableCx::new`] and [`SendableCx::get`]
    unsafe impl Send for SendableCx {}
    // SAFETY: Same argument as for `Send` above: the wrapped context is only reachable
    // through `SendableCx::get`, which requires the caller to hold an `AsyncApp`.
    unsafe impl Sync for SendableCx {}
1622
1623    #[async_trait(?Send)]
1624    impl RemoteConnection for FakeRemoteConnection {
        /// The fake has nothing to terminate; always succeeds.
        async fn kill(&self) -> Result<()> {
            Ok(())
        }
1628
        /// The fake is never reported as killed.
        fn has_been_killed(&self) -> bool {
            false
        }
1632
1633        fn build_command(
1634            &self,
1635            program: Option<String>,
1636            args: &[String],
1637            env: &HashMap<String, String>,
1638            _: Option<String>,
1639            _: Option<(u16, String, u16)>,
1640        ) -> Result<CommandTemplate> {
1641            let ssh_program = program.unwrap_or_else(|| "sh".to_string());
1642            let mut ssh_args = Vec::new();
1643            ssh_args.push(ssh_program);
1644            ssh_args.extend(args.iter().cloned());
1645            Ok(CommandTemplate {
1646                program: "ssh".into(),
1647                args: ssh_args,
1648                env: env.clone(),
1649            })
1650        }
1651
1652        fn build_forward_ports_command(
1653            &self,
1654            forwards: Vec<(u16, String, u16)>,
1655        ) -> anyhow::Result<CommandTemplate> {
1656            Ok(CommandTemplate {
1657                program: "ssh".into(),
1658                args: std::iter::once("-N".to_owned())
1659                    .chain(forwards.into_iter().map(|(local_port, host, remote_port)| {
1660                        format!("{local_port}:{host}:{remote_port}")
1661                    }))
1662                    .collect(),
1663                env: Default::default(),
1664            })
1665        }
1666
1667        fn upload_directory(
1668            &self,
1669            _src_path: PathBuf,
1670            _dest_path: RemotePathBuf,
1671            _cx: &App,
1672        ) -> Task<Result<()>> {
1673            unreachable!()
1674        }
1675
1676        fn connection_options(&self) -> RemoteConnectionOptions {
1677            self.connection_options.clone()
1678        }
1679
1680        fn simulate_disconnect(&self, cx: &AsyncApp) {
1681            let (outgoing_tx, _) = mpsc::unbounded::<Envelope>();
1682            let (_, incoming_rx) = mpsc::unbounded::<Envelope>();
1683            self.server_channel
1684                .reconnect(incoming_rx, outgoing_tx, &self.server_cx.get(cx));
1685        }
1686
1687        fn start_proxy(
1688            &self,
1689            _unique_identifier: String,
1690            _reconnect: bool,
1691            mut client_incoming_tx: mpsc::UnboundedSender<Envelope>,
1692            mut client_outgoing_rx: mpsc::UnboundedReceiver<Envelope>,
1693            mut connection_activity_tx: Sender<()>,
1694            _delegate: Arc<dyn RemoteClientDelegate>,
1695            cx: &mut AsyncApp,
1696        ) -> Task<Result<i32>> {
1697            let (mut server_incoming_tx, server_incoming_rx) = mpsc::unbounded::<Envelope>();
1698            let (server_outgoing_tx, mut server_outgoing_rx) = mpsc::unbounded::<Envelope>();
1699
1700            self.server_channel.reconnect(
1701                server_incoming_rx,
1702                server_outgoing_tx,
1703                &self.server_cx.get(cx),
1704            );
1705
1706            cx.background_spawn(async move {
1707                loop {
1708                    select_biased! {
1709                        server_to_client = server_outgoing_rx.next().fuse() => {
1710                            let Some(server_to_client) = server_to_client else {
1711                                return Ok(1)
1712                            };
1713                            connection_activity_tx.try_send(()).ok();
1714                            client_incoming_tx.send(server_to_client).await.ok();
1715                        }
1716                        client_to_server = client_outgoing_rx.next().fuse() => {
1717                            let Some(client_to_server) = client_to_server else {
1718                                return Ok(1)
1719                            };
1720                            server_incoming_tx.send(client_to_server).await.ok();
1721                        }
1722                    }
1723                }
1724            })
1725        }
1726
1727        fn path_style(&self) -> PathStyle {
1728            PathStyle::local()
1729        }
1730
1731        fn shell(&self) -> String {
1732            "sh".to_owned()
1733        }
1734
1735        fn default_system_shell(&self) -> String {
1736            "sh".to_owned()
1737        }
1738
1739        fn has_wsl_interop(&self) -> bool {
1740            false
1741        }
1742    }
1743
1744    pub(super) struct Delegate;
1745
1746    impl RemoteClientDelegate for Delegate {
1747        fn ask_password(&self, _: String, _: oneshot::Sender<EncryptedPassword>, _: &mut AsyncApp) {
1748            unreachable!()
1749        }
1750
1751        fn download_server_binary_locally(
1752            &self,
1753            _: RemotePlatform,
1754            _: ReleaseChannel,
1755            _: Option<Version>,
1756            _: &mut AsyncApp,
1757        ) -> Task<Result<PathBuf>> {
1758            unreachable!()
1759        }
1760
1761        fn get_download_url(
1762            &self,
1763            _platform: RemotePlatform,
1764            _release_channel: ReleaseChannel,
1765            _version: Option<Version>,
1766            _cx: &mut AsyncApp,
1767        ) -> Task<Result<Option<String>>> {
1768            unreachable!()
1769        }
1770
1771        fn set_status(&self, _: Option<&str>, _: &mut AsyncApp) {}
1772    }
1773}