remote_client.rs

   1#[cfg(any(test, feature = "test-support"))]
   2use crate::transport::mock::ConnectGuard;
   3use crate::{
   4    SshConnectionOptions,
   5    protocol::MessageId,
   6    proxy::ProxyLaunchError,
   7    transport::{
   8        docker::{DockerConnectionOptions, DockerExecConnection},
   9        ssh::SshRemoteConnection,
  10        wsl::{WslConnectionOptions, WslRemoteConnection},
  11    },
  12};
  13use anyhow::{Context as _, Result, anyhow};
  14use askpass::EncryptedPassword;
  15use async_trait::async_trait;
  16use collections::HashMap;
  17use futures::{
  18    Future, FutureExt as _, StreamExt as _,
  19    channel::{
  20        mpsc::{self, Sender, UnboundedReceiver, UnboundedSender},
  21        oneshot,
  22    },
  23    future::{BoxFuture, Shared, WeakShared},
  24    select, select_biased,
  25};
  26use gpui::{
  27    App, AppContext as _, AsyncApp, BackgroundExecutor, BorrowAppContext, Context, Entity,
  28    EventEmitter, FutureExt, Global, Task, WeakEntity,
  29};
  30use parking_lot::Mutex;
  31
  32use release_channel::ReleaseChannel;
  33use rpc::{
  34    AnyProtoClient, ErrorExt, ProtoClient, ProtoMessageHandlerSet, RpcError,
  35    proto::{self, Envelope, EnvelopedMessage, PeerId, RequestMessage, build_typed_envelope},
  36};
  37use semver::Version;
  38use std::{
  39    collections::VecDeque,
  40    fmt,
  41    ops::ControlFlow,
  42    path::PathBuf,
  43    sync::{
  44        Arc, Weak,
  45        atomic::{AtomicU32, AtomicU64, Ordering::SeqCst},
  46    },
  47    time::{Duration, Instant},
  48};
  49use util::{
  50    ResultExt,
  51    paths::{PathStyle, RemotePathBuf},
  52};
  53
  54#[derive(Copy, Clone, Debug, PartialEq, Eq)]
  55pub enum RemoteOs {
  56    Linux,
  57    MacOs,
  58    Windows,
  59}
  60
  61impl RemoteOs {
  62    pub fn as_str(&self) -> &'static str {
  63        match self {
  64            RemoteOs::Linux => "linux",
  65            RemoteOs::MacOs => "macos",
  66            RemoteOs::Windows => "windows",
  67        }
  68    }
  69
  70    pub fn is_windows(&self) -> bool {
  71        matches!(self, RemoteOs::Windows)
  72    }
  73}
  74
  75impl std::fmt::Display for RemoteOs {
  76    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
  77        f.write_str(self.as_str())
  78    }
  79}
  80
  81#[derive(Copy, Clone, Debug, PartialEq, Eq)]
  82pub enum RemoteArch {
  83    X86_64,
  84    Aarch64,
  85}
  86
  87impl RemoteArch {
  88    pub fn as_str(&self) -> &'static str {
  89        match self {
  90            RemoteArch::X86_64 => "x86_64",
  91            RemoteArch::Aarch64 => "aarch64",
  92        }
  93    }
  94}
  95
  96impl std::fmt::Display for RemoteArch {
  97    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
  98        f.write_str(self.as_str())
  99    }
 100}
 101
/// Operating system and CPU architecture of a remote host.
#[derive(Copy, Clone, Debug)]
pub struct RemotePlatform {
    /// Operating system family of the remote host.
    pub os: RemoteOs,
    /// CPU architecture of the remote host.
    pub arch: RemoteArch,
}
 107
/// Describes a command to run: the program, its arguments, and environment variables.
///
/// How the command is actually executed (locally, over ssh, …) is decided by whoever
/// consumes the template — not visible here.
#[derive(Clone, Debug)]
pub struct CommandTemplate {
    /// Program to execute.
    pub program: String,
    /// Arguments passed to `program`, in order.
    pub args: Vec<String>,
    /// Environment variables associated with the command.
    pub env: HashMap<String, String>,
}
 114
/// Whether a command should be run with TTY allocation for interactive use.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Interactive {
    /// Allocate a pseudo-TTY for interactive terminal use.
    Yes,
    /// Do not allocate a TTY; for commands that communicate over piped stdio.
    No,
}
 123
/// Hooks the remote client calls into for user interaction, server-binary
/// acquisition, and status reporting.
pub trait RemoteClientDelegate: Send + Sync {
    /// Asks for a password in response to `prompt`.
    ///
    /// The answer is presumably delivered by sending on `tx` — TODO confirm with implementors.
    fn ask_password(
        &self,
        prompt: String,
        tx: oneshot::Sender<EncryptedPassword>,
        cx: &mut AsyncApp,
    );
    /// Resolves a download URL for the remote server binary matching `platform`,
    /// `release_channel`, and `version`.
    ///
    /// NOTE(review): `Ok(None)` presumably means "no URL available", and a `None`
    /// `version` presumably selects the latest build — confirm against implementors.
    fn get_download_url(
        &self,
        platform: RemotePlatform,
        release_channel: ReleaseChannel,
        version: Option<Version>,
        cx: &mut AsyncApp,
    ) -> Task<Result<Option<String>>>;
    /// Downloads the server binary for `platform` on the local machine and resolves to
    /// its local path.
    fn download_server_binary_locally(
        &self,
        platform: RemotePlatform,
        release_channel: ReleaseChannel,
        version: Option<Version>,
        cx: &mut AsyncApp,
    ) -> Task<Result<PathBuf>>;
    /// Reports a human-readable status line; `None` presumably clears it.
    ///
    /// `RemoteClient::reconnect` calls this with the error text when a reconnect step fails.
    fn set_status(&self, status: Option<&str>, cx: &mut AsyncApp);
}
 147
/// Consecutive missed heartbeats after which a reconnect is started.
const MAX_MISSED_HEARTBEATS: usize = 5;
/// Delay of the keepalive timer; the timer is restarted after each heartbeat-loop event,
/// so the server is pinged after this much time without a loop event.
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
/// Timeout for heartbeat pings, and for the ping/resync performed when (re)connecting.
const HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(5);
/// How long `RemoteClient::new` waits for the remote side to report that it started
/// (shorter in debug builds).
const INITIAL_CONNECTION_TIMEOUT: Duration =
    Duration::from_secs(if cfg!(debug_assertions) { 5 } else { 60 });

/// Reconnect attempts made before giving up and entering `ReconnectExhausted`.
pub const MAX_RECONNECT_ATTEMPTS: usize = 3;
 155
/// Internal connection lifecycle of a [`RemoteClient`].
///
/// Variants that still hold a connection keep the `remote_connection` and `delegate` handles
/// so a reconnect can reuse them. `multiplex_task` and `heartbeat_task` are the background
/// tasks created by `RemoteClient::monitor` and `RemoteClient::heartbeat`; dropping the state
/// drops them.
enum State {
    /// Initial state while `RemoteClient::new` establishes the connection.
    Connecting,
    /// Connection established; no outstanding missed heartbeats.
    Connected {
        remote_connection: Arc<dyn RemoteConnection>,
        delegate: Arc<dyn RemoteClientDelegate>,

        multiplex_task: Task<Result<()>>,
        heartbeat_task: Task<Result<()>>,
    },
    /// Connection established, but `missed_heartbeats` heartbeats in a row have failed.
    HeartbeatMissed {
        missed_heartbeats: usize,

        remote_connection: Arc<dyn RemoteConnection>,
        delegate: Arc<dyn RemoteClientDelegate>,

        multiplex_task: Task<Result<()>>,
        heartbeat_task: Task<Result<()>>,
    },
    /// A reconnect task is in flight.
    Reconnecting,
    /// The last reconnect attempt failed with `error`; `attempts` counts the attempts made
    /// so far, including the failed one.
    ReconnectFailed {
        remote_connection: Arc<dyn RemoteConnection>,
        delegate: Arc<dyn RemoteClientDelegate>,

        error: anyhow::Error,
        attempts: usize,
    },
    /// More than `MAX_RECONNECT_ATTEMPTS` attempts were needed; reconnecting has stopped.
    ReconnectExhausted,
    /// The proxy exited with a code indicating the remote server is not running.
    ServerNotRunning,
}
 185
 186impl fmt::Display for State {
 187    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
 188        match self {
 189            Self::Connecting => write!(f, "connecting"),
 190            Self::Connected { .. } => write!(f, "connected"),
 191            Self::Reconnecting => write!(f, "reconnecting"),
 192            Self::ReconnectFailed { .. } => write!(f, "reconnect failed"),
 193            Self::ReconnectExhausted => write!(f, "reconnect exhausted"),
 194            Self::HeartbeatMissed { .. } => write!(f, "heartbeat missed"),
 195            Self::ServerNotRunning { .. } => write!(f, "server not running"),
 196        }
 197    }
 198}
 199
 200impl State {
 201    fn remote_connection(&self) -> Option<Arc<dyn RemoteConnection>> {
 202        match self {
 203            Self::Connected {
 204                remote_connection, ..
 205            } => Some(remote_connection.clone()),
 206            Self::HeartbeatMissed {
 207                remote_connection, ..
 208            } => Some(remote_connection.clone()),
 209            Self::ReconnectFailed {
 210                remote_connection, ..
 211            } => Some(remote_connection.clone()),
 212            _ => None,
 213        }
 214    }
 215
 216    fn can_reconnect(&self) -> bool {
 217        match self {
 218            Self::Connected { .. }
 219            | Self::HeartbeatMissed { .. }
 220            | Self::ReconnectFailed { .. } => true,
 221            State::Connecting
 222            | State::Reconnecting
 223            | State::ReconnectExhausted
 224            | State::ServerNotRunning => false,
 225        }
 226    }
 227
 228    fn is_reconnect_failed(&self) -> bool {
 229        matches!(self, Self::ReconnectFailed { .. })
 230    }
 231
 232    fn is_reconnect_exhausted(&self) -> bool {
 233        matches!(self, Self::ReconnectExhausted { .. })
 234    }
 235
 236    fn is_server_not_running(&self) -> bool {
 237        matches!(self, Self::ServerNotRunning)
 238    }
 239
 240    fn is_reconnecting(&self) -> bool {
 241        matches!(self, Self::Reconnecting { .. })
 242    }
 243
 244    fn heartbeat_recovered(self) -> Self {
 245        match self {
 246            Self::HeartbeatMissed {
 247                remote_connection,
 248                delegate,
 249                multiplex_task,
 250                heartbeat_task,
 251                ..
 252            } => Self::Connected {
 253                remote_connection,
 254                delegate,
 255                multiplex_task,
 256                heartbeat_task,
 257            },
 258            _ => self,
 259        }
 260    }
 261
 262    fn heartbeat_missed(self) -> Self {
 263        match self {
 264            Self::Connected {
 265                remote_connection,
 266                delegate,
 267                multiplex_task,
 268                heartbeat_task,
 269            } => Self::HeartbeatMissed {
 270                missed_heartbeats: 1,
 271                remote_connection,
 272                delegate,
 273                multiplex_task,
 274                heartbeat_task,
 275            },
 276            Self::HeartbeatMissed {
 277                missed_heartbeats,
 278                remote_connection,
 279                delegate,
 280                multiplex_task,
 281                heartbeat_task,
 282            } => Self::HeartbeatMissed {
 283                missed_heartbeats: missed_heartbeats + 1,
 284                remote_connection,
 285                delegate,
 286                multiplex_task,
 287                heartbeat_task,
 288            },
 289            _ => self,
 290        }
 291    }
 292}
 293
/// Public view of a remote client's connection status, derived from the internal `State`
/// (applies to every remote transport, not only ssh).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ConnectionState {
    /// Initial connection in progress.
    Connecting,
    /// Connected with no outstanding missed heartbeats.
    Connected,
    /// Connected, but one or more recent heartbeats have failed.
    HeartbeatMissed,
    /// A reconnect is in progress, or the last attempt failed and another may follow.
    Reconnecting,
    /// Reconnect attempts are exhausted, or the remote server is not running.
    Disconnected,
}
 303
 304impl From<&State> for ConnectionState {
 305    fn from(value: &State) -> Self {
 306        match value {
 307            State::Connecting => Self::Connecting,
 308            State::Connected { .. } => Self::Connected,
 309            State::Reconnecting | State::ReconnectFailed { .. } => Self::Reconnecting,
 310            State::HeartbeatMissed { .. } => Self::HeartbeatMissed,
 311            State::ReconnectExhausted => Self::Disconnected,
 312            State::ServerNotRunning => Self::Disconnected,
 313        }
 314    }
 315}
 316
/// Client side of a connection to a remote server.
///
/// Owns the message channel to the server and tracks the connection lifecycle in `state`,
/// reconnecting when the proxy exits or too many heartbeats are missed.
pub struct RemoteClient {
    /// Message channel to the remote server; re-attached (not replaced) on reconnect.
    client: Arc<ChannelClient>,
    /// Rendered [`ConnectionIdentifier`]; passed to `start_proxy` on connect and reconnect.
    unique_identifier: String,
    /// Connection options reported by the remote connection at creation time.
    connection_options: RemoteConnectionOptions,
    /// Path style reported by the remote connection at creation time.
    path_style: PathStyle,
    /// Current lifecycle state; `None` once `shutdown_processes` has taken it.
    state: Option<State>,
}
 324
/// Events emitted by a [`RemoteClient`].
#[derive(Debug)]
pub enum RemoteClientEvent {
    /// The client is no longer connected. `server_not_running` presumably distinguishes a
    /// missing remote server from exhausted reconnects — TODO confirm where it is emitted.
    Disconnected { server_not_running: bool },
}

impl EventEmitter<RemoteClientEvent> for RemoteClient {}
 331
/// Identifies the socket on the remote server so that reconnects
/// can re-join the same project.
pub enum ConnectionIdentifier {
    /// Id allocated by [`ConnectionIdentifier::setup`] from a process-wide counter.
    Setup(u64),
    /// Id tied to a workspace; presumably the persisted workspace id — TODO confirm.
    Workspace(i64),
}

/// Next id handed out by [`ConnectionIdentifier::setup`]; starts at 1.
static NEXT_ID: AtomicU64 = AtomicU64::new(1);
 340
 341impl ConnectionIdentifier {
 342    pub fn setup() -> Self {
 343        Self::Setup(NEXT_ID.fetch_add(1, SeqCst))
 344    }
 345
 346    // This string gets used in a socket name, and so must be relatively short.
 347    // The total length of:
 348    //   /home/{username}/.local/share/zed/server_state/{name}/stdout.sock
 349    // Must be less than about 100 characters
 350    //   https://unix.stackexchange.com/questions/367008/why-is-socket-path-length-limited-to-a-hundred-chars
 351    // So our strings should be at most 20 characters or so.
 352    fn to_string(&self, cx: &App) -> String {
 353        let identifier_prefix = match ReleaseChannel::global(cx) {
 354            ReleaseChannel::Stable => "".to_string(),
 355            release_channel => format!("{}-", release_channel.dev_name()),
 356        };
 357        match self {
 358            Self::Setup(setup_id) => format!("{identifier_prefix}setup-{setup_id}"),
 359            Self::Workspace(workspace_id) => {
 360                format!("{identifier_prefix}workspace-{workspace_id}",)
 361            }
 362        }
 363    }
 364}
 365
 366pub async fn connect(
 367    connection_options: RemoteConnectionOptions,
 368    delegate: Arc<dyn RemoteClientDelegate>,
 369    cx: &mut AsyncApp,
 370) -> Result<Arc<dyn RemoteConnection>> {
 371    cx.update(|cx| {
 372        cx.update_default_global(|pool: &mut ConnectionPool, cx| {
 373            pool.connect(connection_options.clone(), delegate.clone(), cx)
 374        })
 375    })
 376    .await
 377    .map_err(|e| e.cloned())
 378}
 379
 380/// Returns `true` if the global [`ConnectionPool`] already has a live
 381/// connection for the given options. Callers can use this to decide
 382/// whether to show interactive UI (e.g., a password modal) before
 383/// connecting.
 384pub fn has_active_connection(opts: &RemoteConnectionOptions, cx: &App) -> bool {
 385    cx.try_global::<ConnectionPool>().is_some_and(|pool| {
 386        matches!(
 387            pool.connections.get(opts),
 388            Some(ConnectionPoolEntry::Connected(remote))
 389                if remote.upgrade().is_some_and(|r| !r.has_been_killed())
 390        )
 391    })
 392}
 393
 394impl RemoteClient {
    /// Creates a `RemoteClient` on top of an already-established `remote_connection`.
    ///
    /// The returned task starts the proxy, waits up to `INITIAL_CONNECTION_TIMEOUT` for the
    /// remote side to report that it started, checks the channel with a ping, and then stores
    /// the monitor (multiplex) and heartbeat tasks in `State::Connected`.
    ///
    /// Resolves to:
    /// - `Ok(Some(entity))` once the client is connected;
    /// - `Ok(None)` if `cancellation` resolves first (a value is sent or its sender is dropped);
    /// - `Err(_)` if the remote exits before becoming ready, does not become ready in time,
    ///   or the initial ping fails.
    ///
    /// `unique_identifier` is rendered to a string and passed to `start_proxy`; reconnects
    /// reuse the same string.
    pub fn new(
        unique_identifier: ConnectionIdentifier,
        remote_connection: Arc<dyn RemoteConnection>,
        cancellation: oneshot::Receiver<()>,
        delegate: Arc<dyn RemoteClientDelegate>,
        cx: &mut App,
    ) -> Task<Result<Option<Entity<Self>>>> {
        let unique_identifier = unique_identifier.to_string(cx);
        cx.spawn(async move |cx| {
            let success = Box::pin(async move {
                // Envelope channels between the ChannelClient and the proxy I/O task, plus a
                // bounded channel through which the proxy signals activity to the heartbeat loop.
                let (outgoing_tx, outgoing_rx) = mpsc::unbounded::<Envelope>();
                let (incoming_tx, incoming_rx) = mpsc::unbounded::<Envelope>();
                let (connection_activity_tx, connection_activity_rx) = mpsc::channel::<()>(1);

                let client = cx.update(|cx| {
                    ChannelClient::new(
                        incoming_rx,
                        outgoing_tx,
                        cx,
                        "client",
                        remote_connection.has_wsl_interop(),
                    )
                });

                let path_style = remote_connection.path_style();
                let this = cx.new(|_| Self {
                    client: client.clone(),
                    unique_identifier: unique_identifier.clone(),
                    connection_options: remote_connection.connection_options(),
                    path_style,
                    state: Some(State::Connecting),
                });

                // `false` here vs. `true` in `reconnect`: presumably flags an initial
                // (non-reconnect) proxy launch — TODO confirm in `start_proxy`.
                let io_task = remote_connection.start_proxy(
                    unique_identifier,
                    false,
                    incoming_tx,
                    outgoing_rx,
                    connection_activity_tx,
                    delegate.clone(),
                    cx,
                );

                let ready = client
                    .wait_for_remote_started()
                    .with_timeout(INITIAL_CONNECTION_TIMEOUT, cx.background_executor())
                    .await;
                match ready {
                    Ok(Some(_)) => {}
                    // The remote side went away before signalling readiness; include the
                    // proxy's exit status in the error if it has already finished.
                    Ok(None) => {
                        let mut error = "remote client exited before becoming ready".to_owned();
                        if let Some(status) = io_task.now_or_never() {
                            match status {
                                Ok(exit_code) => {
                                    error.push_str(&format!(", exit_code={exit_code:?}"))
                                }
                                Err(e) => error.push_str(&format!(", error={e:?}")),
                            }
                        }
                        let error = anyhow::anyhow!("{error}");
                        log::error!("failed to establish connection: {}", error);
                        return Err(error);
                    }
                    // Timed out waiting for readiness.
                    Err(_) => {
                        let mut error = String::new();
                        if let Some(status) = io_task.now_or_never() {
                            error.push_str("Client exited with ");
                            match status {
                                Ok(exit_code) => {
                                    error.push_str(&format!("exit_code {exit_code:?}"))
                                }
                                Err(e) => error.push_str(&format!("error {e:?}")),
                            }
                        } else {
                            error.push_str("client did not become ready within the timeout");
                        }
                        let error = anyhow::anyhow!("{error}");
                        log::error!("failed to establish connection: {error}");
                        return Err(error);
                    }
                }
                // From here on, the proxy exiting triggers reconnect / server-not-running handling.
                let multiplex_task = Self::monitor(this.downgrade(), io_task, cx);
                if let Err(error) = client.ping(HEARTBEAT_TIMEOUT).await {
                    log::error!("failed to establish connection: {}", error);
                    return Err(error);
                }

                let heartbeat_task = Self::heartbeat(this.downgrade(), connection_activity_rx, cx);

                this.update(cx, |this, _| {
                    this.state = Some(State::Connected {
                        remote_connection,
                        delegate,
                        multiplex_task,
                        heartbeat_task,
                    });
                });

                Ok(Some(this))
            });

            // Whichever finishes first wins; a cancelled setup drops the in-progress future.
            select! {
                _ = cancellation.fuse() => {
                    Ok(None)
                }
                result = success.fuse() =>  result
            }
        })
    }
 504
 505    pub fn proto_client_from_channels(
 506        incoming_rx: mpsc::UnboundedReceiver<Envelope>,
 507        outgoing_tx: mpsc::UnboundedSender<Envelope>,
 508        cx: &App,
 509        name: &'static str,
 510        has_wsl_interop: bool,
 511    ) -> AnyProtoClient {
 512        ChannelClient::new(incoming_rx, outgoing_tx, cx, name, has_wsl_interop).into()
 513    }
 514
    /// Begins tearing down the remote processes owned by this client.
    ///
    /// Always takes the state out of `self`, leaving `None`. Returns `None` if there was no
    /// state, or if the state was not `Connected`; in that case the taken state is simply
    /// dropped here. Otherwise returns a future that sends `shutdown_request` (if any)
    /// without waiting for a reply, waits 50ms, and then drops the connection's tasks and
    /// handles in the order given below.
    pub fn shutdown_processes<T: RequestMessage>(
        &mut self,
        shutdown_request: Option<T>,
        executor: BackgroundExecutor,
    ) -> Option<impl Future<Output = ()> + use<T>> {
        let state = self.state.take()?;
        log::info!("shutting down remote processes");

        let State::Connected {
            multiplex_task,
            heartbeat_task,
            remote_connection,
            delegate,
        } = state
        else {
            return None;
        };

        let client = self.client.clone();

        // NOTE(review): until this future runs its drops, the heartbeat and multiplex tasks
        // remain alive while `self.state` is already `None`.
        Some(async move {
            if let Some(shutdown_request) = shutdown_request {
                client.send(shutdown_request).log_err();
                // We wait 50ms instead of waiting for a response, because
                // waiting for a response would require us to wait on the main thread
                // which we want to avoid in an `on_app_quit` callback.
                executor.timer(Duration::from_millis(50)).await;
            }

            // Drop `multiplex_task` because it owns our remote_connection_proxy_process, which is a
            // child of master_process.
            drop(multiplex_task);
            // Now drop the rest of state, which kills master process.
            drop(heartbeat_task);
            drop(remote_connection);
            drop(delegate);
        })
    }
 553
 554    fn reconnect(&mut self, cx: &mut Context<Self>) -> Result<()> {
 555        let can_reconnect = self
 556            .state
 557            .as_ref()
 558            .map(|state| state.can_reconnect())
 559            .unwrap_or(false);
 560        if !can_reconnect {
 561            let state = if let Some(state) = self.state.as_ref() {
 562                state.to_string()
 563            } else {
 564                "no state set".to_string()
 565            };
 566            log::info!(
 567                "aborting reconnect, because not in state that allows reconnecting: {state}"
 568            );
 569            anyhow::bail!(
 570                "aborting reconnect, because not in state that allows reconnecting: {state}"
 571            );
 572        }
 573
 574        let state = self.state.take().unwrap();
 575        let (attempts, remote_connection, delegate) = match state {
 576            State::Connected {
 577                remote_connection,
 578                delegate,
 579                multiplex_task,
 580                heartbeat_task,
 581            }
 582            | State::HeartbeatMissed {
 583                remote_connection,
 584                delegate,
 585                multiplex_task,
 586                heartbeat_task,
 587                ..
 588            } => {
 589                drop(multiplex_task);
 590                drop(heartbeat_task);
 591                (0, remote_connection, delegate)
 592            }
 593            State::ReconnectFailed {
 594                attempts,
 595                remote_connection,
 596                delegate,
 597                ..
 598            } => (attempts, remote_connection, delegate),
 599            State::Connecting
 600            | State::Reconnecting
 601            | State::ReconnectExhausted
 602            | State::ServerNotRunning => unreachable!(),
 603        };
 604
 605        let attempts = attempts + 1;
 606        if attempts > MAX_RECONNECT_ATTEMPTS {
 607            log::error!(
 608                "Failed to reconnect to after {} attempts, giving up",
 609                MAX_RECONNECT_ATTEMPTS
 610            );
 611            self.set_state(State::ReconnectExhausted, cx);
 612            return Ok(());
 613        }
 614
 615        self.set_state(State::Reconnecting, cx);
 616
 617        log::info!(
 618            "Trying to reconnect to remote server... Attempt {}",
 619            attempts
 620        );
 621
 622        let unique_identifier = self.unique_identifier.clone();
 623        let client = self.client.clone();
 624        let reconnect_task = cx.spawn(async move |this, cx| {
 625            macro_rules! failed {
 626                ($error:expr, $attempts:expr, $remote_connection:expr, $delegate:expr) => {
 627                    delegate.set_status(Some(&format!("{error:#}", error = $error)), cx);
 628                    return State::ReconnectFailed {
 629                        error: anyhow!($error),
 630                        attempts: $attempts,
 631                        remote_connection: $remote_connection,
 632                        delegate: $delegate,
 633                    };
 634                };
 635            }
 636
 637            if let Err(error) = remote_connection
 638                .kill()
 639                .await
 640                .context("Failed to kill remote_connection process")
 641            {
 642                failed!(error, attempts, remote_connection, delegate);
 643            };
 644
 645            let connection_options = remote_connection.connection_options();
 646
 647            let (outgoing_tx, outgoing_rx) = mpsc::unbounded::<Envelope>();
 648            let (incoming_tx, incoming_rx) = mpsc::unbounded::<Envelope>();
 649            let (connection_activity_tx, connection_activity_rx) = mpsc::channel::<()>(1);
 650
 651            let (remote_connection, io_task) = match async {
 652                let remote_connection = cx
 653                    .update_global(|pool: &mut ConnectionPool, cx| {
 654                        pool.connect(connection_options, delegate.clone(), cx)
 655                    })
 656                    .await
 657                    .map_err(|error| error.cloned())?;
 658
 659                let io_task = remote_connection.start_proxy(
 660                    unique_identifier,
 661                    true,
 662                    incoming_tx,
 663                    outgoing_rx,
 664                    connection_activity_tx,
 665                    delegate.clone(),
 666                    cx,
 667                );
 668                anyhow::Ok((remote_connection, io_task))
 669            }
 670            .await
 671            {
 672                Ok((remote_connection, io_task)) => (remote_connection, io_task),
 673                Err(error) => {
 674                    failed!(error, attempts, remote_connection, delegate);
 675                }
 676            };
 677
 678            let multiplex_task = Self::monitor(this.clone(), io_task, cx);
 679            client.reconnect(incoming_rx, outgoing_tx, cx);
 680
 681            if let Err(error) = client.resync(HEARTBEAT_TIMEOUT).await {
 682                failed!(error, attempts, remote_connection, delegate);
 683            };
 684
 685            State::Connected {
 686                remote_connection,
 687                delegate,
 688                multiplex_task,
 689                heartbeat_task: Self::heartbeat(this.clone(), connection_activity_rx, cx),
 690            }
 691        });
 692
 693        cx.spawn(async move |this, cx| {
 694            let new_state = reconnect_task.await;
 695            this.update(cx, |this, cx| {
 696                this.try_set_state(cx, |old_state| {
 697                    if old_state.is_reconnecting() {
 698                        match &new_state {
 699                            State::Connecting
 700                            | State::Reconnecting
 701                            | State::HeartbeatMissed { .. }
 702                            | State::ServerNotRunning => {}
 703                            State::Connected { .. } => {
 704                                log::info!("Successfully reconnected");
 705                            }
 706                            State::ReconnectFailed {
 707                                error, attempts, ..
 708                            } => {
 709                                log::error!(
 710                                    "Reconnect attempt {} failed: {:?}. Starting new attempt...",
 711                                    attempts,
 712                                    error
 713                                );
 714                            }
 715                            State::ReconnectExhausted => {
 716                                log::error!("Reconnect attempt failed and all attempts exhausted");
 717                            }
 718                        }
 719                        Some(new_state)
 720                    } else {
 721                        None
 722                    }
 723                });
 724
 725                if this.state_is(State::is_reconnect_failed) {
 726                    this.reconnect(cx)
 727                } else if this.state_is(State::is_reconnect_exhausted) {
 728                    Ok(())
 729                } else {
 730                    log::debug!("State has transition from Reconnecting into new state while attempting reconnect.");
 731                    Ok(())
 732                }
 733            })
 734        })
 735        .detach_and_log_err(cx);
 736
 737        Ok(())
 738    }
 739
 740    fn heartbeat(
 741        this: WeakEntity<Self>,
 742        mut connection_activity_rx: mpsc::Receiver<()>,
 743        cx: &mut AsyncApp,
 744    ) -> Task<Result<()>> {
 745        let Ok(client) = this.read_with(cx, |this, _| this.client.clone()) else {
 746            return Task::ready(Err(anyhow!("remote_connectionRemoteClient lost")));
 747        };
 748
 749        cx.spawn(async move |cx| {
 750            let mut missed_heartbeats = 0;
 751
 752            let keepalive_timer = cx.background_executor().timer(HEARTBEAT_INTERVAL).fuse();
 753            futures::pin_mut!(keepalive_timer);
 754
 755            loop {
 756                select_biased! {
 757                    result = connection_activity_rx.next().fuse() => {
 758                        if result.is_none() {
 759                            log::warn!("remote heartbeat: connection activity channel has been dropped. stopping.");
 760                            return Ok(());
 761                        }
 762
 763                        if missed_heartbeats != 0 {
 764                            missed_heartbeats = 0;
 765                            let _ =this.update(cx, |this, cx| {
 766                                this.handle_heartbeat_result(missed_heartbeats, cx)
 767                            })?;
 768                        }
 769                    }
 770                    _ = keepalive_timer => {
 771                        log::debug!("Sending heartbeat to server...");
 772
 773                        let result = select_biased! {
 774                            _ = connection_activity_rx.next().fuse() => {
 775                                Ok(())
 776                            }
 777                            ping_result = client.ping(HEARTBEAT_TIMEOUT).fuse() => {
 778                                ping_result
 779                            }
 780                        };
 781
 782                        if result.is_err() {
 783                            missed_heartbeats += 1;
 784                            log::warn!(
 785                                "No heartbeat from server after {:?}. Missed heartbeat {} out of {}.",
 786                                HEARTBEAT_TIMEOUT,
 787                                missed_heartbeats,
 788                                MAX_MISSED_HEARTBEATS
 789                            );
 790                        } else if missed_heartbeats != 0 {
 791                            missed_heartbeats = 0;
 792                        } else {
 793                            continue;
 794                        }
 795
 796                        let result = this.update(cx, |this, cx| {
 797                            this.handle_heartbeat_result(missed_heartbeats, cx)
 798                        })?;
 799                        if result.is_break() {
 800                            return Ok(());
 801                        }
 802                    }
 803                }
 804
 805                keepalive_timer.set(cx.background_executor().timer(HEARTBEAT_INTERVAL).fuse());
 806            }
 807        })
 808    }
 809
 810    fn handle_heartbeat_result(
 811        &mut self,
 812        missed_heartbeats: usize,
 813        cx: &mut Context<Self>,
 814    ) -> ControlFlow<()> {
 815        let state = self.state.take().unwrap();
 816        let next_state = if missed_heartbeats > 0 {
 817            state.heartbeat_missed()
 818        } else {
 819            state.heartbeat_recovered()
 820        };
 821
 822        self.set_state(next_state, cx);
 823
 824        if missed_heartbeats >= MAX_MISSED_HEARTBEATS {
 825            log::error!(
 826                "Missed last {} heartbeats. Reconnecting...",
 827                missed_heartbeats
 828            );
 829
 830            self.reconnect(cx)
 831                .context("failed to start reconnect process after missing heartbeats")
 832                .log_err();
 833            ControlFlow::Break(())
 834        } else {
 835            ControlFlow::Continue(())
 836        }
 837    }
 838
 839    fn monitor(
 840        this: WeakEntity<Self>,
 841        io_task: Task<Result<i32>>,
 842        cx: &AsyncApp,
 843    ) -> Task<Result<()>> {
 844        cx.spawn(async move |cx| {
 845            let result = io_task.await;
 846
 847            match result {
 848                Ok(exit_code) => {
 849                    if let Some(error) = ProxyLaunchError::from_exit_code(exit_code) {
 850                        match error {
 851                            ProxyLaunchError::ServerNotRunning => {
 852                                log::error!("failed to reconnect because server is not running");
 853                                this.update(cx, |this, cx| {
 854                                    this.set_state(State::ServerNotRunning, cx);
 855                                })?;
 856                            }
 857                        }
 858                    } else {
 859                        log::error!("proxy process terminated unexpectedly: {exit_code}");
 860                        this.update(cx, |this, cx| {
 861                            this.reconnect(cx).ok();
 862                        })?;
 863                    }
 864                }
 865                Err(error) => {
 866                    log::warn!(
 867                        "remote io task died with error: {:?}. reconnecting...",
 868                        error
 869                    );
 870                    this.update(cx, |this, cx| {
 871                        this.reconnect(cx).ok();
 872                    })?;
 873                }
 874            }
 875
 876            Ok(())
 877        })
 878    }
 879
 880    fn state_is(&self, check: impl FnOnce(&State) -> bool) -> bool {
 881        self.state.as_ref().is_some_and(check)
 882    }
 883
 884    fn try_set_state(&mut self, cx: &mut Context<Self>, map: impl FnOnce(&State) -> Option<State>) {
 885        let new_state = self.state.as_ref().and_then(map);
 886        if let Some(new_state) = new_state {
 887            self.state.replace(new_state);
 888            cx.notify();
 889        }
 890    }
 891
 892    fn set_state(&mut self, state: State, cx: &mut Context<Self>) {
 893        log::info!("setting state to '{}'", &state);
 894
 895        let is_reconnect_exhausted = state.is_reconnect_exhausted();
 896        let is_server_not_running = state.is_server_not_running();
 897        self.state.replace(state);
 898
 899        if is_reconnect_exhausted || is_server_not_running {
 900            cx.emit(RemoteClientEvent::Disconnected {
 901                server_not_running: is_server_not_running,
 902            });
 903        }
 904        cx.notify();
 905    }
 906
 907    pub fn shell(&self) -> Option<String> {
 908        Some(self.remote_connection()?.shell())
 909    }
 910
 911    pub fn default_system_shell(&self) -> Option<String> {
 912        Some(self.remote_connection()?.default_system_shell())
 913    }
 914
 915    pub fn shares_network_interface(&self) -> bool {
 916        self.remote_connection()
 917            .map_or(false, |connection| connection.shares_network_interface())
 918    }
 919
 920    pub fn has_wsl_interop(&self) -> bool {
 921        self.remote_connection()
 922            .map_or(false, |connection| connection.has_wsl_interop())
 923    }
 924
 925    pub fn build_command(
 926        &self,
 927        program: Option<String>,
 928        args: &[String],
 929        env: &HashMap<String, String>,
 930        working_dir: Option<String>,
 931        port_forward: Option<(u16, String, u16)>,
 932    ) -> Result<CommandTemplate> {
 933        self.build_command_with_options(
 934            program,
 935            args,
 936            env,
 937            working_dir,
 938            port_forward,
 939            Interactive::Yes,
 940        )
 941    }
 942
 943    pub fn build_command_with_options(
 944        &self,
 945        program: Option<String>,
 946        args: &[String],
 947        env: &HashMap<String, String>,
 948        working_dir: Option<String>,
 949        port_forward: Option<(u16, String, u16)>,
 950        interactive: Interactive,
 951    ) -> Result<CommandTemplate> {
 952        let Some(connection) = self.remote_connection() else {
 953            return Err(anyhow!("no remote connection"));
 954        };
 955        connection.build_command(program, args, env, working_dir, port_forward, interactive)
 956    }
 957
 958    pub fn build_forward_ports_command(
 959        &self,
 960        forwards: Vec<(u16, String, u16)>,
 961    ) -> Result<CommandTemplate> {
 962        let Some(connection) = self.remote_connection() else {
 963            return Err(anyhow!("no remote connection"));
 964        };
 965        connection.build_forward_ports_command(forwards)
 966    }
 967
 968    pub fn upload_directory(
 969        &self,
 970        src_path: PathBuf,
 971        dest_path: RemotePathBuf,
 972        cx: &App,
 973    ) -> Task<Result<()>> {
 974        let Some(connection) = self.remote_connection() else {
 975            return Task::ready(Err(anyhow!("no remote connection")));
 976        };
 977        connection.upload_directory(src_path, dest_path, cx)
 978    }
 979
 980    pub fn proto_client(&self) -> AnyProtoClient {
 981        self.client.clone().into()
 982    }
 983
    /// Returns a clone of the options this client was created with.
    pub fn connection_options(&self) -> RemoteConnectionOptions {
        self.connection_options.clone()
    }
 987
 988    pub fn connection(&self) -> Option<Arc<dyn RemoteConnection>> {
 989        if let State::Connected {
 990            remote_connection, ..
 991        } = self.state.as_ref()?
 992        {
 993            Some(remote_connection.clone())
 994        } else {
 995            None
 996        }
 997    }
 998
 999    pub fn connection_state(&self) -> ConnectionState {
1000        self.state
1001            .as_ref()
1002            .map(ConnectionState::from)
1003            .unwrap_or(ConnectionState::Disconnected)
1004    }
1005
    /// Returns `true` when [`Self::connection_state`] reports `ConnectionState::Disconnected`.
    pub fn is_disconnected(&self) -> bool {
        self.connection_state() == ConnectionState::Disconnected
    }
1009
    /// Returns the path style stored on this client.
    pub fn path_style(&self) -> PathStyle {
        self.path_style
    }
1013
1014    /// Forcibly disconnects from the remote server by killing the underlying connection.
1015    /// This will trigger the reconnection logic if reconnection attempts remain.
1016    /// Useful for testing reconnection behavior in real environments.
1017    pub fn force_disconnect(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
1018        let Some(connection) = self.remote_connection() else {
1019            return Task::ready(Err(anyhow!("no active remote connection to disconnect")));
1020        };
1021
1022        log::info!("force_disconnect: killing remote connection");
1023
1024        cx.spawn(async move |_, _| {
1025            connection.kill().await?;
1026            Ok(())
1027        })
1028    }
1029
1030    /// Simulates a timeout by pausing heartbeat responses.
1031    /// This will cause heartbeat failures and eventually trigger reconnection
1032    /// after MAX_MISSED_HEARTBEATS are missed.
1033    /// Useful for testing timeout behavior in real environments.
1034    pub fn force_heartbeat_timeout(&mut self, attempts: usize, cx: &mut Context<Self>) {
1035        log::info!("force_heartbeat_timeout: triggering heartbeat failure state");
1036
1037        if let Some(State::Connected {
1038            remote_connection,
1039            delegate,
1040            multiplex_task,
1041            heartbeat_task,
1042        }) = self.state.take()
1043        {
1044            self.set_state(
1045                if attempts == 0 {
1046                    State::HeartbeatMissed {
1047                        missed_heartbeats: MAX_MISSED_HEARTBEATS,
1048                        remote_connection,
1049                        delegate,
1050                        multiplex_task,
1051                        heartbeat_task,
1052                    }
1053                } else {
1054                    State::ReconnectFailed {
1055                        remote_connection,
1056                        delegate,
1057                        error: anyhow!("forced heartbeat timeout"),
1058                        attempts,
1059                    }
1060                },
1061                cx,
1062            );
1063
1064            self.reconnect(cx)
1065                .context("failed to start reconnect after forced timeout")
1066                .log_err();
1067        } else {
1068            log::warn!("force_heartbeat_timeout: not in Connected state, ignoring");
1069        }
1070    }
1071
    /// Test helper: moves the client straight into `State::ServerNotRunning`.
    ///
    /// This goes through `set_state`, so `RemoteClientEvent::Disconnected` is emitted
    /// when that state reports `is_server_not_running`.
    #[cfg(any(test, feature = "test-support"))]
    pub fn force_server_not_running(&mut self, cx: &mut Context<Self>) {
        self.set_state(State::ServerNotRunning, cx);
    }
1076
1077    #[cfg(any(test, feature = "test-support"))]
1078    pub fn simulate_disconnect(&self, client_cx: &mut App) -> Task<()> {
1079        let opts = self.connection_options();
1080        client_cx.spawn(async move |cx| {
1081            let connection = cx.update_global(|c: &mut ConnectionPool, _| {
1082                if let Some(ConnectionPoolEntry::Connected(c)) = c.connections.get(&opts) {
1083                    if let Some(connection) = c.upgrade() {
1084                        connection
1085                    } else {
1086                        panic!("connection was dropped")
1087                    }
1088                } else {
1089                    panic!("missing test connection")
1090                }
1091            });
1092
1093            connection.simulate_disconnect(cx);
1094        })
1095    }
1096
1097    /// Creates a mock connection pair for testing.
1098    ///
1099    /// This is the recommended way to create mock remote connections for tests.
1100    /// It returns the `MockConnectionOptions` (which can be passed to create a
1101    /// `HeadlessProject`), an `AnyProtoClient` for the server side and a
1102    /// `ConnectGuard` for the client side which blocks the connection from
1103    /// being established until dropped.
1104    ///
1105    /// # Example
1106    /// ```ignore
1107    /// let (opts, server_session, connect_guard) = RemoteClient::fake_server(cx, server_cx);
1108    /// // Set up HeadlessProject with server_session...
1109    /// drop(connect_guard);
1110    /// let client = RemoteClient::fake_client(opts, cx).await;
1111    /// ```
1112    #[cfg(any(test, feature = "test-support"))]
1113    pub fn fake_server(
1114        client_cx: &mut gpui::TestAppContext,
1115        server_cx: &mut gpui::TestAppContext,
1116    ) -> (RemoteConnectionOptions, AnyProtoClient, ConnectGuard) {
1117        use crate::transport::mock::MockConnection;
1118        let (opts, server_client, connect_guard) = MockConnection::new(client_cx, server_cx);
1119        (opts.into(), server_client, connect_guard)
1120    }
1121
1122    /// Registers a new mock server for existing connection options.
1123    ///
1124    /// Use this to simulate reconnection: after forcing a disconnect, register
1125    /// a new server so the next `connect()` call succeeds.
1126    #[cfg(any(test, feature = "test-support"))]
1127    pub fn fake_server_with_opts(
1128        opts: &RemoteConnectionOptions,
1129        client_cx: &mut gpui::TestAppContext,
1130        server_cx: &mut gpui::TestAppContext,
1131    ) -> (AnyProtoClient, ConnectGuard) {
1132        use crate::transport::mock::MockConnection;
1133        let mock_opts = match opts {
1134            RemoteConnectionOptions::Mock(mock_opts) => mock_opts.clone(),
1135            _ => panic!("fake_server_with_opts requires Mock connection options"),
1136        };
1137        MockConnection::new_with_opts(mock_opts, client_cx, server_cx)
1138    }
1139
    /// Creates a `RemoteClient` connected to a mock server.
    ///
    /// Call `fake_server` first to get the connection options, set up the
    /// `HeadlessProject` with the server session, then call this method
    /// to create the client.
    ///
    /// # Panics
    /// Panics if `opts` is not `RemoteConnectionOptions::Mock`, if connecting fails,
    /// or if constructing the client fails.
    #[cfg(any(test, feature = "test-support"))]
    pub async fn connect_mock(
        opts: RemoteConnectionOptions,
        client_cx: &mut gpui::TestAppContext,
    ) -> Entity<Self> {
        assert!(matches!(opts, RemoteConnectionOptions::Mock(..)));
        use crate::transport::mock::MockDelegate;
        // `_tx` is a named binding, so the sender stays alive until this function
        // returns. NOTE(review): presumably `rx` is a cancellation signal for
        // `Self::new`; confirm what dropping `_tx` afterwards means there.
        let (_tx, rx) = oneshot::channel();
        let mut cx = client_cx.to_async();
        // Goes through the free `connect` helper, which presumably consults the
        // shared `ConnectionPool`. TODO confirm.
        let connection = connect(opts, Arc::new(MockDelegate), &mut cx)
            .await
            .unwrap();
        client_cx
            .update(|cx| {
                Self::new(
                    ConnectionIdentifier::setup(),
                    connection,
                    rx,
                    Arc::new(MockDelegate),
                    cx,
                )
            })
            .await
            // Two layers of fallibility are unwrapped here; NOTE(review): presumably
            // an outer task/update error and the construction result itself.
            .unwrap()
            .unwrap()
    }
1171
1172    pub fn remote_connection(&self) -> Option<Arc<dyn RemoteConnection>> {
1173        self.state
1174            .as_ref()
1175            .and_then(|state| state.remote_connection())
1176    }
1177}
1178
/// One slot in the global [`ConnectionPool`], keyed by connection options.
///
/// Both variants hold only weak references, so the pool itself never keeps a
/// connection, or an in-flight connection attempt, alive.
enum ConnectionPoolEntry {
    /// A connection attempt is in progress; other callers can join it by upgrading.
    Connecting(WeakShared<Task<Result<Arc<dyn RemoteConnection>, Arc<anyhow::Error>>>>),
    /// A connection was established; it is reusable while it can still be upgraded.
    Connected(Weak<dyn RemoteConnection>),
}
1183
/// Global registry used to share remote connections between clients that use the
/// same `RemoteConnectionOptions`.
#[derive(Default)]
struct ConnectionPool {
    // One entry per options value; entries are removed when found dead or on failure.
    connections: HashMap<RemoteConnectionOptions, ConnectionPoolEntry>,
}
1188
// Registered as a gpui global so that it can be reached via `update_global`.
impl Global for ConnectionPool {}
1190
impl ConnectionPool {
    /// Returns a shared task that resolves to a connection for `opts`.
    ///
    /// The lookup proceeds in this order:
    /// 1. If a connection attempt for `opts` is still in flight, that same task is returned.
    /// 2. If a previous connection is still alive and not killed, it is returned at once.
    /// 3. Otherwise a new connection for the matching transport is spawned.
    ///
    /// Errors are wrapped in `Arc` because `Shared` requires a `Clone` output.
    fn connect(
        &mut self,
        opts: RemoteConnectionOptions,
        delegate: Arc<dyn RemoteClientDelegate>,
        cx: &mut App,
    ) -> Shared<Task<Result<Arc<dyn RemoteConnection>, Arc<anyhow::Error>>>> {
        let connection = self.connections.get(&opts);
        match connection {
            Some(ConnectionPoolEntry::Connecting(task)) => {
                // Another caller is already connecting with these options; join it.
                if let Some(task) = task.upgrade() {
                    log::debug!("Connecting task is still alive");
                    cx.spawn(async move |cx| {
                        delegate.set_status(Some("Waiting for existing connection attempt"), cx)
                    })
                    .detach();
                    return task;
                }
                // All strong handles to the previous attempt are gone: forget it.
                log::debug!("Connecting task is dead, removing it and restarting a connection");
                self.connections.remove(&opts);
            }
            Some(ConnectionPoolEntry::Connected(remote)) => {
                // Reuse only connections that are still alive and were not killed.
                if let Some(remote) = remote.upgrade()
                    && !remote.has_been_killed()
                {
                    log::debug!("Connection is still alive");
                    return Task::ready(Ok(remote)).shared();
                }
                log::debug!("Connection is dead, removing it and restarting a connection");
                self.connections.remove(&opts);
            }
            None => {
                log::debug!("No existing connection found, starting a new one");
            }
        }

        let task = cx
            .spawn({
                let opts = opts.clone();
                let delegate = delegate.clone();
                async move |cx| {
                    // Dispatch on the transport kind; each arm erases to a trait object.
                    let connection = match opts.clone() {
                        RemoteConnectionOptions::Ssh(opts) => {
                            SshRemoteConnection::new(opts, delegate, cx)
                                .await
                                .map(|connection| Arc::new(connection) as Arc<dyn RemoteConnection>)
                        }
                        RemoteConnectionOptions::Wsl(opts) => {
                            WslRemoteConnection::new(opts, delegate, cx)
                                .await
                                .map(|connection| Arc::new(connection) as Arc<dyn RemoteConnection>)
                        }
                        RemoteConnectionOptions::Docker(opts) => {
                            DockerExecConnection::new(opts, delegate, cx)
                                .await
                                .map(|connection| Arc::new(connection) as Arc<dyn RemoteConnection>)
                        }
                        // Mock connections are pre-registered by tests and taken from
                        // the `MockConnectionRegistry` global.
                        #[cfg(any(test, feature = "test-support"))]
                        RemoteConnectionOptions::Mock(opts) => match cx.update(|cx| {
                            cx.default_global::<crate::transport::mock::MockConnectionRegistry>()
                                .take(&opts)
                        }) {
                            Some(connection) => Ok(connection.await as Arc<dyn RemoteConnection>),
                            None => Err(anyhow!(
                                "Mock connection not found. Call MockConnection::new() first."
                            )),
                        },
                    };

                    // Record the outcome in the pool: a weak `Connected` entry on
                    // success, or no entry at all on failure.
                    cx.update_global(|pool: &mut Self, _| {
                        debug_assert!(matches!(
                            pool.connections.get(&opts),
                            Some(ConnectionPoolEntry::Connecting(_))
                        ));
                        match connection {
                            Ok(connection) => {
                                pool.connections.insert(
                                    opts.clone(),
                                    ConnectionPoolEntry::Connected(Arc::downgrade(&connection)),
                                );
                                Ok(connection)
                            }
                            Err(error) => {
                                pool.connections.remove(&opts);
                                Err(Arc::new(error))
                            }
                        }
                    })
                }
            })
            .shared();
        // `downgrade` returns `None` once the shared future has already completed;
        // only an unfinished attempt is recorded as `Connecting`.
        if let Some(task) = task.downgrade() {
            self.connections
                .insert(opts.clone(), ConnectionPoolEntry::Connecting(task));
        }
        task
    }
}
1289
/// Describes how to reach a remote host, one variant per transport.
///
/// Derives `Eq` + `Hash` so it can key the global `ConnectionPool`. It is also
/// serializable via serde.
#[derive(Debug, Clone, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub enum RemoteConnectionOptions {
    /// Connect over SSH.
    Ssh(SshConnectionOptions),
    /// Connect to a WSL distribution.
    Wsl(WslConnectionOptions),
    /// Connect to a container via `DockerExecConnection` (docker or podman).
    Docker(DockerConnectionOptions),
    /// In-process mock transport, available only in tests / `test-support`.
    #[cfg(any(test, feature = "test-support"))]
    Mock(crate::transport::mock::MockConnectionOptions),
}
1298
1299impl RemoteConnectionOptions {
1300    pub fn display_name(&self) -> String {
1301        match self {
1302            RemoteConnectionOptions::Ssh(opts) => opts
1303                .nickname
1304                .clone()
1305                .unwrap_or_else(|| opts.host.to_string()),
1306            RemoteConnectionOptions::Wsl(opts) => opts.distro_name.clone(),
1307            RemoteConnectionOptions::Docker(opts) => {
1308                if opts.use_podman {
1309                    format!("[podman] {}", opts.name)
1310                } else {
1311                    opts.name.clone()
1312                }
1313            }
1314            #[cfg(any(test, feature = "test-support"))]
1315            RemoteConnectionOptions::Mock(opts) => format!("mock-{}", opts.id),
1316        }
1317    }
1318}
1319
// Unit tests for `RemoteConnectionOptions::display_name` on SSH options.
#[cfg(test)]
mod tests {
    use super::*;

    // A configured nickname takes precedence over the raw host.
    #[test]
    fn test_ssh_display_name_prefers_nickname() {
        let options = RemoteConnectionOptions::Ssh(SshConnectionOptions {
            host: "1.2.3.4".into(),
            nickname: Some("My Cool Project".to_string()),
            ..Default::default()
        });

        assert_eq!(options.display_name(), "My Cool Project");
    }

    // Without a nickname, the host is shown.
    #[test]
    fn test_ssh_display_name_falls_back_to_host() {
        let options = RemoteConnectionOptions::Ssh(SshConnectionOptions {
            host: "1.2.3.4".into(),
            ..Default::default()
        });

        assert_eq!(options.display_name(), "1.2.3.4");
    }
}
1345
1346impl From<SshConnectionOptions> for RemoteConnectionOptions {
1347    fn from(opts: SshConnectionOptions) -> Self {
1348        RemoteConnectionOptions::Ssh(opts)
1349    }
1350}
1351
1352impl From<WslConnectionOptions> for RemoteConnectionOptions {
1353    fn from(opts: WslConnectionOptions) -> Self {
1354        RemoteConnectionOptions::Wsl(opts)
1355    }
1356}
1357
/// Wraps mock options in `RemoteConnectionOptions::Mock` (tests / `test-support` only).
#[cfg(any(test, feature = "test-support"))]
impl From<crate::transport::mock::MockConnectionOptions> for RemoteConnectionOptions {
    fn from(opts: crate::transport::mock::MockConnectionOptions) -> Self {
        Self::Mock(opts)
    }
}
1364
// Windows-only action carrying a WSL distro and a list of paths.
// NOTE(review): the `///` line below is kept verbatim because the `gpui::Action`
// derive may surface doc comments at runtime; confirm before editing it.
#[cfg(target_os = "windows")]
/// Open a wsl path (\\wsl.localhost\<distro>\path)
#[derive(Debug, Clone, PartialEq, Eq, gpui::Action)]
#[action(namespace = workspace, no_json, no_register)]
pub struct OpenWslPath {
    // The WSL distribution the paths refer to.
    pub distro: WslConnectionOptions,
    // Paths to open; presumably paths inside the distro. TODO confirm against callers.
    pub paths: Vec<PathBuf>,
}
1373
/// A transport-specific connection to a remote host.
///
/// `ConnectionPool::connect` produces these from SSH, WSL and Docker options (plus a
/// mock in tests). They are shared as `Arc<dyn RemoteConnection>`.
#[async_trait(?Send)]
pub trait RemoteConnection: Send + Sync {
    /// Starts the proxy that carries protocol envelopes for a client.
    ///
    /// NOTE(review): presumably incoming envelopes are pushed into `incoming_tx`,
    /// outgoing ones are drained from `outgoing_rx`, and `connection_activity_tx` is
    /// signalled on traffic; confirm in the implementations. The task's `i32` is
    /// presumably the proxy's exit code, as interpreted by
    /// `ProxyLaunchError::from_exit_code`.
    fn start_proxy(
        &self,
        unique_identifier: String,
        reconnect: bool,
        incoming_tx: UnboundedSender<Envelope>,
        outgoing_rx: UnboundedReceiver<Envelope>,
        connection_activity_tx: Sender<()>,
        delegate: Arc<dyn RemoteClientDelegate>,
        cx: &mut AsyncApp,
    ) -> Task<Result<i32>>;
    /// Uploads the local directory `src_path` to `dest_path` on the remote.
    fn upload_directory(
        &self,
        src_path: PathBuf,
        dest_path: RemotePathBuf,
        cx: &App,
    ) -> Task<Result<()>>;
    /// Terminates the connection (used by `RemoteClient::force_disconnect`).
    async fn kill(&self) -> Result<()>;
    /// Whether `kill` has been called; the pool refuses to reuse killed connections.
    fn has_been_killed(&self) -> bool;
    /// Whether the remote shares the local network interface; defaults to `false`.
    fn shares_network_interface(&self) -> bool {
        false
    }
    /// Builds a command template for running `program` with `args`/`env` on the remote.
    fn build_command(
        &self,
        program: Option<String>,
        args: &[String],
        env: &HashMap<String, String>,
        working_dir: Option<String>,
        port_forward: Option<(u16, String, u16)>,
        interactive: Interactive,
    ) -> Result<CommandTemplate>;
    /// Builds a command template that establishes the given port forwards.
    fn build_forward_ports_command(
        &self,
        forwards: Vec<(u16, String, u16)>,
    ) -> Result<CommandTemplate>;
    /// The options this connection was created from.
    fn connection_options(&self) -> RemoteConnectionOptions;
    /// Path conventions used by the remote host.
    fn path_style(&self) -> PathStyle;
    /// Shell used for commands on the remote.
    fn shell(&self) -> String;
    /// The remote's default system shell.
    fn default_system_shell(&self) -> String;
    /// Whether the connection reports WSL interop support.
    fn has_wsl_interop(&self) -> bool;

    /// Test hook for simulating a dropped connection; the default does nothing.
    #[cfg(any(test, feature = "test-support"))]
    fn simulate_disconnect(&self, _: &AsyncApp) {}
}
1419
// Pending requests keyed by message id. Each sender receives the response envelope
// together with a oneshot sender that the message loop waits on before reading
// further input.
type ResponseChannels = Mutex<HashMap<MessageId, oneshot::Sender<(Envelope, oneshot::Sender<()>)>>>;
1421
/// A one-shot, multi-waiter signal: the first `set` wins, and every waiter sees the value.
struct Signal<T> {
    // `None` once the value has been sent.
    tx: Mutex<Option<oneshot::Sender<T>>>,
    // Resolves to `Some(value)` after `set`, or `None` if the sender is dropped unset.
    rx: Shared<Task<Option<T>>>,
}
1426
1427impl<T: Send + Clone + 'static> Signal<T> {
1428    pub fn new(cx: &App) -> Self {
1429        let (tx, rx) = oneshot::channel();
1430
1431        let task = cx
1432            .background_executor()
1433            .spawn(async move { rx.await.ok() })
1434            .shared();
1435
1436        Self {
1437            tx: Mutex::new(Some(tx)),
1438            rx: task,
1439        }
1440    }
1441
1442    fn set(&self, value: T) {
1443        if let Some(tx) = self.tx.lock().take() {
1444            let _ = tx.send(value);
1445        }
1446    }
1447
1448    fn wait(&self) -> Shared<Task<Option<T>>> {
1449        self.rx.clone()
1450    }
1451}
1452
/// Protocol client that exchanges `Envelope`s over mpsc channels with a remote peer.
///
/// It keeps sent envelopes buffered until they are acknowledged, so they can be
/// replayed after a reconnect or resync.
pub(crate) struct ChannelClient {
    // Source of ids for envelopes this client stamps itself (e.g. `Ack`s).
    next_message_id: AtomicU32,
    // Current outgoing channel; swapped out by `reconnect`.
    outgoing_tx: Mutex<mpsc::UnboundedSender<Envelope>>,
    // Envelopes not yet acknowledged by the peer; pruned by incoming `ack_id`s.
    buffer: Mutex<VecDeque<Envelope>>,
    // Pending requests waiting for a matching `responding_to`.
    response_channels: ResponseChannels,
    // Handlers for incoming messages that are not responses.
    message_handlers: Mutex<ProtoMessageHandlerSet>,
    // Id of the most recently received non-control envelope.
    max_received: AtomicU32,
    // Label used as a prefix in log messages.
    name: &'static str,
    // The running message-handling task; replaced on `reconnect`.
    task: Mutex<Task<Result<()>>>,
    // Set when the peer's `RemoteStarted` message arrives.
    remote_started: Signal<()>,
    // Reported at construction; NOTE(review): consumers are outside this view.
    has_wsl_interop: bool,
    // Used for timeout timers (see `resync`).
    executor: BackgroundExecutor,
}
1466
1467impl ChannelClient {
    /// Creates the client and immediately spawns its message-handling task.
    ///
    /// `Arc::new_cyclic` gives the task a `Weak` back-reference, so the task does not
    /// keep the client alive.
    pub(crate) fn new(
        incoming_rx: mpsc::UnboundedReceiver<Envelope>,
        outgoing_tx: mpsc::UnboundedSender<Envelope>,
        cx: &App,
        name: &'static str,
        has_wsl_interop: bool,
    ) -> Arc<Self> {
        Arc::new_cyclic(|this| Self {
            outgoing_tx: Mutex::new(outgoing_tx),
            next_message_id: AtomicU32::new(0),
            max_received: AtomicU32::new(0),
            response_channels: ResponseChannels::default(),
            message_handlers: Default::default(),
            buffer: Mutex::new(VecDeque::new()),
            name,
            executor: cx.background_executor().clone(),
            task: Mutex::new(Self::start_handling_messages(
                this.clone(),
                incoming_rx,
                &cx.to_async(),
            )),
            remote_started: Signal::new(cx),
            has_wsl_interop,
        })
    }
1493
    /// Returns a future that resolves once the peer's `RemoteStarted` message has been seen.
    fn wait_for_remote_started(&self) -> Shared<Task<Option<()>>> {
        self.remote_started.wait()
    }
1497
    /// Spawns the loop that drains `incoming_rx` and dispatches each envelope.
    ///
    /// On start it sends a `RemoteStarted` envelope. For every incoming envelope it:
    /// - drops buffered outgoing envelopes whose id is `<=` the envelope's `ack_id`;
    /// - answers `FlushBufferedMessages` by re-sending the whole buffer, then an `Ack`;
    /// - answers `RemoteStarted` by setting `remote_started`, then an `Ack`;
    /// - otherwise records the id in `max_received`, then either routes the envelope
    ///   to the pending request it responds to, or hands it to a registered handler.
    ///   If no handler is registered, it replies with an error.
    ///
    /// The loop ends when `incoming_rx` closes or when the client has been dropped.
    fn start_handling_messages(
        this: Weak<Self>,
        mut incoming_rx: mpsc::UnboundedReceiver<Envelope>,
        cx: &AsyncApp,
    ) -> Task<Result<()>> {
        cx.spawn(async move |cx| {
            // Announce ourselves to the peer; send failures are ignored.
            if let Some(this) = this.upgrade() {
                let envelope = proto::RemoteStarted {}.into_envelope(0, None, None);
                this.outgoing_tx.lock().unbounded_send(envelope).ok();
            };

            let peer_id = PeerId { owner_id: 0, id: 0 };
            while let Some(incoming) = incoming_rx.next().await {
                // Stop quietly once the client itself has been dropped.
                let Some(this) = this.upgrade() else {
                    return anyhow::Ok(());
                };
                // The peer has acknowledged everything up to `ack_id`, so those
                // envelopes no longer need to be kept for replay.
                if let Some(ack_id) = incoming.ack_id {
                    let mut buffer = this.buffer.lock();
                    while buffer.front().is_some_and(|msg| msg.id <= ack_id) {
                        buffer.pop_front();
                    }
                }
                // Peer requests a replay of everything still unacknowledged.
                if let Some(proto::envelope::Payload::FlushBufferedMessages(_)) = &incoming.payload
                {
                    log::debug!(
                        "{}:remote message received. name:FlushBufferedMessages",
                        this.name
                    );
                    {
                        let buffer = this.buffer.lock();
                        for envelope in buffer.iter() {
                            this.outgoing_tx
                                .lock()
                                .unbounded_send(envelope.clone())
                                .ok();
                        }
                    }
                    let mut envelope = proto::Ack {}.into_envelope(0, Some(incoming.id), None);
                    envelope.id = this.next_message_id.fetch_add(1, SeqCst);
                    this.outgoing_tx.lock().unbounded_send(envelope).ok();
                    continue;
                }

                // Peer announced itself: wake `wait_for_remote_started` waiters and ack.
                if let Some(proto::envelope::Payload::RemoteStarted(_)) = &incoming.payload {
                    this.remote_started.set(());
                    let mut envelope = proto::Ack {}.into_envelope(0, Some(incoming.id), None);
                    envelope.id = this.next_message_id.fetch_add(1, SeqCst);
                    this.outgoing_tx.lock().unbounded_send(envelope).ok();
                    continue;
                }

                this.max_received.store(incoming.id, SeqCst);

                if let Some(request_id) = incoming.responding_to {
                    let request_id = MessageId(request_id);
                    let sender = this.response_channels.lock().remove(&request_id);
                    if let Some(sender) = sender {
                        let (tx, rx) = oneshot::channel();
                        // Payload-less responses are not forwarded; `tx` is then
                        // dropped, so `rx` resolves immediately.
                        if incoming.payload.is_some() {
                            sender.send((incoming, tx)).ok();
                        }
                        // Block further reads until the requester releases `tx`.
                        // NOTE(review): presumably signalled once the response has been
                        // processed; confirm in `request_dynamic`.
                        rx.await.ok();
                    }
                } else if let Some(envelope) =
                    build_typed_envelope(peer_id, Instant::now(), incoming)
                {
                    let type_name = envelope.payload_type_name();
                    let message_id = envelope.message_id();
                    if let Some(future) = ProtoMessageHandlerSet::handle_message(
                        &this.message_handlers,
                        envelope,
                        this.clone().into(),
                        cx.clone(),
                    ) {
                        log::debug!("{}:remote message received. name:{type_name}", this.name);
                        // Handlers run detached so a slow handler does not stall the loop.
                        cx.foreground_executor()
                            .spawn(async move {
                                match future.await {
                                    Ok(_) => {
                                        log::debug!(
                                            "{}:remote message handled. name:{type_name}",
                                            this.name
                                        );
                                    }
                                    Err(error) => {
                                        // Collapse multi-line error chains onto one log line.
                                        log::error!(
                                            "{}:error handling message. type:{}, error:{:#}",
                                            this.name,
                                            type_name,
                                            format!("{error:#}").lines().fold(
                                                String::new(),
                                                |mut message, line| {
                                                    if !message.is_empty() {
                                                        message.push(' ');
                                                    }
                                                    message.push_str(line);
                                                    message
                                                }
                                            )
                                        );
                                    }
                                }
                            })
                            .detach()
                    } else {
                        // No handler: tell the peer instead of leaving it waiting.
                        log::error!("{}:unhandled remote message name:{type_name}", this.name);
                        if let Err(e) = AnyProtoClient::from(this.clone()).send_response(
                            message_id,
                            anyhow::anyhow!("no handler registered for {type_name}").to_proto(),
                        ) {
                            log::error!(
                                "{}:error sending error response for {type_name}:{e:#}",
                                this.name
                            );
                        }
                    }
                }
            }
            anyhow::Ok(())
        })
    }
1619
    /// Switches the client onto a new pair of channels after a transport reconnect.
    ///
    /// The outgoing sender is swapped first. Then the message task is replaced,
    /// which drops (and thereby cancels) the previous one.
    pub(crate) fn reconnect(
        self: &Arc<Self>,
        incoming_rx: UnboundedReceiver<Envelope>,
        outgoing_tx: UnboundedSender<Envelope>,
        cx: &AsyncApp,
    ) {
        *self.outgoing_tx.lock() = outgoing_tx;
        *self.task.lock() = Self::start_handling_messages(Arc::downgrade(self), incoming_rx, cx);
    }
1629
1630    fn request<T: RequestMessage>(
1631        &self,
1632        payload: T,
1633    ) -> impl 'static + Future<Output = Result<T::Response>> {
1634        self.request_internal(payload, true)
1635    }
1636
1637    fn request_internal<T: RequestMessage>(
1638        &self,
1639        payload: T,
1640        use_buffer: bool,
1641    ) -> impl 'static + Future<Output = Result<T::Response>> {
1642        log::debug!("remote request start. name:{}", T::NAME);
1643        let response =
1644            self.request_dynamic(payload.into_envelope(0, None, None), T::NAME, use_buffer);
1645        async move {
1646            let response = response.await?;
1647            log::debug!("remote request finish. name:{}", T::NAME);
1648            T::Response::from_envelope(response).context("received a response of the wrong type")
1649        }
1650    }
1651
1652    async fn resync(&self, timeout: Duration) -> Result<()> {
1653        smol::future::or(
1654            async {
1655                self.request_internal(proto::FlushBufferedMessages {}, false)
1656                    .await?;
1657
1658                for envelope in self.buffer.lock().iter() {
1659                    self.outgoing_tx
1660                        .lock()
1661                        .unbounded_send(envelope.clone())
1662                        .ok();
1663                }
1664                Ok(())
1665            },
1666            async {
1667                self.executor.timer(timeout).await;
1668                anyhow::bail!("Timed out resyncing remote client")
1669            },
1670        )
1671        .await
1672    }
1673
1674    async fn ping(&self, timeout: Duration) -> Result<()> {
1675        smol::future::or(
1676            async {
1677                self.request(proto::Ping {}).await?;
1678                Ok(())
1679            },
1680            async {
1681                self.executor.timer(timeout).await;
1682                anyhow::bail!("Timed out pinging remote client")
1683            },
1684        )
1685        .await
1686    }
1687
1688    fn send<T: EnvelopedMessage>(&self, payload: T) -> Result<()> {
1689        log::debug!("remote send name:{}", T::NAME);
1690        self.send_dynamic(payload.into_envelope(0, None, None))
1691    }
1692
1693    fn request_dynamic(
1694        &self,
1695        mut envelope: proto::Envelope,
1696        type_name: &'static str,
1697        use_buffer: bool,
1698    ) -> impl 'static + Future<Output = Result<proto::Envelope>> {
1699        envelope.id = self.next_message_id.fetch_add(1, SeqCst);
1700        let (tx, rx) = oneshot::channel();
1701        let mut response_channels_lock = self.response_channels.lock();
1702        response_channels_lock.insert(MessageId(envelope.id), tx);
1703        drop(response_channels_lock);
1704
1705        let result = if use_buffer {
1706            self.send_buffered(envelope)
1707        } else {
1708            self.send_unbuffered(envelope)
1709        };
1710        async move {
1711            if let Err(error) = &result {
1712                log::error!("failed to send message: {error}");
1713                anyhow::bail!("failed to send message: {error}");
1714            }
1715
1716            let response = rx.await.context("connection lost")?.0;
1717            if let Some(proto::envelope::Payload::Error(error)) = &response.payload {
1718                return Err(RpcError::from_proto(error, type_name));
1719            }
1720            Ok(response)
1721        }
1722    }
1723
1724    pub fn send_dynamic(&self, mut envelope: proto::Envelope) -> Result<()> {
1725        envelope.id = self.next_message_id.fetch_add(1, SeqCst);
1726        self.send_buffered(envelope)
1727    }
1728
1729    fn send_buffered(&self, mut envelope: proto::Envelope) -> Result<()> {
1730        envelope.ack_id = Some(self.max_received.load(SeqCst));
1731        self.buffer.lock().push_back(envelope.clone());
1732        // ignore errors on send (happen while we're reconnecting)
1733        // assume that the global "disconnected" overlay is sufficient.
1734        self.outgoing_tx.lock().unbounded_send(envelope).ok();
1735        Ok(())
1736    }
1737
1738    fn send_unbuffered(&self, mut envelope: proto::Envelope) -> Result<()> {
1739        envelope.ack_id = Some(self.max_received.load(SeqCst));
1740        self.outgoing_tx.lock().unbounded_send(envelope).ok();
1741        Ok(())
1742    }
1743}
1744
1745impl ProtoClient for ChannelClient {
1746    fn request(
1747        &self,
1748        envelope: proto::Envelope,
1749        request_type: &'static str,
1750    ) -> BoxFuture<'static, Result<proto::Envelope>> {
1751        self.request_dynamic(envelope, request_type, true).boxed()
1752    }
1753
1754    fn send(&self, envelope: proto::Envelope, _message_type: &'static str) -> Result<()> {
1755        self.send_dynamic(envelope)
1756    }
1757
1758    fn send_response(&self, envelope: Envelope, _message_type: &'static str) -> anyhow::Result<()> {
1759        self.send_dynamic(envelope)
1760    }
1761
1762    fn message_handler_set(&self) -> &Mutex<ProtoMessageHandlerSet> {
1763        &self.message_handlers
1764    }
1765
1766    fn is_via_collab(&self) -> bool {
1767        false
1768    }
1769
1770    fn has_wsl_interop(&self) -> bool {
1771        self.has_wsl_interop
1772    }
1773}