remote_client.rs

   1use crate::{
   2    SshConnectionOptions,
   3    protocol::MessageId,
   4    proxy::ProxyLaunchError,
   5    transport::{
   6        docker::{DockerConnectionOptions, DockerExecConnection},
   7        ssh::SshRemoteConnection,
   8        wsl::{WslConnectionOptions, WslRemoteConnection},
   9    },
  10};
  11use anyhow::{Context as _, Result, anyhow};
  12use askpass::EncryptedPassword;
  13use async_trait::async_trait;
  14use collections::HashMap;
  15use futures::{
  16    Future, FutureExt as _, StreamExt as _,
  17    channel::{
  18        mpsc::{self, Sender, UnboundedReceiver, UnboundedSender},
  19        oneshot,
  20    },
  21    future::{BoxFuture, Shared},
  22    select, select_biased,
  23};
  24use gpui::{
  25    App, AppContext as _, AsyncApp, BackgroundExecutor, BorrowAppContext, Context, Entity,
  26    EventEmitter, FutureExt, Global, Task, WeakEntity,
  27};
  28use parking_lot::Mutex;
  29
  30use release_channel::ReleaseChannel;
  31use rpc::{
  32    AnyProtoClient, ErrorExt, ProtoClient, ProtoMessageHandlerSet, RpcError,
  33    proto::{self, Envelope, EnvelopedMessage, PeerId, RequestMessage, build_typed_envelope},
  34};
  35use semver::Version;
  36use std::{
  37    collections::VecDeque,
  38    fmt,
  39    ops::ControlFlow,
  40    path::PathBuf,
  41    sync::{
  42        Arc, Weak,
  43        atomic::{AtomicU32, AtomicU64, Ordering::SeqCst},
  44    },
  45    time::{Duration, Instant},
  46};
  47use util::{
  48    ResultExt,
  49    paths::{PathStyle, RemotePathBuf},
  50};
  51
  52#[derive(Copy, Clone, Debug, PartialEq, Eq)]
  53pub enum RemoteOs {
  54    Linux,
  55    MacOs,
  56    Windows,
  57}
  58
  59impl RemoteOs {
  60    pub fn as_str(&self) -> &'static str {
  61        match self {
  62            RemoteOs::Linux => "linux",
  63            RemoteOs::MacOs => "macos",
  64            RemoteOs::Windows => "windows",
  65        }
  66    }
  67
  68    pub fn is_windows(&self) -> bool {
  69        matches!(self, RemoteOs::Windows)
  70    }
  71}
  72
  73impl std::fmt::Display for RemoteOs {
  74    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
  75        f.write_str(self.as_str())
  76    }
  77}
  78
  79#[derive(Copy, Clone, Debug, PartialEq, Eq)]
  80pub enum RemoteArch {
  81    X86_64,
  82    Aarch64,
  83}
  84
  85impl RemoteArch {
  86    pub fn as_str(&self) -> &'static str {
  87        match self {
  88            RemoteArch::X86_64 => "x86_64",
  89            RemoteArch::Aarch64 => "aarch64",
  90        }
  91    }
  92}
  93
  94impl std::fmt::Display for RemoteArch {
  95    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
  96        f.write_str(self.as_str())
  97    }
  98}
  99
 100#[derive(Copy, Clone, Debug)]
 101pub struct RemotePlatform {
 102    pub os: RemoteOs,
 103    pub arch: RemoteArch,
 104}
 105
 106#[derive(Clone, Debug)]
 107pub struct CommandTemplate {
 108    pub program: String,
 109    pub args: Vec<String>,
 110    pub env: HashMap<String, String>,
 111}
 112
 113pub trait RemoteClientDelegate: Send + Sync {
 114    fn ask_password(
 115        &self,
 116        prompt: String,
 117        tx: oneshot::Sender<EncryptedPassword>,
 118        cx: &mut AsyncApp,
 119    );
 120    fn get_download_url(
 121        &self,
 122        platform: RemotePlatform,
 123        release_channel: ReleaseChannel,
 124        version: Option<Version>,
 125        cx: &mut AsyncApp,
 126    ) -> Task<Result<Option<String>>>;
 127    fn download_server_binary_locally(
 128        &self,
 129        platform: RemotePlatform,
 130        release_channel: ReleaseChannel,
 131        version: Option<Version>,
 132        cx: &mut AsyncApp,
 133    ) -> Task<Result<PathBuf>>;
 134    fn set_status(&self, status: Option<&str>, cx: &mut AsyncApp);
 135}
 136
 137const MAX_MISSED_HEARTBEATS: usize = 5;
 138const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
 139const HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(5);
 140const INITIAL_CONNECTION_TIMEOUT: Duration =
 141    Duration::from_secs(if cfg!(debug_assertions) { 5 } else { 60 });
 142
 143const MAX_RECONNECT_ATTEMPTS: usize = 3;
 144
 145enum State {
 146    Connecting,
 147    Connected {
 148        remote_connection: Arc<dyn RemoteConnection>,
 149        delegate: Arc<dyn RemoteClientDelegate>,
 150
 151        multiplex_task: Task<Result<()>>,
 152        heartbeat_task: Task<Result<()>>,
 153    },
 154    HeartbeatMissed {
 155        missed_heartbeats: usize,
 156
 157        remote_connection: Arc<dyn RemoteConnection>,
 158        delegate: Arc<dyn RemoteClientDelegate>,
 159
 160        multiplex_task: Task<Result<()>>,
 161        heartbeat_task: Task<Result<()>>,
 162    },
 163    Reconnecting,
 164    ReconnectFailed {
 165        remote_connection: Arc<dyn RemoteConnection>,
 166        delegate: Arc<dyn RemoteClientDelegate>,
 167
 168        error: anyhow::Error,
 169        attempts: usize,
 170    },
 171    ReconnectExhausted,
 172    ServerNotRunning,
 173}
 174
 175impl fmt::Display for State {
 176    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
 177        match self {
 178            Self::Connecting => write!(f, "connecting"),
 179            Self::Connected { .. } => write!(f, "connected"),
 180            Self::Reconnecting => write!(f, "reconnecting"),
 181            Self::ReconnectFailed { .. } => write!(f, "reconnect failed"),
 182            Self::ReconnectExhausted => write!(f, "reconnect exhausted"),
 183            Self::HeartbeatMissed { .. } => write!(f, "heartbeat missed"),
 184            Self::ServerNotRunning { .. } => write!(f, "server not running"),
 185        }
 186    }
 187}
 188
 189impl State {
 190    fn remote_connection(&self) -> Option<Arc<dyn RemoteConnection>> {
 191        match self {
 192            Self::Connected {
 193                remote_connection, ..
 194            } => Some(remote_connection.clone()),
 195            Self::HeartbeatMissed {
 196                remote_connection, ..
 197            } => Some(remote_connection.clone()),
 198            Self::ReconnectFailed {
 199                remote_connection, ..
 200            } => Some(remote_connection.clone()),
 201            _ => None,
 202        }
 203    }
 204
 205    fn can_reconnect(&self) -> bool {
 206        match self {
 207            Self::Connected { .. }
 208            | Self::HeartbeatMissed { .. }
 209            | Self::ReconnectFailed { .. } => true,
 210            State::Connecting
 211            | State::Reconnecting
 212            | State::ReconnectExhausted
 213            | State::ServerNotRunning => false,
 214        }
 215    }
 216
 217    fn is_reconnect_failed(&self) -> bool {
 218        matches!(self, Self::ReconnectFailed { .. })
 219    }
 220
 221    fn is_reconnect_exhausted(&self) -> bool {
 222        matches!(self, Self::ReconnectExhausted { .. })
 223    }
 224
 225    fn is_server_not_running(&self) -> bool {
 226        matches!(self, Self::ServerNotRunning)
 227    }
 228
 229    fn is_reconnecting(&self) -> bool {
 230        matches!(self, Self::Reconnecting { .. })
 231    }
 232
 233    fn heartbeat_recovered(self) -> Self {
 234        match self {
 235            Self::HeartbeatMissed {
 236                remote_connection,
 237                delegate,
 238                multiplex_task,
 239                heartbeat_task,
 240                ..
 241            } => Self::Connected {
 242                remote_connection: remote_connection,
 243                delegate,
 244                multiplex_task,
 245                heartbeat_task,
 246            },
 247            _ => self,
 248        }
 249    }
 250
 251    fn heartbeat_missed(self) -> Self {
 252        match self {
 253            Self::Connected {
 254                remote_connection,
 255                delegate,
 256                multiplex_task,
 257                heartbeat_task,
 258            } => Self::HeartbeatMissed {
 259                missed_heartbeats: 1,
 260                remote_connection,
 261                delegate,
 262                multiplex_task,
 263                heartbeat_task,
 264            },
 265            Self::HeartbeatMissed {
 266                missed_heartbeats,
 267                remote_connection,
 268                delegate,
 269                multiplex_task,
 270                heartbeat_task,
 271            } => Self::HeartbeatMissed {
 272                missed_heartbeats: missed_heartbeats + 1,
 273                remote_connection,
 274                delegate,
 275                multiplex_task,
 276                heartbeat_task,
 277            },
 278            _ => self,
 279        }
 280    }
 281}
 282
 283/// The state of the ssh connection.
 284#[derive(Clone, Copy, Debug, PartialEq, Eq)]
 285pub enum ConnectionState {
 286    Connecting,
 287    Connected,
 288    HeartbeatMissed,
 289    Reconnecting,
 290    Disconnected,
 291}
 292
 293impl From<&State> for ConnectionState {
 294    fn from(value: &State) -> Self {
 295        match value {
 296            State::Connecting => Self::Connecting,
 297            State::Connected { .. } => Self::Connected,
 298            State::Reconnecting | State::ReconnectFailed { .. } => Self::Reconnecting,
 299            State::HeartbeatMissed { .. } => Self::HeartbeatMissed,
 300            State::ReconnectExhausted => Self::Disconnected,
 301            State::ServerNotRunning => Self::Disconnected,
 302        }
 303    }
 304}
 305
 306pub struct RemoteClient {
 307    client: Arc<ChannelClient>,
 308    unique_identifier: String,
 309    connection_options: RemoteConnectionOptions,
 310    path_style: PathStyle,
 311    state: Option<State>,
 312}
 313
 314#[derive(Debug)]
 315pub enum RemoteClientEvent {
 316    Disconnected,
 317}
 318
 319impl EventEmitter<RemoteClientEvent> for RemoteClient {}
 320
 321/// Identifies the socket on the remote server so that reconnects
 322/// can re-join the same project.
 323pub enum ConnectionIdentifier {
 324    Setup(u64),
 325    Workspace(i64),
 326}
 327
 328static NEXT_ID: AtomicU64 = AtomicU64::new(1);
 329
 330impl ConnectionIdentifier {
 331    pub fn setup() -> Self {
 332        Self::Setup(NEXT_ID.fetch_add(1, SeqCst))
 333    }
 334
 335    // This string gets used in a socket name, and so must be relatively short.
 336    // The total length of:
 337    //   /home/{username}/.local/share/zed/server_state/{name}/stdout.sock
 338    // Must be less than about 100 characters
 339    //   https://unix.stackexchange.com/questions/367008/why-is-socket-path-length-limited-to-a-hundred-chars
 340    // So our strings should be at most 20 characters or so.
 341    fn to_string(&self, cx: &App) -> String {
 342        let identifier_prefix = match ReleaseChannel::global(cx) {
 343            ReleaseChannel::Stable => "".to_string(),
 344            release_channel => format!("{}-", release_channel.dev_name()),
 345        };
 346        match self {
 347            Self::Setup(setup_id) => format!("{identifier_prefix}setup-{setup_id}"),
 348            Self::Workspace(workspace_id) => {
 349                format!("{identifier_prefix}workspace-{workspace_id}",)
 350            }
 351        }
 352    }
 353}
 354
 355pub async fn connect(
 356    connection_options: RemoteConnectionOptions,
 357    delegate: Arc<dyn RemoteClientDelegate>,
 358    cx: &mut AsyncApp,
 359) -> Result<Arc<dyn RemoteConnection>> {
 360    cx.update(|cx| {
 361        cx.update_default_global(|pool: &mut ConnectionPool, cx| {
 362            pool.connect(connection_options.clone(), delegate.clone(), cx)
 363        })
 364    })?
 365    .await
 366    .map_err(|e| e.cloned())
 367}
 368
 369impl RemoteClient {
 370    pub fn new(
 371        unique_identifier: ConnectionIdentifier,
 372        remote_connection: Arc<dyn RemoteConnection>,
 373        cancellation: oneshot::Receiver<()>,
 374        delegate: Arc<dyn RemoteClientDelegate>,
 375        cx: &mut App,
 376    ) -> Task<Result<Option<Entity<Self>>>> {
 377        let unique_identifier = unique_identifier.to_string(cx);
 378        cx.spawn(async move |cx| {
 379            let success = Box::pin(async move {
 380                let (outgoing_tx, outgoing_rx) = mpsc::unbounded::<Envelope>();
 381                let (incoming_tx, incoming_rx) = mpsc::unbounded::<Envelope>();
 382                let (connection_activity_tx, connection_activity_rx) = mpsc::channel::<()>(1);
 383
 384                let client = cx.update(|cx| {
 385                    ChannelClient::new(
 386                        incoming_rx,
 387                        outgoing_tx,
 388                        cx,
 389                        "client",
 390                        remote_connection.has_wsl_interop(),
 391                    )
 392                })?;
 393
 394                let path_style = remote_connection.path_style();
 395                let this = cx.new(|_| Self {
 396                    client: client.clone(),
 397                    unique_identifier: unique_identifier.clone(),
 398                    connection_options: remote_connection.connection_options(),
 399                    path_style,
 400                    state: Some(State::Connecting),
 401                })?;
 402
 403                let io_task = remote_connection.start_proxy(
 404                    unique_identifier,
 405                    false,
 406                    incoming_tx,
 407                    outgoing_rx,
 408                    connection_activity_tx,
 409                    delegate.clone(),
 410                    cx,
 411                );
 412
 413                let ready = client
 414                    .wait_for_remote_started()
 415                    .with_timeout(INITIAL_CONNECTION_TIMEOUT, cx.background_executor())
 416                    .await;
 417                match ready {
 418                    Ok(Some(_)) => {}
 419                    Ok(None) => {
 420                        let mut error = "remote client exited before becoming ready".to_owned();
 421                        if let Some(status) = io_task.now_or_never() {
 422                            match status {
 423                                Ok(exit_code) => {
 424                                    error.push_str(&format!(", exit_code={exit_code:?}"))
 425                                }
 426                                Err(e) => error.push_str(&format!(", error={e:?}")),
 427                            }
 428                        }
 429                        let error = anyhow::anyhow!("{error}");
 430                        log::error!("failed to establish connection: {}", error);
 431                        return Err(error);
 432                    }
 433                    Err(_) => {
 434                        let mut error =
 435                            "remote client did not become ready within the timeout".to_owned();
 436                        if let Some(status) = io_task.now_or_never() {
 437                            match status {
 438                                Ok(exit_code) => {
 439                                    error.push_str(&format!(", exit_code={exit_code:?}"))
 440                                }
 441                                Err(e) => error.push_str(&format!(", error={e:?}")),
 442                            }
 443                        }
 444                        let error = anyhow::anyhow!("{error}");
 445                        log::error!("failed to establish connection: {}", error);
 446                        return Err(error);
 447                    }
 448                }
 449                let multiplex_task = Self::monitor(this.downgrade(), io_task, cx);
 450                if let Err(error) = client.ping(HEARTBEAT_TIMEOUT).await {
 451                    log::error!("failed to establish connection: {}", error);
 452                    return Err(error);
 453                }
 454
 455                let heartbeat_task = Self::heartbeat(this.downgrade(), connection_activity_rx, cx);
 456
 457                this.update(cx, |this, _| {
 458                    this.state = Some(State::Connected {
 459                        remote_connection,
 460                        delegate,
 461                        multiplex_task,
 462                        heartbeat_task,
 463                    });
 464                })?;
 465
 466                Ok(Some(this))
 467            });
 468
 469            select! {
 470                _ = cancellation.fuse() => {
 471                    Ok(None)
 472                }
 473                result = success.fuse() =>  result
 474            }
 475        })
 476    }
 477
 478    pub fn proto_client_from_channels(
 479        incoming_rx: mpsc::UnboundedReceiver<Envelope>,
 480        outgoing_tx: mpsc::UnboundedSender<Envelope>,
 481        cx: &App,
 482        name: &'static str,
 483        has_wsl_interop: bool,
 484    ) -> AnyProtoClient {
 485        ChannelClient::new(incoming_rx, outgoing_tx, cx, name, has_wsl_interop).into()
 486    }
 487
 488    pub fn shutdown_processes<T: RequestMessage>(
 489        &mut self,
 490        shutdown_request: Option<T>,
 491        executor: BackgroundExecutor,
 492    ) -> Option<impl Future<Output = ()> + use<T>> {
 493        let state = self.state.take()?;
 494        log::info!("shutting down ssh processes");
 495
 496        let State::Connected {
 497            multiplex_task,
 498            heartbeat_task,
 499            remote_connection,
 500            delegate,
 501        } = state
 502        else {
 503            return None;
 504        };
 505
 506        let client = self.client.clone();
 507
 508        Some(async move {
 509            if let Some(shutdown_request) = shutdown_request {
 510                client.send(shutdown_request).log_err();
 511                // We wait 50ms instead of waiting for a response, because
 512                // waiting for a response would require us to wait on the main thread
 513                // which we want to avoid in an `on_app_quit` callback.
 514                executor.timer(Duration::from_millis(50)).await;
 515            }
 516
 517            // Drop `multiplex_task` because it owns our remote_connection_proxy_process, which is a
 518            // child of master_process.
 519            drop(multiplex_task);
 520            // Now drop the rest of state, which kills master process.
 521            drop(heartbeat_task);
 522            drop(remote_connection);
 523            drop(delegate);
 524        })
 525    }
 526
 527    fn reconnect(&mut self, cx: &mut Context<Self>) -> Result<()> {
 528        let can_reconnect = self
 529            .state
 530            .as_ref()
 531            .map(|state| state.can_reconnect())
 532            .unwrap_or(false);
 533        if !can_reconnect {
 534            log::info!("aborting reconnect, because not in state that allows reconnecting");
 535            let error = if let Some(state) = self.state.as_ref() {
 536                format!("invalid state, cannot reconnect while in state {state}")
 537            } else {
 538                "no state set".to_string()
 539            };
 540            anyhow::bail!(error);
 541        }
 542
 543        let state = self.state.take().unwrap();
 544        let (attempts, remote_connection, delegate) = match state {
 545            State::Connected {
 546                remote_connection,
 547                delegate,
 548                multiplex_task,
 549                heartbeat_task,
 550            }
 551            | State::HeartbeatMissed {
 552                remote_connection,
 553                delegate,
 554                multiplex_task,
 555                heartbeat_task,
 556                ..
 557            } => {
 558                drop(multiplex_task);
 559                drop(heartbeat_task);
 560                (0, remote_connection, delegate)
 561            }
 562            State::ReconnectFailed {
 563                attempts,
 564                remote_connection,
 565                delegate,
 566                ..
 567            } => (attempts, remote_connection, delegate),
 568            State::Connecting
 569            | State::Reconnecting
 570            | State::ReconnectExhausted
 571            | State::ServerNotRunning => unreachable!(),
 572        };
 573
 574        let attempts = attempts + 1;
 575        if attempts > MAX_RECONNECT_ATTEMPTS {
 576            log::error!(
 577                "Failed to reconnect to after {} attempts, giving up",
 578                MAX_RECONNECT_ATTEMPTS
 579            );
 580            self.set_state(State::ReconnectExhausted, cx);
 581            return Ok(());
 582        }
 583
 584        self.set_state(State::Reconnecting, cx);
 585
 586        log::info!(
 587            "Trying to reconnect to remote server... Attempt {}",
 588            attempts
 589        );
 590
 591        let unique_identifier = self.unique_identifier.clone();
 592        let client = self.client.clone();
 593        let reconnect_task = cx.spawn(async move |this, cx| {
 594            macro_rules! failed {
 595                ($error:expr, $attempts:expr, $remote_connection:expr, $delegate:expr) => {
 596                    delegate.set_status(Some(&format!("{error:#}", error = $error)), cx);
 597                    return State::ReconnectFailed {
 598                        error: anyhow!($error),
 599                        attempts: $attempts,
 600                        remote_connection: $remote_connection,
 601                        delegate: $delegate,
 602                    };
 603                };
 604            }
 605
 606            if let Err(error) = remote_connection
 607                .kill()
 608                .await
 609                .context("Failed to kill remote_connection process")
 610            {
 611                failed!(error, attempts, remote_connection, delegate);
 612            };
 613
 614            let connection_options = remote_connection.connection_options();
 615
 616            let (outgoing_tx, outgoing_rx) = mpsc::unbounded::<Envelope>();
 617            let (incoming_tx, incoming_rx) = mpsc::unbounded::<Envelope>();
 618            let (connection_activity_tx, connection_activity_rx) = mpsc::channel::<()>(1);
 619
 620            let (remote_connection, io_task) = match async {
 621                let remote_connection = cx
 622                    .update_global(|pool: &mut ConnectionPool, cx| {
 623                        pool.connect(connection_options, delegate.clone(), cx)
 624                    })?
 625                    .await
 626                    .map_err(|error| error.cloned())?;
 627
 628                let io_task = remote_connection.start_proxy(
 629                    unique_identifier,
 630                    true,
 631                    incoming_tx,
 632                    outgoing_rx,
 633                    connection_activity_tx,
 634                    delegate.clone(),
 635                    cx,
 636                );
 637                anyhow::Ok((remote_connection, io_task))
 638            }
 639            .await
 640            {
 641                Ok((remote_connection, io_task)) => (remote_connection, io_task),
 642                Err(error) => {
 643                    failed!(error, attempts, remote_connection, delegate);
 644                }
 645            };
 646
 647            let multiplex_task = Self::monitor(this.clone(), io_task, cx);
 648            client.reconnect(incoming_rx, outgoing_tx, cx);
 649
 650            if let Err(error) = client.resync(HEARTBEAT_TIMEOUT).await {
 651                failed!(error, attempts, remote_connection, delegate);
 652            };
 653
 654            State::Connected {
 655                remote_connection: remote_connection,
 656                delegate,
 657                multiplex_task,
 658                heartbeat_task: Self::heartbeat(this.clone(), connection_activity_rx, cx),
 659            }
 660        });
 661
 662        cx.spawn(async move |this, cx| {
 663            let new_state = reconnect_task.await;
 664            this.update(cx, |this, cx| {
 665                this.try_set_state(cx, |old_state| {
 666                    if old_state.is_reconnecting() {
 667                        match &new_state {
 668                            State::Connecting
 669                            | State::Reconnecting
 670                            | State::HeartbeatMissed { .. }
 671                            | State::ServerNotRunning => {}
 672                            State::Connected { .. } => {
 673                                log::info!("Successfully reconnected");
 674                            }
 675                            State::ReconnectFailed {
 676                                error, attempts, ..
 677                            } => {
 678                                log::error!(
 679                                    "Reconnect attempt {} failed: {:?}. Starting new attempt...",
 680                                    attempts,
 681                                    error
 682                                );
 683                            }
 684                            State::ReconnectExhausted => {
 685                                log::error!("Reconnect attempt failed and all attempts exhausted");
 686                            }
 687                        }
 688                        Some(new_state)
 689                    } else {
 690                        None
 691                    }
 692                });
 693
 694                if this.state_is(State::is_reconnect_failed) {
 695                    this.reconnect(cx)
 696                } else if this.state_is(State::is_reconnect_exhausted) {
 697                    Ok(())
 698                } else {
 699                    log::debug!("State has transition from Reconnecting into new state while attempting reconnect.");
 700                    Ok(())
 701                }
 702            })
 703        })
 704        .detach_and_log_err(cx);
 705
 706        Ok(())
 707    }
 708
 709    fn heartbeat(
 710        this: WeakEntity<Self>,
 711        mut connection_activity_rx: mpsc::Receiver<()>,
 712        cx: &mut AsyncApp,
 713    ) -> Task<Result<()>> {
 714        let Ok(client) = this.read_with(cx, |this, _| this.client.clone()) else {
 715            return Task::ready(Err(anyhow!("remote_connectionRemoteClient lost")));
 716        };
 717
 718        cx.spawn(async move |cx| {
 719            let mut missed_heartbeats = 0;
 720
 721            let keepalive_timer = cx.background_executor().timer(HEARTBEAT_INTERVAL).fuse();
 722            futures::pin_mut!(keepalive_timer);
 723
 724            loop {
 725                select_biased! {
 726                    result = connection_activity_rx.next().fuse() => {
 727                        if result.is_none() {
 728                            log::warn!("remote heartbeat: connection activity channel has been dropped. stopping.");
 729                            return Ok(());
 730                        }
 731
 732                        if missed_heartbeats != 0 {
 733                            missed_heartbeats = 0;
 734                            let _ =this.update(cx, |this, cx| {
 735                                this.handle_heartbeat_result(missed_heartbeats, cx)
 736                            })?;
 737                        }
 738                    }
 739                    _ = keepalive_timer => {
 740                        log::debug!("Sending heartbeat to server...");
 741
 742                        let result = select_biased! {
 743                            _ = connection_activity_rx.next().fuse() => {
 744                                Ok(())
 745                            }
 746                            ping_result = client.ping(HEARTBEAT_TIMEOUT).fuse() => {
 747                                ping_result
 748                            }
 749                        };
 750
 751                        if result.is_err() {
 752                            missed_heartbeats += 1;
 753                            log::warn!(
 754                                "No heartbeat from server after {:?}. Missed heartbeat {} out of {}.",
 755                                HEARTBEAT_TIMEOUT,
 756                                missed_heartbeats,
 757                                MAX_MISSED_HEARTBEATS
 758                            );
 759                        } else if missed_heartbeats != 0 {
 760                            missed_heartbeats = 0;
 761                        } else {
 762                            continue;
 763                        }
 764
 765                        let result = this.update(cx, |this, cx| {
 766                            this.handle_heartbeat_result(missed_heartbeats, cx)
 767                        })?;
 768                        if result.is_break() {
 769                            return Ok(());
 770                        }
 771                    }
 772                }
 773
 774                keepalive_timer.set(cx.background_executor().timer(HEARTBEAT_INTERVAL).fuse());
 775            }
 776        })
 777    }
 778
 779    fn handle_heartbeat_result(
 780        &mut self,
 781        missed_heartbeats: usize,
 782        cx: &mut Context<Self>,
 783    ) -> ControlFlow<()> {
 784        let state = self.state.take().unwrap();
 785        let next_state = if missed_heartbeats > 0 {
 786            state.heartbeat_missed()
 787        } else {
 788            state.heartbeat_recovered()
 789        };
 790
 791        self.set_state(next_state, cx);
 792
 793        if missed_heartbeats >= MAX_MISSED_HEARTBEATS {
 794            log::error!(
 795                "Missed last {} heartbeats. Reconnecting...",
 796                missed_heartbeats
 797            );
 798
 799            self.reconnect(cx)
 800                .context("failed to start reconnect process after missing heartbeats")
 801                .log_err();
 802            ControlFlow::Break(())
 803        } else {
 804            ControlFlow::Continue(())
 805        }
 806    }
 807
 808    fn monitor(
 809        this: WeakEntity<Self>,
 810        io_task: Task<Result<i32>>,
 811        cx: &AsyncApp,
 812    ) -> Task<Result<()>> {
 813        cx.spawn(async move |cx| {
 814            let result = io_task.await;
 815
 816            match result {
 817                Ok(exit_code) => {
 818                    if let Some(error) = ProxyLaunchError::from_exit_code(exit_code) {
 819                        match error {
 820                            ProxyLaunchError::ServerNotRunning => {
 821                                log::error!("failed to reconnect because server is not running");
 822                                this.update(cx, |this, cx| {
 823                                    this.set_state(State::ServerNotRunning, cx);
 824                                })?;
 825                            }
 826                        }
 827                    } else if exit_code > 0 {
 828                        log::error!("proxy process terminated unexpectedly");
 829                        this.update(cx, |this, cx| {
 830                            this.reconnect(cx).ok();
 831                        })?;
 832                    }
 833                }
 834                Err(error) => {
 835                    log::warn!(
 836                        "remote io task died with error: {:?}. reconnecting...",
 837                        error
 838                    );
 839                    this.update(cx, |this, cx| {
 840                        this.reconnect(cx).ok();
 841                    })?;
 842                }
 843            }
 844
 845            Ok(())
 846        })
 847    }
 848
 849    fn state_is(&self, check: impl FnOnce(&State) -> bool) -> bool {
 850        self.state.as_ref().is_some_and(check)
 851    }
 852
 853    fn try_set_state(&mut self, cx: &mut Context<Self>, map: impl FnOnce(&State) -> Option<State>) {
 854        let new_state = self.state.as_ref().and_then(map);
 855        if let Some(new_state) = new_state {
 856            self.state.replace(new_state);
 857            cx.notify();
 858        }
 859    }
 860
 861    fn set_state(&mut self, state: State, cx: &mut Context<Self>) {
 862        log::info!("setting state to '{}'", &state);
 863
 864        let is_reconnect_exhausted = state.is_reconnect_exhausted();
 865        let is_server_not_running = state.is_server_not_running();
 866        self.state.replace(state);
 867
 868        if is_reconnect_exhausted || is_server_not_running {
 869            cx.emit(RemoteClientEvent::Disconnected);
 870        }
 871        cx.notify();
 872    }
 873
 874    pub fn shell(&self) -> Option<String> {
 875        Some(self.remote_connection()?.shell())
 876    }
 877
 878    pub fn default_system_shell(&self) -> Option<String> {
 879        Some(self.remote_connection()?.default_system_shell())
 880    }
 881
 882    pub fn shares_network_interface(&self) -> bool {
 883        self.remote_connection()
 884            .map_or(false, |connection| connection.shares_network_interface())
 885    }
 886
 887    pub fn build_command(
 888        &self,
 889        program: Option<String>,
 890        args: &[String],
 891        env: &HashMap<String, String>,
 892        working_dir: Option<String>,
 893        port_forward: Option<(u16, String, u16)>,
 894    ) -> Result<CommandTemplate> {
 895        let Some(connection) = self.remote_connection() else {
 896            return Err(anyhow!("no remote connection"));
 897        };
 898        connection.build_command(program, args, env, working_dir, port_forward)
 899    }
 900
 901    pub fn build_forward_ports_command(
 902        &self,
 903        forwards: Vec<(u16, String, u16)>,
 904    ) -> Result<CommandTemplate> {
 905        let Some(connection) = self.remote_connection() else {
 906            return Err(anyhow!("no remote connection"));
 907        };
 908        connection.build_forward_ports_command(forwards)
 909    }
 910
 911    pub fn upload_directory(
 912        &self,
 913        src_path: PathBuf,
 914        dest_path: RemotePathBuf,
 915        cx: &App,
 916    ) -> Task<Result<()>> {
 917        let Some(connection) = self.remote_connection() else {
 918            return Task::ready(Err(anyhow!("no remote connection")));
 919        };
 920        connection.upload_directory(src_path, dest_path, cx)
 921    }
 922
 923    pub fn proto_client(&self) -> AnyProtoClient {
 924        self.client.clone().into()
 925    }
 926
 927    pub fn connection_options(&self) -> RemoteConnectionOptions {
 928        self.connection_options.clone()
 929    }
 930
 931    pub fn connection(&self) -> Option<Arc<dyn RemoteConnection>> {
 932        if let State::Connected {
 933            remote_connection, ..
 934        } = self.state.as_ref()?
 935        {
 936            Some(remote_connection.clone())
 937        } else {
 938            None
 939        }
 940    }
 941
 942    pub fn connection_state(&self) -> ConnectionState {
 943        self.state
 944            .as_ref()
 945            .map(ConnectionState::from)
 946            .unwrap_or(ConnectionState::Disconnected)
 947    }
 948
 949    pub fn is_disconnected(&self) -> bool {
 950        self.connection_state() == ConnectionState::Disconnected
 951    }
 952
 953    pub fn path_style(&self) -> PathStyle {
 954        self.path_style
 955    }
 956
 957    #[cfg(any(test, feature = "test-support"))]
 958    pub fn simulate_disconnect(&self, client_cx: &mut App) -> Task<()> {
 959        let opts = self.connection_options();
 960        client_cx.spawn(async move |cx| {
 961            let connection = cx
 962                .update_global(|c: &mut ConnectionPool, _| {
 963                    if let Some(ConnectionPoolEntry::Connecting(c)) = c.connections.get(&opts) {
 964                        c.clone()
 965                    } else {
 966                        panic!("missing test connection")
 967                    }
 968                })
 969                .unwrap()
 970                .await
 971                .unwrap();
 972
 973            connection.simulate_disconnect(cx);
 974        })
 975    }
 976
 977    #[cfg(any(test, feature = "test-support"))]
 978    pub fn fake_server(
 979        client_cx: &mut gpui::TestAppContext,
 980        server_cx: &mut gpui::TestAppContext,
 981    ) -> (RemoteConnectionOptions, AnyProtoClient) {
 982        use crate::transport::ssh::SshConnectionHost;
 983
 984        let port = client_cx
 985            .update(|cx| cx.default_global::<ConnectionPool>().connections.len() as u16 + 1);
 986        let opts = RemoteConnectionOptions::Ssh(SshConnectionOptions {
 987            host: SshConnectionHost::from("<fake>".to_string()),
 988            port: Some(port),
 989            ..Default::default()
 990        });
 991        let (outgoing_tx, _) = mpsc::unbounded::<Envelope>();
 992        let (_, incoming_rx) = mpsc::unbounded::<Envelope>();
 993        let server_client = server_cx
 994            .update(|cx| ChannelClient::new(incoming_rx, outgoing_tx, cx, "fake-server", false));
 995        let connection: Arc<dyn RemoteConnection> = Arc::new(fake::FakeRemoteConnection {
 996            connection_options: opts.clone(),
 997            server_cx: fake::SendableCx::new(server_cx),
 998            server_channel: server_client.clone(),
 999        });
1000
1001        client_cx.update(|cx| {
1002            cx.update_default_global(|c: &mut ConnectionPool, cx| {
1003                c.connections.insert(
1004                    opts.clone(),
1005                    ConnectionPoolEntry::Connecting(
1006                        cx.background_spawn({
1007                            let connection = connection.clone();
1008                            async move { Ok(connection.clone()) }
1009                        })
1010                        .shared(),
1011                    ),
1012                );
1013            })
1014        });
1015
1016        (opts, server_client.into())
1017    }
1018
1019    #[cfg(any(test, feature = "test-support"))]
1020    pub async fn fake_client(
1021        opts: RemoteConnectionOptions,
1022        client_cx: &mut gpui::TestAppContext,
1023    ) -> Entity<Self> {
1024        let (_tx, rx) = oneshot::channel();
1025        let mut cx = client_cx.to_async();
1026        let connection = connect(opts, Arc::new(fake::Delegate), &mut cx)
1027            .await
1028            .unwrap();
1029        client_cx
1030            .update(|cx| {
1031                Self::new(
1032                    ConnectionIdentifier::setup(),
1033                    connection,
1034                    rx,
1035                    Arc::new(fake::Delegate),
1036                    cx,
1037                )
1038            })
1039            .await
1040            .unwrap()
1041            .unwrap()
1042    }
1043
1044    fn remote_connection(&self) -> Option<Arc<dyn RemoteConnection>> {
1045        self.state
1046            .as_ref()
1047            .and_then(|state| state.remote_connection())
1048    }
1049}
1050
1051enum ConnectionPoolEntry {
1052    Connecting(Shared<Task<Result<Arc<dyn RemoteConnection>, Arc<anyhow::Error>>>>),
1053    Connected(Weak<dyn RemoteConnection>),
1054}
1055
1056#[derive(Default)]
1057struct ConnectionPool {
1058    connections: HashMap<RemoteConnectionOptions, ConnectionPoolEntry>,
1059}
1060
1061impl Global for ConnectionPool {}
1062
1063impl ConnectionPool {
1064    pub fn connect(
1065        &mut self,
1066        opts: RemoteConnectionOptions,
1067        delegate: Arc<dyn RemoteClientDelegate>,
1068        cx: &mut App,
1069    ) -> Shared<Task<Result<Arc<dyn RemoteConnection>, Arc<anyhow::Error>>>> {
1070        let connection = self.connections.get(&opts);
1071        match connection {
1072            Some(ConnectionPoolEntry::Connecting(task)) => {
1073                delegate.set_status(
1074                    Some("Waiting for existing connection attempt"),
1075                    &mut cx.to_async(),
1076                );
1077                return task.clone();
1078            }
1079            Some(ConnectionPoolEntry::Connected(remote)) => {
1080                if let Some(remote) = remote.upgrade()
1081                    && !remote.has_been_killed()
1082                {
1083                    return Task::ready(Ok(remote)).shared();
1084                }
1085                self.connections.remove(&opts);
1086            }
1087            None => {}
1088        }
1089
1090        let task = cx
1091            .spawn({
1092                let opts = opts.clone();
1093                let delegate = delegate.clone();
1094                async move |cx| {
1095                    let connection = match opts.clone() {
1096                        RemoteConnectionOptions::Ssh(opts) => {
1097                            SshRemoteConnection::new(opts, delegate, cx)
1098                                .await
1099                                .map(|connection| Arc::new(connection) as Arc<dyn RemoteConnection>)
1100                        }
1101                        RemoteConnectionOptions::Wsl(opts) => {
1102                            WslRemoteConnection::new(opts, delegate, cx)
1103                                .await
1104                                .map(|connection| Arc::new(connection) as Arc<dyn RemoteConnection>)
1105                        }
1106                        RemoteConnectionOptions::Docker(opts) => {
1107                            DockerExecConnection::new(opts, delegate, cx)
1108                                .await
1109                                .map(|connection| Arc::new(connection) as Arc<dyn RemoteConnection>)
1110                        }
1111                    };
1112
1113                    cx.update_global(|pool: &mut Self, _| {
1114                        debug_assert!(matches!(
1115                            pool.connections.get(&opts),
1116                            Some(ConnectionPoolEntry::Connecting(_))
1117                        ));
1118                        match connection {
1119                            Ok(connection) => {
1120                                pool.connections.insert(
1121                                    opts.clone(),
1122                                    ConnectionPoolEntry::Connected(Arc::downgrade(&connection)),
1123                                );
1124                                Ok(connection)
1125                            }
1126                            Err(error) => {
1127                                pool.connections.remove(&opts);
1128                                Err(Arc::new(error))
1129                            }
1130                        }
1131                    })?
1132                }
1133            })
1134            .shared();
1135
1136        self.connections
1137            .insert(opts.clone(), ConnectionPoolEntry::Connecting(task.clone()));
1138        task
1139    }
1140}
1141
1142#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1143pub enum RemoteConnectionOptions {
1144    Ssh(SshConnectionOptions),
1145    Wsl(WslConnectionOptions),
1146    Docker(DockerConnectionOptions),
1147}
1148
1149impl RemoteConnectionOptions {
1150    pub fn display_name(&self) -> String {
1151        match self {
1152            RemoteConnectionOptions::Ssh(opts) => opts.host.to_string(),
1153            RemoteConnectionOptions::Wsl(opts) => opts.distro_name.clone(),
1154            RemoteConnectionOptions::Docker(opts) => opts.name.clone(),
1155        }
1156    }
1157}
1158
1159impl From<SshConnectionOptions> for RemoteConnectionOptions {
1160    fn from(opts: SshConnectionOptions) -> Self {
1161        RemoteConnectionOptions::Ssh(opts)
1162    }
1163}
1164
1165impl From<WslConnectionOptions> for RemoteConnectionOptions {
1166    fn from(opts: WslConnectionOptions) -> Self {
1167        RemoteConnectionOptions::Wsl(opts)
1168    }
1169}
1170
1171#[cfg(target_os = "windows")]
1172/// Open a wsl path (\\wsl.localhost\<distro>\path)
1173#[derive(Debug, Clone, PartialEq, Eq, gpui::Action)]
1174#[action(namespace = workspace, no_json, no_register)]
1175pub struct OpenWslPath {
1176    pub distro: WslConnectionOptions,
1177    pub paths: Vec<PathBuf>,
1178}
1179
1180#[async_trait(?Send)]
1181pub trait RemoteConnection: Send + Sync {
1182    fn start_proxy(
1183        &self,
1184        unique_identifier: String,
1185        reconnect: bool,
1186        incoming_tx: UnboundedSender<Envelope>,
1187        outgoing_rx: UnboundedReceiver<Envelope>,
1188        connection_activity_tx: Sender<()>,
1189        delegate: Arc<dyn RemoteClientDelegate>,
1190        cx: &mut AsyncApp,
1191    ) -> Task<Result<i32>>;
1192    fn upload_directory(
1193        &self,
1194        src_path: PathBuf,
1195        dest_path: RemotePathBuf,
1196        cx: &App,
1197    ) -> Task<Result<()>>;
1198    async fn kill(&self) -> Result<()>;
1199    fn has_been_killed(&self) -> bool;
1200    fn shares_network_interface(&self) -> bool {
1201        false
1202    }
1203    fn build_command(
1204        &self,
1205        program: Option<String>,
1206        args: &[String],
1207        env: &HashMap<String, String>,
1208        working_dir: Option<String>,
1209        port_forward: Option<(u16, String, u16)>,
1210    ) -> Result<CommandTemplate>;
1211    fn build_forward_ports_command(
1212        &self,
1213        forwards: Vec<(u16, String, u16)>,
1214    ) -> Result<CommandTemplate>;
1215    fn connection_options(&self) -> RemoteConnectionOptions;
1216    fn path_style(&self) -> PathStyle;
1217    fn shell(&self) -> String;
1218    fn default_system_shell(&self) -> String;
1219    fn has_wsl_interop(&self) -> bool;
1220
1221    #[cfg(any(test, feature = "test-support"))]
1222    fn simulate_disconnect(&self, _: &AsyncApp) {}
1223}
1224
1225type ResponseChannels = Mutex<HashMap<MessageId, oneshot::Sender<(Envelope, oneshot::Sender<()>)>>>;
1226
1227struct Signal<T> {
1228    tx: Mutex<Option<oneshot::Sender<T>>>,
1229    rx: Shared<Task<Option<T>>>,
1230}
1231
1232impl<T: Send + Clone + 'static> Signal<T> {
1233    pub fn new(cx: &App) -> Self {
1234        let (tx, rx) = oneshot::channel();
1235
1236        let task = cx
1237            .background_executor()
1238            .spawn(async move { rx.await.ok() })
1239            .shared();
1240
1241        Self {
1242            tx: Mutex::new(Some(tx)),
1243            rx: task,
1244        }
1245    }
1246
1247    fn set(&self, value: T) {
1248        if let Some(tx) = self.tx.lock().take() {
1249            let _ = tx.send(value);
1250        }
1251    }
1252
1253    fn wait(&self) -> Shared<Task<Option<T>>> {
1254        self.rx.clone()
1255    }
1256}
1257
1258struct ChannelClient {
1259    next_message_id: AtomicU32,
1260    outgoing_tx: Mutex<mpsc::UnboundedSender<Envelope>>,
1261    buffer: Mutex<VecDeque<Envelope>>,
1262    response_channels: ResponseChannels,
1263    message_handlers: Mutex<ProtoMessageHandlerSet>,
1264    max_received: AtomicU32,
1265    name: &'static str,
1266    task: Mutex<Task<Result<()>>>,
1267    remote_started: Signal<()>,
1268    has_wsl_interop: bool,
1269}
1270
1271impl ChannelClient {
1272    fn new(
1273        incoming_rx: mpsc::UnboundedReceiver<Envelope>,
1274        outgoing_tx: mpsc::UnboundedSender<Envelope>,
1275        cx: &App,
1276        name: &'static str,
1277        has_wsl_interop: bool,
1278    ) -> Arc<Self> {
1279        Arc::new_cyclic(|this| Self {
1280            outgoing_tx: Mutex::new(outgoing_tx),
1281            next_message_id: AtomicU32::new(0),
1282            max_received: AtomicU32::new(0),
1283            response_channels: ResponseChannels::default(),
1284            message_handlers: Default::default(),
1285            buffer: Mutex::new(VecDeque::new()),
1286            name,
1287            task: Mutex::new(Self::start_handling_messages(
1288                this.clone(),
1289                incoming_rx,
1290                &cx.to_async(),
1291            )),
1292            remote_started: Signal::new(cx),
1293            has_wsl_interop,
1294        })
1295    }
1296
1297    fn wait_for_remote_started(&self) -> Shared<Task<Option<()>>> {
1298        self.remote_started.wait()
1299    }
1300
1301    fn start_handling_messages(
1302        this: Weak<Self>,
1303        mut incoming_rx: mpsc::UnboundedReceiver<Envelope>,
1304        cx: &AsyncApp,
1305    ) -> Task<Result<()>> {
1306        cx.spawn(async move |cx| {
1307            if let Some(this) = this.upgrade() {
1308                let envelope = proto::RemoteStarted {}.into_envelope(0, None, None);
1309                this.outgoing_tx.lock().unbounded_send(envelope).ok();
1310            };
1311
1312            let peer_id = PeerId { owner_id: 0, id: 0 };
1313            while let Some(incoming) = incoming_rx.next().await {
1314                let Some(this) = this.upgrade() else {
1315                    return anyhow::Ok(());
1316                };
1317                if let Some(ack_id) = incoming.ack_id {
1318                    let mut buffer = this.buffer.lock();
1319                    while buffer.front().is_some_and(|msg| msg.id <= ack_id) {
1320                        buffer.pop_front();
1321                    }
1322                }
1323                if let Some(proto::envelope::Payload::FlushBufferedMessages(_)) = &incoming.payload
1324                {
1325                    log::debug!(
1326                        "{}:remote message received. name:FlushBufferedMessages",
1327                        this.name
1328                    );
1329                    {
1330                        let buffer = this.buffer.lock();
1331                        for envelope in buffer.iter() {
1332                            this.outgoing_tx
1333                                .lock()
1334                                .unbounded_send(envelope.clone())
1335                                .ok();
1336                        }
1337                    }
1338                    let mut envelope = proto::Ack {}.into_envelope(0, Some(incoming.id), None);
1339                    envelope.id = this.next_message_id.fetch_add(1, SeqCst);
1340                    this.outgoing_tx.lock().unbounded_send(envelope).ok();
1341                    continue;
1342                }
1343
1344                if let Some(proto::envelope::Payload::RemoteStarted(_)) = &incoming.payload {
1345                    this.remote_started.set(());
1346                    let mut envelope = proto::Ack {}.into_envelope(0, Some(incoming.id), None);
1347                    envelope.id = this.next_message_id.fetch_add(1, SeqCst);
1348                    this.outgoing_tx.lock().unbounded_send(envelope).ok();
1349                    continue;
1350                }
1351
1352                this.max_received.store(incoming.id, SeqCst);
1353
1354                if let Some(request_id) = incoming.responding_to {
1355                    let request_id = MessageId(request_id);
1356                    let sender = this.response_channels.lock().remove(&request_id);
1357                    if let Some(sender) = sender {
1358                        let (tx, rx) = oneshot::channel();
1359                        if incoming.payload.is_some() {
1360                            sender.send((incoming, tx)).ok();
1361                        }
1362                        rx.await.ok();
1363                    }
1364                } else if let Some(envelope) =
1365                    build_typed_envelope(peer_id, Instant::now(), incoming)
1366                {
1367                    let type_name = envelope.payload_type_name();
1368                    let message_id = envelope.message_id();
1369                    if let Some(future) = ProtoMessageHandlerSet::handle_message(
1370                        &this.message_handlers,
1371                        envelope,
1372                        this.clone().into(),
1373                        cx.clone(),
1374                    ) {
1375                        log::debug!("{}:remote message received. name:{type_name}", this.name);
1376                        cx.foreground_executor()
1377                            .spawn(async move {
1378                                match future.await {
1379                                    Ok(_) => {
1380                                        log::debug!(
1381                                            "{}:remote message handled. name:{type_name}",
1382                                            this.name
1383                                        );
1384                                    }
1385                                    Err(error) => {
1386                                        log::error!(
1387                                            "{}:error handling message. type:{}, error:{:#}",
1388                                            this.name,
1389                                            type_name,
1390                                            format!("{error:#}").lines().fold(
1391                                                String::new(),
1392                                                |mut message, line| {
1393                                                    if !message.is_empty() {
1394                                                        message.push(' ');
1395                                                    }
1396                                                    message.push_str(line);
1397                                                    message
1398                                                }
1399                                            )
1400                                        );
1401                                    }
1402                                }
1403                            })
1404                            .detach()
1405                    } else {
1406                        log::error!("{}:unhandled remote message name:{type_name}", this.name);
1407                        if let Err(e) = AnyProtoClient::from(this.clone()).send_response(
1408                            message_id,
1409                            anyhow::anyhow!("no handler registered for {type_name}").to_proto(),
1410                        ) {
1411                            log::error!(
1412                                "{}:error sending error response for {type_name}:{e:#}",
1413                                this.name
1414                            );
1415                        }
1416                    }
1417                }
1418            }
1419            anyhow::Ok(())
1420        })
1421    }
1422
1423    fn reconnect(
1424        self: &Arc<Self>,
1425        incoming_rx: UnboundedReceiver<Envelope>,
1426        outgoing_tx: UnboundedSender<Envelope>,
1427        cx: &AsyncApp,
1428    ) {
1429        *self.outgoing_tx.lock() = outgoing_tx;
1430        *self.task.lock() = Self::start_handling_messages(Arc::downgrade(self), incoming_rx, cx);
1431    }
1432
1433    fn request<T: RequestMessage>(
1434        &self,
1435        payload: T,
1436    ) -> impl 'static + Future<Output = Result<T::Response>> {
1437        self.request_internal(payload, true)
1438    }
1439
1440    fn request_internal<T: RequestMessage>(
1441        &self,
1442        payload: T,
1443        use_buffer: bool,
1444    ) -> impl 'static + Future<Output = Result<T::Response>> {
1445        log::debug!("remote request start. name:{}", T::NAME);
1446        let response =
1447            self.request_dynamic(payload.into_envelope(0, None, None), T::NAME, use_buffer);
1448        async move {
1449            let response = response.await?;
1450            log::debug!("remote request finish. name:{}", T::NAME);
1451            T::Response::from_envelope(response).context("received a response of the wrong type")
1452        }
1453    }
1454
1455    async fn resync(&self, timeout: Duration) -> Result<()> {
1456        smol::future::or(
1457            async {
1458                self.request_internal(proto::FlushBufferedMessages {}, false)
1459                    .await?;
1460
1461                for envelope in self.buffer.lock().iter() {
1462                    self.outgoing_tx
1463                        .lock()
1464                        .unbounded_send(envelope.clone())
1465                        .ok();
1466                }
1467                Ok(())
1468            },
1469            async {
1470                smol::Timer::after(timeout).await;
1471                anyhow::bail!("Timed out resyncing remote client")
1472            },
1473        )
1474        .await
1475    }
1476
1477    async fn ping(&self, timeout: Duration) -> Result<()> {
1478        smol::future::or(
1479            async {
1480                self.request(proto::Ping {}).await?;
1481                Ok(())
1482            },
1483            async {
1484                smol::Timer::after(timeout).await;
1485                anyhow::bail!("Timed out pinging remote client")
1486            },
1487        )
1488        .await
1489    }
1490
1491    fn send<T: EnvelopedMessage>(&self, payload: T) -> Result<()> {
1492        log::debug!("remote send name:{}", T::NAME);
1493        self.send_dynamic(payload.into_envelope(0, None, None))
1494    }
1495
1496    fn request_dynamic(
1497        &self,
1498        mut envelope: proto::Envelope,
1499        type_name: &'static str,
1500        use_buffer: bool,
1501    ) -> impl 'static + Future<Output = Result<proto::Envelope>> {
1502        envelope.id = self.next_message_id.fetch_add(1, SeqCst);
1503        let (tx, rx) = oneshot::channel();
1504        let mut response_channels_lock = self.response_channels.lock();
1505        response_channels_lock.insert(MessageId(envelope.id), tx);
1506        drop(response_channels_lock);
1507
1508        let result = if use_buffer {
1509            self.send_buffered(envelope)
1510        } else {
1511            self.send_unbuffered(envelope)
1512        };
1513        async move {
1514            if let Err(error) = &result {
1515                log::error!("failed to send message: {error}");
1516                anyhow::bail!("failed to send message: {error}");
1517            }
1518
1519            let response = rx.await.context("connection lost")?.0;
1520            if let Some(proto::envelope::Payload::Error(error)) = &response.payload {
1521                return Err(RpcError::from_proto(error, type_name));
1522            }
1523            Ok(response)
1524        }
1525    }
1526
1527    pub fn send_dynamic(&self, mut envelope: proto::Envelope) -> Result<()> {
1528        envelope.id = self.next_message_id.fetch_add(1, SeqCst);
1529        self.send_buffered(envelope)
1530    }
1531
1532    fn send_buffered(&self, mut envelope: proto::Envelope) -> Result<()> {
1533        envelope.ack_id = Some(self.max_received.load(SeqCst));
1534        self.buffer.lock().push_back(envelope.clone());
1535        // ignore errors on send (happen while we're reconnecting)
1536        // assume that the global "disconnected" overlay is sufficient.
1537        self.outgoing_tx.lock().unbounded_send(envelope).ok();
1538        Ok(())
1539    }
1540
1541    fn send_unbuffered(&self, mut envelope: proto::Envelope) -> Result<()> {
1542        envelope.ack_id = Some(self.max_received.load(SeqCst));
1543        self.outgoing_tx.lock().unbounded_send(envelope).ok();
1544        Ok(())
1545    }
1546}
1547
1548impl ProtoClient for ChannelClient {
1549    fn request(
1550        &self,
1551        envelope: proto::Envelope,
1552        request_type: &'static str,
1553    ) -> BoxFuture<'static, Result<proto::Envelope>> {
1554        self.request_dynamic(envelope, request_type, true).boxed()
1555    }
1556
1557    fn send(&self, envelope: proto::Envelope, _message_type: &'static str) -> Result<()> {
1558        self.send_dynamic(envelope)
1559    }
1560
1561    fn send_response(&self, envelope: Envelope, _message_type: &'static str) -> anyhow::Result<()> {
1562        self.send_dynamic(envelope)
1563    }
1564
1565    fn message_handler_set(&self) -> &Mutex<ProtoMessageHandlerSet> {
1566        &self.message_handlers
1567    }
1568
1569    fn is_via_collab(&self) -> bool {
1570        false
1571    }
1572
1573    fn has_wsl_interop(&self) -> bool {
1574        self.has_wsl_interop
1575    }
1576}
1577
1578#[cfg(any(test, feature = "test-support"))]
1579mod fake {
1580    use super::{ChannelClient, RemoteClientDelegate, RemoteConnection, RemotePlatform};
1581    use crate::remote_client::{CommandTemplate, RemoteConnectionOptions};
1582    use anyhow::Result;
1583    use askpass::EncryptedPassword;
1584    use async_trait::async_trait;
1585    use collections::HashMap;
1586    use futures::{
1587        FutureExt, SinkExt, StreamExt,
1588        channel::{
1589            mpsc::{self, Sender},
1590            oneshot,
1591        },
1592        select_biased,
1593    };
1594    use gpui::{App, AppContext as _, AsyncApp, Task, TestAppContext};
1595    use release_channel::ReleaseChannel;
1596    use rpc::proto::Envelope;
1597    use semver::Version;
1598    use std::{path::PathBuf, sync::Arc};
1599    use util::paths::{PathStyle, RemotePathBuf};
1600
1601    pub(super) struct FakeRemoteConnection {
1602        pub(super) connection_options: RemoteConnectionOptions,
1603        pub(super) server_channel: Arc<ChannelClient>,
1604        pub(super) server_cx: SendableCx,
1605    }
1606
1607    pub(super) struct SendableCx(AsyncApp);
1608    impl SendableCx {
1609        // SAFETY: When run in test mode, GPUI is always single threaded.
1610        pub(super) fn new(cx: &TestAppContext) -> Self {
1611            Self(cx.to_async())
1612        }
1613
1614        // SAFETY: Enforce that we're on the main thread by requiring a valid AsyncApp
1615        fn get(&self, _: &AsyncApp) -> AsyncApp {
1616            self.0.clone()
1617        }
1618    }
1619
1620    // SAFETY: There is no way to access a SendableCx from a different thread, see [`SendableCx::new`] and [`SendableCx::get`]
1621    unsafe impl Send for SendableCx {}
1622    unsafe impl Sync for SendableCx {}
1623
1624    #[async_trait(?Send)]
1625    impl RemoteConnection for FakeRemoteConnection {
1626        async fn kill(&self) -> Result<()> {
1627            Ok(())
1628        }
1629
1630        fn has_been_killed(&self) -> bool {
1631            false
1632        }
1633
1634        fn build_command(
1635            &self,
1636            program: Option<String>,
1637            args: &[String],
1638            env: &HashMap<String, String>,
1639            _: Option<String>,
1640            _: Option<(u16, String, u16)>,
1641        ) -> Result<CommandTemplate> {
1642            let ssh_program = program.unwrap_or_else(|| "sh".to_string());
1643            let mut ssh_args = Vec::new();
1644            ssh_args.push(ssh_program);
1645            ssh_args.extend(args.iter().cloned());
1646            Ok(CommandTemplate {
1647                program: "ssh".into(),
1648                args: ssh_args,
1649                env: env.clone(),
1650            })
1651        }
1652
1653        fn build_forward_ports_command(
1654            &self,
1655            forwards: Vec<(u16, String, u16)>,
1656        ) -> anyhow::Result<CommandTemplate> {
1657            Ok(CommandTemplate {
1658                program: "ssh".into(),
1659                args: std::iter::once("-N".to_owned())
1660                    .chain(forwards.into_iter().map(|(local_port, host, remote_port)| {
1661                        format!("{local_port}:{host}:{remote_port}")
1662                    }))
1663                    .collect(),
1664                env: Default::default(),
1665            })
1666        }
1667
1668        fn upload_directory(
1669            &self,
1670            _src_path: PathBuf,
1671            _dest_path: RemotePathBuf,
1672            _cx: &App,
1673        ) -> Task<Result<()>> {
1674            unreachable!()
1675        }
1676
1677        fn connection_options(&self) -> RemoteConnectionOptions {
1678            self.connection_options.clone()
1679        }
1680
1681        fn simulate_disconnect(&self, cx: &AsyncApp) {
1682            let (outgoing_tx, _) = mpsc::unbounded::<Envelope>();
1683            let (_, incoming_rx) = mpsc::unbounded::<Envelope>();
1684            self.server_channel
1685                .reconnect(incoming_rx, outgoing_tx, &self.server_cx.get(cx));
1686        }
1687
1688        fn start_proxy(
1689            &self,
1690            _unique_identifier: String,
1691            _reconnect: bool,
1692            mut client_incoming_tx: mpsc::UnboundedSender<Envelope>,
1693            mut client_outgoing_rx: mpsc::UnboundedReceiver<Envelope>,
1694            mut connection_activity_tx: Sender<()>,
1695            _delegate: Arc<dyn RemoteClientDelegate>,
1696            cx: &mut AsyncApp,
1697        ) -> Task<Result<i32>> {
1698            let (mut server_incoming_tx, server_incoming_rx) = mpsc::unbounded::<Envelope>();
1699            let (server_outgoing_tx, mut server_outgoing_rx) = mpsc::unbounded::<Envelope>();
1700
1701            self.server_channel.reconnect(
1702                server_incoming_rx,
1703                server_outgoing_tx,
1704                &self.server_cx.get(cx),
1705            );
1706
1707            cx.background_spawn(async move {
1708                loop {
1709                    select_biased! {
1710                        server_to_client = server_outgoing_rx.next().fuse() => {
1711                            let Some(server_to_client) = server_to_client else {
1712                                return Ok(1)
1713                            };
1714                            connection_activity_tx.try_send(()).ok();
1715                            client_incoming_tx.send(server_to_client).await.ok();
1716                        }
1717                        client_to_server = client_outgoing_rx.next().fuse() => {
1718                            let Some(client_to_server) = client_to_server else {
1719                                return Ok(1)
1720                            };
1721                            server_incoming_tx.send(client_to_server).await.ok();
1722                        }
1723                    }
1724                }
1725            })
1726        }
1727
1728        fn path_style(&self) -> PathStyle {
1729            PathStyle::local()
1730        }
1731
1732        fn shell(&self) -> String {
1733            "sh".to_owned()
1734        }
1735
1736        fn default_system_shell(&self) -> String {
1737            "sh".to_owned()
1738        }
1739
1740        fn has_wsl_interop(&self) -> bool {
1741            false
1742        }
1743    }
1744
1745    pub(super) struct Delegate;
1746
1747    impl RemoteClientDelegate for Delegate {
1748        fn ask_password(&self, _: String, _: oneshot::Sender<EncryptedPassword>, _: &mut AsyncApp) {
1749            unreachable!()
1750        }
1751
1752        fn download_server_binary_locally(
1753            &self,
1754            _: RemotePlatform,
1755            _: ReleaseChannel,
1756            _: Option<Version>,
1757            _: &mut AsyncApp,
1758        ) -> Task<Result<PathBuf>> {
1759            unreachable!()
1760        }
1761
1762        fn get_download_url(
1763            &self,
1764            _platform: RemotePlatform,
1765            _release_channel: ReleaseChannel,
1766            _version: Option<Version>,
1767            _cx: &mut AsyncApp,
1768        ) -> Task<Result<Option<String>>> {
1769            unreachable!()
1770        }
1771
1772        fn set_status(&self, _: Option<&str>, _: &mut AsyncApp) {}
1773    }
1774}