remote_client.rs

   1#[cfg(any(test, feature = "test-support"))]
   2use crate::transport::mock::ConnectGuard;
   3use crate::{
   4    SshConnectionOptions,
   5    protocol::MessageId,
   6    proxy::ProxyLaunchError,
   7    transport::{
   8        docker::{DockerConnectionOptions, DockerExecConnection},
   9        ssh::SshRemoteConnection,
  10        wsl::{WslConnectionOptions, WslRemoteConnection},
  11    },
  12};
  13use anyhow::{Context as _, Result, anyhow};
  14use askpass::EncryptedPassword;
  15use async_trait::async_trait;
  16use collections::HashMap;
  17use futures::{
  18    Future, FutureExt as _, StreamExt as _,
  19    channel::{
  20        mpsc::{self, Sender, UnboundedReceiver, UnboundedSender},
  21        oneshot,
  22    },
  23    future::{BoxFuture, Shared, WeakShared},
  24    select, select_biased,
  25};
  26use gpui::{
  27    App, AppContext as _, AsyncApp, BackgroundExecutor, BorrowAppContext, Context, Entity,
  28    EventEmitter, FutureExt, Global, Task, WeakEntity,
  29};
  30use parking_lot::Mutex;
  31
  32use release_channel::ReleaseChannel;
  33use rpc::{
  34    AnyProtoClient, ErrorExt, ProtoClient, ProtoMessageHandlerSet, RpcError,
  35    proto::{self, Envelope, EnvelopedMessage, PeerId, RequestMessage, build_typed_envelope},
  36};
  37use semver::Version;
  38use std::{
  39    collections::VecDeque,
  40    fmt,
  41    ops::ControlFlow,
  42    path::PathBuf,
  43    sync::{
  44        Arc, Weak,
  45        atomic::{AtomicU32, AtomicU64, Ordering::SeqCst},
  46    },
  47    time::{Duration, Instant},
  48};
  49use util::{
  50    ResultExt,
  51    paths::{PathStyle, RemotePathBuf},
  52};
  53
  54#[derive(Copy, Clone, Debug, PartialEq, Eq)]
  55pub enum RemoteOs {
  56    Linux,
  57    MacOs,
  58    Windows,
  59}
  60
  61impl RemoteOs {
  62    pub fn as_str(&self) -> &'static str {
  63        match self {
  64            RemoteOs::Linux => "linux",
  65            RemoteOs::MacOs => "macos",
  66            RemoteOs::Windows => "windows",
  67        }
  68    }
  69
  70    pub fn is_windows(&self) -> bool {
  71        matches!(self, RemoteOs::Windows)
  72    }
  73}
  74
  75impl std::fmt::Display for RemoteOs {
  76    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
  77        f.write_str(self.as_str())
  78    }
  79}
  80
  81#[derive(Copy, Clone, Debug, PartialEq, Eq)]
  82pub enum RemoteArch {
  83    X86_64,
  84    Aarch64,
  85}
  86
  87impl RemoteArch {
  88    pub fn as_str(&self) -> &'static str {
  89        match self {
  90            RemoteArch::X86_64 => "x86_64",
  91            RemoteArch::Aarch64 => "aarch64",
  92        }
  93    }
  94}
  95
  96impl std::fmt::Display for RemoteArch {
  97    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
  98        f.write_str(self.as_str())
  99    }
 100}
 101
/// Operating system / CPU architecture pair describing a remote machine.
///
/// Passed to [`RemoteClientDelegate`] methods that take a `platform` argument.
#[derive(Copy, Clone, Debug)]
pub struct RemotePlatform {
    /// Operating system family of the remote.
    pub os: RemoteOs,
    /// CPU architecture of the remote.
    pub arch: RemoteArch,
}
 107
/// Description of a command to run: a program, its arguments, and environment variables.
// NOTE(review): how the template is turned into an actual process is not visible in this chunk.
#[derive(Clone, Debug)]
pub struct CommandTemplate {
    /// Program to invoke.
    pub program: String,
    /// Arguments passed to `program`, in order.
    pub args: Vec<String>,
    /// Environment variables (name -> value) associated with the command.
    pub env: HashMap<String, String>,
}
 114
/// Whether a command should be run with TTY allocation for interactive use.
///
/// A two-valued enum used in place of a bare `bool` so call sites stay self-describing.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Interactive {
    /// Allocate a pseudo-TTY for interactive terminal use.
    Yes,
    /// Do not allocate a TTY - for commands that communicate via piped stdio.
    No,
}
 123
/// Callbacks a [`RemoteClient`] (and the connections it uses) invoke to interact with
/// the embedding application: prompting for secrets, locating/downloading the server
/// binary, and reporting status text.
///
/// Implementors must be `Send + Sync`; the delegate is shared as
/// `Arc<dyn RemoteClientDelegate>` throughout this file.
pub trait RemoteClientDelegate: Send + Sync {
    /// Requests a password for `prompt`; the answer is delivered through `tx`.
    // NOTE(review): what happens when `tx` is dropped without an answer is decided by
    // the receiving side, which is not visible in this chunk.
    fn ask_password(
        &self,
        prompt: String,
        tx: oneshot::Sender<EncryptedPassword>,
        cx: &mut AsyncApp,
    );
    /// Resolves a download URL for the server binary matching `platform`,
    /// `release_channel` and (optionally) `version`.
    ///
    /// The task yields `Ok(None)` when no URL is produced.
    // NOTE(review): the exact meaning of `Ok(None)` is up to implementors — confirm.
    fn get_download_url(
        &self,
        platform: RemotePlatform,
        release_channel: ReleaseChannel,
        version: Option<Version>,
        cx: &mut AsyncApp,
    ) -> Task<Result<Option<String>>>;
    /// Obtains the server binary for `platform` / `release_channel` / `version` on the
    /// local machine and yields the path to it.
    fn download_server_binary_locally(
        &self,
        platform: RemotePlatform,
        release_channel: ReleaseChannel,
        version: Option<Version>,
        cx: &mut AsyncApp,
    ) -> Task<Result<PathBuf>>;
    /// Reports a status message. `reconnect` in this file calls it with `Some(error text)`
    /// when a reconnect attempt fails; `None` presumably clears the status — TODO confirm.
    fn set_status(&self, status: Option<&str>, cx: &mut AsyncApp);
}
 147
/// Number of consecutive missed heartbeats after which `handle_heartbeat_result`
/// starts a reconnect.
const MAX_MISSED_HEARTBEATS: usize = 5;
/// Delay between heartbeat rounds in `RemoteClient::heartbeat`.
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
/// Timeout passed to `client.ping` / `client.resync` when checking that the server responds.
const HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(5);
/// How long `RemoteClient::new` waits for the remote to report that it has started:
/// 5 seconds when built with debug assertions, 60 seconds otherwise.
const INITIAL_CONNECTION_TIMEOUT: Duration =
    Duration::from_secs(if cfg!(debug_assertions) { 5 } else { 60 });

/// Maximum number of reconnect attempts before `reconnect` gives up and moves to
/// `State::ReconnectExhausted`.
pub const MAX_RECONNECT_ATTEMPTS: usize = 3;
 155
/// Internal connection state machine of a [`RemoteClient`].
///
/// States that carry `multiplex_task` / `heartbeat_task` own those background tasks;
/// replacing such a state drops the tasks.
enum State {
    /// Initial state set by `RemoteClient::new` until the first ping succeeds.
    Connecting,
    /// Connection established and heartbeats are healthy.
    Connected {
        remote_connection: Arc<dyn RemoteConnection>,
        delegate: Arc<dyn RemoteClientDelegate>,

        /// Task returned by `RemoteClient::monitor`, watching the proxy I/O task.
        multiplex_task: Task<Result<()>>,
        /// Task returned by `RemoteClient::heartbeat`.
        heartbeat_task: Task<Result<()>>,
    },
    /// Still holding the connection, but one or more heartbeats were missed.
    HeartbeatMissed {
        /// Count maintained by `State::heartbeat_missed` (starts at 1, then increments).
        missed_heartbeats: usize,

        remote_connection: Arc<dyn RemoteConnection>,
        delegate: Arc<dyn RemoteClientDelegate>,

        multiplex_task: Task<Result<()>>,
        heartbeat_task: Task<Result<()>>,
    },
    /// A reconnect attempt is in flight.
    Reconnecting,
    /// The last reconnect attempt failed; another attempt may follow.
    ReconnectFailed {
        remote_connection: Arc<dyn RemoteConnection>,
        delegate: Arc<dyn RemoteClientDelegate>,

        /// Error produced by the failed attempt.
        error: anyhow::Error,
        /// Number of attempts made so far.
        attempts: usize,
    },
    /// More than `MAX_RECONNECT_ATTEMPTS` attempts were made; no further reconnects.
    ReconnectExhausted,
    /// The proxy reported (via its exit code) that the server is not running.
    ServerNotRunning,
}
 185
 186impl fmt::Display for State {
 187    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
 188        match self {
 189            Self::Connecting => write!(f, "connecting"),
 190            Self::Connected { .. } => write!(f, "connected"),
 191            Self::Reconnecting => write!(f, "reconnecting"),
 192            Self::ReconnectFailed { .. } => write!(f, "reconnect failed"),
 193            Self::ReconnectExhausted => write!(f, "reconnect exhausted"),
 194            Self::HeartbeatMissed { .. } => write!(f, "heartbeat missed"),
 195            Self::ServerNotRunning { .. } => write!(f, "server not running"),
 196        }
 197    }
 198}
 199
 200impl State {
 201    fn remote_connection(&self) -> Option<Arc<dyn RemoteConnection>> {
 202        match self {
 203            Self::Connected {
 204                remote_connection, ..
 205            } => Some(remote_connection.clone()),
 206            Self::HeartbeatMissed {
 207                remote_connection, ..
 208            } => Some(remote_connection.clone()),
 209            Self::ReconnectFailed {
 210                remote_connection, ..
 211            } => Some(remote_connection.clone()),
 212            _ => None,
 213        }
 214    }
 215
 216    fn can_reconnect(&self) -> bool {
 217        match self {
 218            Self::Connected { .. }
 219            | Self::HeartbeatMissed { .. }
 220            | Self::ReconnectFailed { .. } => true,
 221            State::Connecting
 222            | State::Reconnecting
 223            | State::ReconnectExhausted
 224            | State::ServerNotRunning => false,
 225        }
 226    }
 227
 228    fn is_reconnect_failed(&self) -> bool {
 229        matches!(self, Self::ReconnectFailed { .. })
 230    }
 231
 232    fn is_reconnect_exhausted(&self) -> bool {
 233        matches!(self, Self::ReconnectExhausted { .. })
 234    }
 235
 236    fn is_server_not_running(&self) -> bool {
 237        matches!(self, Self::ServerNotRunning)
 238    }
 239
 240    fn is_reconnecting(&self) -> bool {
 241        matches!(self, Self::Reconnecting { .. })
 242    }
 243
 244    fn heartbeat_recovered(self) -> Self {
 245        match self {
 246            Self::HeartbeatMissed {
 247                remote_connection,
 248                delegate,
 249                multiplex_task,
 250                heartbeat_task,
 251                ..
 252            } => Self::Connected {
 253                remote_connection,
 254                delegate,
 255                multiplex_task,
 256                heartbeat_task,
 257            },
 258            _ => self,
 259        }
 260    }
 261
 262    fn heartbeat_missed(self) -> Self {
 263        match self {
 264            Self::Connected {
 265                remote_connection,
 266                delegate,
 267                multiplex_task,
 268                heartbeat_task,
 269            } => Self::HeartbeatMissed {
 270                missed_heartbeats: 1,
 271                remote_connection,
 272                delegate,
 273                multiplex_task,
 274                heartbeat_task,
 275            },
 276            Self::HeartbeatMissed {
 277                missed_heartbeats,
 278                remote_connection,
 279                delegate,
 280                multiplex_task,
 281                heartbeat_task,
 282            } => Self::HeartbeatMissed {
 283                missed_heartbeats: missed_heartbeats + 1,
 284                remote_connection,
 285                delegate,
 286                multiplex_task,
 287                heartbeat_task,
 288            },
 289            _ => self,
 290        }
 291    }
 292}
 293
/// Coarse, publicly visible state of the remote connection.
///
/// Derived from the internal `State` via the `From<&State>` impl below.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ConnectionState {
    /// The initial connection is being established.
    Connecting,
    /// The connection is up.
    Connected,
    /// The connection is up but at least one heartbeat was missed.
    HeartbeatMissed,
    /// A reconnect is in progress (also reported between failed attempts).
    Reconnecting,
    /// Reconnect attempts were exhausted or the server is not running.
    Disconnected,
}
 303
 304impl From<&State> for ConnectionState {
 305    fn from(value: &State) -> Self {
 306        match value {
 307            State::Connecting => Self::Connecting,
 308            State::Connected { .. } => Self::Connected,
 309            State::Reconnecting | State::ReconnectFailed { .. } => Self::Reconnecting,
 310            State::HeartbeatMissed { .. } => Self::HeartbeatMissed,
 311            State::ReconnectExhausted => Self::Disconnected,
 312            State::ServerNotRunning => Self::Disconnected,
 313        }
 314    }
 315}
 316
/// Client side of a connection to a remote server, together with its connection
/// state machine (heartbeats, reconnects).
pub struct RemoteClient {
    /// Message channel client used to talk to the remote.
    client: Arc<ChannelClient>,
    /// String form of the `ConnectionIdentifier`; passed to `start_proxy` on connect
    /// and on every reconnect.
    unique_identifier: String,
    /// Options of the connection this client was created from.
    connection_options: RemoteConnectionOptions,
    /// Path style reported by the remote connection at creation time.
    path_style: PathStyle,
    /// Current state. `None` only while a transition has temporarily taken the value,
    /// or after `shutdown_processes` has consumed it.
    state: Option<State>,
}
 324
/// Events emitted by [`RemoteClient`].
#[derive(Debug)]
pub enum RemoteClientEvent {
    /// Emitted by `set_state` when the client enters `State::ReconnectExhausted` or
    /// `State::ServerNotRunning`; `server_not_running` is `true` for the latter.
    Disconnected { server_not_running: bool },
}
 329
// Allows `RemoteClient` entities to emit `RemoteClientEvent`s (see `set_state`).
impl EventEmitter<RemoteClientEvent> for RemoteClient {}
 331
/// Identifies the socket on the remote server so that reconnects
/// can re-join the same project.
pub enum ConnectionIdentifier {
    /// Identifier drawn from a process-wide counter (see `ConnectionIdentifier::setup`).
    Setup(u64),
    /// Identifier derived from a workspace id.
    // NOTE(review): presumably the persisted workspace id — confirm against callers.
    Workspace(i64),
}
 338
/// Counter backing `ConnectionIdentifier::setup`; starts at 1 and is incremented per call.
static NEXT_ID: AtomicU64 = AtomicU64::new(1);
 340
 341impl ConnectionIdentifier {
 342    pub fn setup() -> Self {
 343        Self::Setup(NEXT_ID.fetch_add(1, SeqCst))
 344    }
 345
 346    // This string gets used in a socket name, and so must be relatively short.
 347    // The total length of:
 348    //   /home/{username}/.local/share/zed/server_state/{name}/stdout.sock
 349    // Must be less than about 100 characters
 350    //   https://unix.stackexchange.com/questions/367008/why-is-socket-path-length-limited-to-a-hundred-chars
 351    // So our strings should be at most 20 characters or so.
 352    fn to_string(&self, cx: &App) -> String {
 353        let identifier_prefix = match ReleaseChannel::global(cx) {
 354            ReleaseChannel::Stable => "".to_string(),
 355            release_channel => format!("{}-", release_channel.dev_name()),
 356        };
 357        match self {
 358            Self::Setup(setup_id) => format!("{identifier_prefix}setup-{setup_id}"),
 359            Self::Workspace(workspace_id) => {
 360                format!("{identifier_prefix}workspace-{workspace_id}",)
 361            }
 362        }
 363    }
 364}
 365
 366pub async fn connect(
 367    connection_options: RemoteConnectionOptions,
 368    delegate: Arc<dyn RemoteClientDelegate>,
 369    cx: &mut AsyncApp,
 370) -> Result<Arc<dyn RemoteConnection>> {
 371    cx.update(|cx| {
 372        cx.update_default_global(|pool: &mut ConnectionPool, cx| {
 373            pool.connect(connection_options.clone(), delegate.clone(), cx)
 374        })
 375    })
 376    .await
 377    .map_err(|e| e.cloned())
 378}
 379
 380impl RemoteClient {
    /// Creates a `RemoteClient` entity on top of an already established
    /// `remote_connection` and drives it to the `Connected` state.
    ///
    /// The returned task resolves to:
    /// - `Ok(Some(entity))` once the remote reported it started and answered a ping,
    /// - `Ok(None)` if `cancellation` completes first,
    /// - `Err(_)` if the remote exits early, does not become ready within
    ///   `INITIAL_CONNECTION_TIMEOUT`, or the initial ping fails.
    pub fn new(
        unique_identifier: ConnectionIdentifier,
        remote_connection: Arc<dyn RemoteConnection>,
        cancellation: oneshot::Receiver<()>,
        delegate: Arc<dyn RemoteClientDelegate>,
        cx: &mut App,
    ) -> Task<Result<Option<Entity<Self>>>> {
        let unique_identifier = unique_identifier.to_string(cx);
        cx.spawn(async move |cx| {
            let success = Box::pin(async move {
                // Channels wiring the `ChannelClient` to the proxy process.
                let (outgoing_tx, outgoing_rx) = mpsc::unbounded::<Envelope>();
                let (incoming_tx, incoming_rx) = mpsc::unbounded::<Envelope>();
                // Capacity-1 channel handed to the proxy; the heartbeat task reads from it.
                let (connection_activity_tx, connection_activity_rx) = mpsc::channel::<()>(1);

                let client = cx.update(|cx| {
                    ChannelClient::new(
                        incoming_rx,
                        outgoing_tx,
                        cx,
                        "client",
                        remote_connection.has_wsl_interop(),
                    )
                });

                let path_style = remote_connection.path_style();
                // The entity starts out in `Connecting`; it only becomes `Connected` below.
                let this = cx.new(|_| Self {
                    client: client.clone(),
                    unique_identifier: unique_identifier.clone(),
                    connection_options: remote_connection.connection_options(),
                    path_style,
                    state: Some(State::Connecting),
                });

                // The second argument is `false` here and `true` in `reconnect`;
                // presumably a "reconnect" flag — confirm against `start_proxy`.
                let io_task = remote_connection.start_proxy(
                    unique_identifier,
                    false,
                    incoming_tx,
                    outgoing_rx,
                    connection_activity_tx,
                    delegate.clone(),
                    cx,
                );

                let ready = client
                    .wait_for_remote_started()
                    .with_timeout(INITIAL_CONNECTION_TIMEOUT, cx.background_executor())
                    .await;
                match ready {
                    Ok(Some(_)) => {}
                    // The wait finished without a "started" signal.
                    Ok(None) => {
                        let mut error = "remote client exited before becoming ready".to_owned();
                        // Include the proxy's result if it has already finished.
                        if let Some(status) = io_task.now_or_never() {
                            match status {
                                Ok(exit_code) => {
                                    error.push_str(&format!(", exit_code={exit_code:?}"))
                                }
                                Err(e) => error.push_str(&format!(", error={e:?}")),
                            }
                        }
                        let error = anyhow::anyhow!("{error}");
                        log::error!("failed to establish connection: {}", error);
                        return Err(error);
                    }
                    // The timeout elapsed.
                    Err(_) => {
                        let mut error = String::new();
                        if let Some(status) = io_task.now_or_never() {
                            error.push_str("Client exited with ");
                            match status {
                                Ok(exit_code) => {
                                    error.push_str(&format!(" exit_code {exit_code:?}"))
                                }
                                Err(e) => error.push_str(&format!(" error {e:?}")),
                            }
                        } else {
                            error.push_str("client did not become ready within the timeout");
                        }
                        let error = anyhow::anyhow!("{error}");
                        log::error!("failed to establish connection: {error}");
                        return Err(error);
                    }
                }
                // From here on the proxy I/O task is owned by the monitor task.
                let multiplex_task = Self::monitor(this.downgrade(), io_task, cx);
                // Verify the server actually answers before declaring the connection usable.
                if let Err(error) = client.ping(HEARTBEAT_TIMEOUT).await {
                    log::error!("failed to establish connection: {}", error);
                    return Err(error);
                }

                let heartbeat_task = Self::heartbeat(this.downgrade(), connection_activity_rx, cx);

                this.update(cx, |this, _| {
                    this.state = Some(State::Connected {
                        remote_connection,
                        delegate,
                        multiplex_task,
                        heartbeat_task,
                    });
                });

                Ok(Some(this))
            });

            // Race setup against cancellation. The receiver's result is ignored, so any
            // completion of `cancellation` is treated as a cancel.
            select! {
                _ = cancellation.fuse() => {
                    Ok(None)
                }
                result = success.fuse() =>  result
            }
        })
    }
 490
 491    pub fn proto_client_from_channels(
 492        incoming_rx: mpsc::UnboundedReceiver<Envelope>,
 493        outgoing_tx: mpsc::UnboundedSender<Envelope>,
 494        cx: &App,
 495        name: &'static str,
 496        has_wsl_interop: bool,
 497    ) -> AnyProtoClient {
 498        ChannelClient::new(incoming_rx, outgoing_tx, cx, name, has_wsl_interop).into()
 499    }
 500
 501    pub fn shutdown_processes<T: RequestMessage>(
 502        &mut self,
 503        shutdown_request: Option<T>,
 504        executor: BackgroundExecutor,
 505    ) -> Option<impl Future<Output = ()> + use<T>> {
 506        let state = self.state.take()?;
 507        log::info!("shutting down remote processes");
 508
 509        let State::Connected {
 510            multiplex_task,
 511            heartbeat_task,
 512            remote_connection,
 513            delegate,
 514        } = state
 515        else {
 516            return None;
 517        };
 518
 519        let client = self.client.clone();
 520
 521        Some(async move {
 522            if let Some(shutdown_request) = shutdown_request {
 523                client.send(shutdown_request).log_err();
 524                // We wait 50ms instead of waiting for a response, because
 525                // waiting for a response would require us to wait on the main thread
 526                // which we want to avoid in an `on_app_quit` callback.
 527                executor.timer(Duration::from_millis(50)).await;
 528            }
 529
 530            // Drop `multiplex_task` because it owns our remote_connection_proxy_process, which is a
 531            // child of master_process.
 532            drop(multiplex_task);
 533            // Now drop the rest of state, which kills master process.
 534            drop(heartbeat_task);
 535            drop(remote_connection);
 536            drop(delegate);
 537        })
 538    }
 539
    /// Starts a reconnect attempt if the current state allows it.
    ///
    /// Flow:
    /// 1. Bail out with an error unless `State::can_reconnect` holds.
    /// 2. Take the current state, dropping its background tasks if it had any, and
    ///    compute the attempt number.
    /// 3. If the attempt number exceeds `MAX_RECONNECT_ATTEMPTS`, move to
    ///    `ReconnectExhausted` and return `Ok(())`.
    /// 4. Otherwise move to `Reconnecting` and spawn a task that kills the old
    ///    connection, obtains a new one from the `ConnectionPool`, restarts the proxy
    ///    and resyncs the client.
    /// 5. A second, detached task applies the resulting state (only if the client is
    ///    still `Reconnecting`) and triggers the next attempt after a failure.
    ///
    /// # Errors
    /// Returns an error only when the current state does not allow reconnecting.
    fn reconnect(&mut self, cx: &mut Context<Self>) -> Result<()> {
        let can_reconnect = self
            .state
            .as_ref()
            .map(|state| state.can_reconnect())
            .unwrap_or(false);
        if !can_reconnect {
            let state = if let Some(state) = self.state.as_ref() {
                state.to_string()
            } else {
                "no state set".to_string()
            };
            log::info!(
                "aborting reconnect, because not in state that allows reconnecting: {state}"
            );
            anyhow::bail!(
                "aborting reconnect, because not in state that allows reconnecting: {state}"
            );
        }

        // `can_reconnect` was true, so a state is present.
        let state = self.state.take().unwrap();
        let (attempts, remote_connection, delegate) = match state {
            State::Connected {
                remote_connection,
                delegate,
                multiplex_task,
                heartbeat_task,
            }
            | State::HeartbeatMissed {
                remote_connection,
                delegate,
                multiplex_task,
                heartbeat_task,
                ..
            } => {
                // Stop the old monitor/heartbeat tasks; a fresh sequence of attempts begins.
                drop(multiplex_task);
                drop(heartbeat_task);
                (0, remote_connection, delegate)
            }
            State::ReconnectFailed {
                attempts,
                remote_connection,
                delegate,
                ..
            } => (attempts, remote_connection, delegate),
            // Excluded by the `can_reconnect` check above.
            State::Connecting
            | State::Reconnecting
            | State::ReconnectExhausted
            | State::ServerNotRunning => unreachable!(),
        };

        let attempts = attempts + 1;
        if attempts > MAX_RECONNECT_ATTEMPTS {
            log::error!(
                "Failed to reconnect to after {} attempts, giving up",
                MAX_RECONNECT_ATTEMPTS
            );
            self.set_state(State::ReconnectExhausted, cx);
            return Ok(());
        }

        self.set_state(State::Reconnecting, cx);

        log::info!(
            "Trying to reconnect to remote server... Attempt {}",
            attempts
        );

        let unique_identifier = self.unique_identifier.clone();
        let client = self.client.clone();
        // Performs the attempt and evaluates to the state that should be entered next.
        let reconnect_task = cx.spawn(async move |this, cx| {
            // Reports the error through the delegate and returns `ReconnectFailed`
            // from the enclosing async closure.
            macro_rules! failed {
                ($error:expr, $attempts:expr, $remote_connection:expr, $delegate:expr) => {
                    delegate.set_status(Some(&format!("{error:#}", error = $error)), cx);
                    return State::ReconnectFailed {
                        error: anyhow!($error),
                        attempts: $attempts,
                        remote_connection: $remote_connection,
                        delegate: $delegate,
                    };
                };
            }

            // Tear down the previous connection before establishing a new one.
            if let Err(error) = remote_connection
                .kill()
                .await
                .context("Failed to kill remote_connection process")
            {
                failed!(error, attempts, remote_connection, delegate);
            };

            let connection_options = remote_connection.connection_options();

            let (outgoing_tx, outgoing_rx) = mpsc::unbounded::<Envelope>();
            let (incoming_tx, incoming_rx) = mpsc::unbounded::<Envelope>();
            let (connection_activity_tx, connection_activity_rx) = mpsc::channel::<()>(1);

            // The inner async block lets `?` short-circuit into a single error path.
            let (remote_connection, io_task) = match async {
                let remote_connection = cx
                    .update_global(|pool: &mut ConnectionPool, cx| {
                        pool.connect(connection_options, delegate.clone(), cx)
                    })
                    .await
                    .map_err(|error| error.cloned())?;

                // Same identifier as the original connection; second argument is `true`
                // here (it is `false` in `new`).
                let io_task = remote_connection.start_proxy(
                    unique_identifier,
                    true,
                    incoming_tx,
                    outgoing_rx,
                    connection_activity_tx,
                    delegate.clone(),
                    cx,
                );
                anyhow::Ok((remote_connection, io_task))
            }
            .await
            {
                Ok((remote_connection, io_task)) => (remote_connection, io_task),
                Err(error) => {
                    // On failure the old `remote_connection` is kept in the failed state.
                    failed!(error, attempts, remote_connection, delegate);
                }
            };

            let multiplex_task = Self::monitor(this.clone(), io_task, cx);
            // Point the existing client at the new channel pair.
            client.reconnect(incoming_rx, outgoing_tx, cx);

            if let Err(error) = client.resync(HEARTBEAT_TIMEOUT).await {
                failed!(error, attempts, remote_connection, delegate);
            };

            State::Connected {
                remote_connection,
                delegate,
                multiplex_task,
                heartbeat_task: Self::heartbeat(this.clone(), connection_activity_rx, cx),
            }
        });

        cx.spawn(async move |this, cx| {
            let new_state = reconnect_task.await;
            this.update(cx, |this, cx| {
                // Only apply the result if nothing else changed the state in the meantime.
                this.try_set_state(cx, |old_state| {
                    if old_state.is_reconnecting() {
                        match &new_state {
                            State::Connecting
                            | State::Reconnecting
                            | State::HeartbeatMissed { .. }
                            | State::ServerNotRunning => {}
                            State::Connected { .. } => {
                                log::info!("Successfully reconnected");
                            }
                            State::ReconnectFailed {
                                error, attempts, ..
                            } => {
                                log::error!(
                                    "Reconnect attempt {} failed: {:?}. Starting new attempt...",
                                    attempts,
                                    error
                                );
                            }
                            State::ReconnectExhausted => {
                                log::error!("Reconnect attempt failed and all attempts exhausted");
                            }
                        }
                        Some(new_state)
                    } else {
                        None
                    }
                });

                // A failed attempt immediately schedules the next one; `reconnect`
                // itself enforces the attempt limit.
                if this.state_is(State::is_reconnect_failed) {
                    this.reconnect(cx)
                } else if this.state_is(State::is_reconnect_exhausted) {
                    Ok(())
                } else {
                    log::debug!("State has transition from Reconnecting into new state while attempting reconnect.");
                    Ok(())
                }
            })
        })
        .detach_and_log_err(cx);

        Ok(())
    }
 725
    /// Spawns the heartbeat loop for `this`.
    ///
    /// Each round waits for either a message on `connection_activity_rx` or the
    /// `HEARTBEAT_INTERVAL` timer. When the timer fires, the server is pinged (unless
    /// activity arrives first). Changes in the missed-heartbeat count are forwarded to
    /// `handle_heartbeat_result`. The loop ends when the activity channel is closed,
    /// when `handle_heartbeat_result` returns `Break`, or with an error when the entity
    /// can no longer be updated.
    fn heartbeat(
        this: WeakEntity<Self>,
        mut connection_activity_rx: mpsc::Receiver<()>,
        cx: &mut AsyncApp,
    ) -> Task<Result<()>> {
        let Ok(client) = this.read_with(cx, |this, _| this.client.clone()) else {
            return Task::ready(Err(anyhow!("remote_connectionRemoteClient lost")));
        };

        cx.spawn(async move |cx| {
            // Local count of consecutive missed heartbeats.
            let mut missed_heartbeats = 0;

            let keepalive_timer = cx.background_executor().timer(HEARTBEAT_INTERVAL).fuse();
            futures::pin_mut!(keepalive_timer);

            loop {
                // `select_biased!`: the activity branch is listed first and so is
                // preferred when both branches are ready.
                select_biased! {
                    result = connection_activity_rx.next().fuse() => {
                        if result.is_none() {
                            log::warn!("remote heartbeat: connection activity channel has been dropped. stopping.");
                            return Ok(());
                        }

                        // Activity counts as recovery from missed heartbeats.
                        if missed_heartbeats != 0 {
                            missed_heartbeats = 0;
                            let _ =this.update(cx, |this, cx| {
                                this.handle_heartbeat_result(missed_heartbeats, cx)
                            })?;
                        }
                    }
                    _ = keepalive_timer => {
                        log::debug!("Sending heartbeat to server...");

                        // Activity arriving while the ping is in flight is treated as success.
                        let result = select_biased! {
                            _ = connection_activity_rx.next().fuse() => {
                                Ok(())
                            }
                            ping_result = client.ping(HEARTBEAT_TIMEOUT).fuse() => {
                                ping_result
                            }
                        };

                        if result.is_err() {
                            missed_heartbeats += 1;
                            log::warn!(
                                "No heartbeat from server after {:?}. Missed heartbeat {} out of {}.",
                                HEARTBEAT_TIMEOUT,
                                missed_heartbeats,
                                MAX_MISSED_HEARTBEATS
                            );
                        } else if missed_heartbeats != 0 {
                            missed_heartbeats = 0;
                        } else {
                            // Healthy and nothing changed: skip the state update (and the
                            // timer reset at the bottom of the loop).
                            continue;
                        }

                        let result = this.update(cx, |this, cx| {
                            this.handle_heartbeat_result(missed_heartbeats, cx)
                        })?;
                        if result.is_break() {
                            return Ok(());
                        }
                    }
                }

                // Restart the interval after activity or after a state-changing heartbeat.
                keepalive_timer.set(cx.background_executor().timer(HEARTBEAT_INTERVAL).fuse());
            }
        })
    }
 795
 796    fn handle_heartbeat_result(
 797        &mut self,
 798        missed_heartbeats: usize,
 799        cx: &mut Context<Self>,
 800    ) -> ControlFlow<()> {
 801        let state = self.state.take().unwrap();
 802        let next_state = if missed_heartbeats > 0 {
 803            state.heartbeat_missed()
 804        } else {
 805            state.heartbeat_recovered()
 806        };
 807
 808        self.set_state(next_state, cx);
 809
 810        if missed_heartbeats >= MAX_MISSED_HEARTBEATS {
 811            log::error!(
 812                "Missed last {} heartbeats. Reconnecting...",
 813                missed_heartbeats
 814            );
 815
 816            self.reconnect(cx)
 817                .context("failed to start reconnect process after missing heartbeats")
 818                .log_err();
 819            ControlFlow::Break(())
 820        } else {
 821            ControlFlow::Continue(())
 822        }
 823    }
 824
 825    fn monitor(
 826        this: WeakEntity<Self>,
 827        io_task: Task<Result<i32>>,
 828        cx: &AsyncApp,
 829    ) -> Task<Result<()>> {
 830        cx.spawn(async move |cx| {
 831            let result = io_task.await;
 832
 833            match result {
 834                Ok(exit_code) => {
 835                    if let Some(error) = ProxyLaunchError::from_exit_code(exit_code) {
 836                        match error {
 837                            ProxyLaunchError::ServerNotRunning => {
 838                                log::error!("failed to reconnect because server is not running");
 839                                this.update(cx, |this, cx| {
 840                                    this.set_state(State::ServerNotRunning, cx);
 841                                })?;
 842                            }
 843                        }
 844                    } else if exit_code > 0 {
 845                        log::error!("proxy process terminated unexpectedly");
 846                        this.update(cx, |this, cx| {
 847                            this.reconnect(cx).ok();
 848                        })?;
 849                    }
 850                }
 851                Err(error) => {
 852                    log::warn!(
 853                        "remote io task died with error: {:?}. reconnecting...",
 854                        error
 855                    );
 856                    this.update(cx, |this, cx| {
 857                        this.reconnect(cx).ok();
 858                    })?;
 859                }
 860            }
 861
 862            Ok(())
 863        })
 864    }
 865
 866    fn state_is(&self, check: impl FnOnce(&State) -> bool) -> bool {
 867        self.state.as_ref().is_some_and(check)
 868    }
 869
 870    fn try_set_state(&mut self, cx: &mut Context<Self>, map: impl FnOnce(&State) -> Option<State>) {
 871        let new_state = self.state.as_ref().and_then(map);
 872        if let Some(new_state) = new_state {
 873            self.state.replace(new_state);
 874            cx.notify();
 875        }
 876    }
 877
 878    fn set_state(&mut self, state: State, cx: &mut Context<Self>) {
 879        log::info!("setting state to '{}'", &state);
 880
 881        let is_reconnect_exhausted = state.is_reconnect_exhausted();
 882        let is_server_not_running = state.is_server_not_running();
 883        self.state.replace(state);
 884
 885        if is_reconnect_exhausted || is_server_not_running {
 886            cx.emit(RemoteClientEvent::Disconnected {
 887                server_not_running: is_server_not_running,
 888            });
 889        }
 890        cx.notify();
 891    }
 892
 893    pub fn shell(&self) -> Option<String> {
 894        Some(self.remote_connection()?.shell())
 895    }
 896
 897    pub fn default_system_shell(&self) -> Option<String> {
 898        Some(self.remote_connection()?.default_system_shell())
 899    }
 900
 901    pub fn shares_network_interface(&self) -> bool {
 902        self.remote_connection()
 903            .map_or(false, |connection| connection.shares_network_interface())
 904    }
 905
 906    pub fn has_wsl_interop(&self) -> bool {
 907        self.remote_connection()
 908            .map_or(false, |connection| connection.has_wsl_interop())
 909    }
 910
 911    pub fn build_command(
 912        &self,
 913        program: Option<String>,
 914        args: &[String],
 915        env: &HashMap<String, String>,
 916        working_dir: Option<String>,
 917        port_forward: Option<(u16, String, u16)>,
 918    ) -> Result<CommandTemplate> {
 919        self.build_command_with_options(
 920            program,
 921            args,
 922            env,
 923            working_dir,
 924            port_forward,
 925            Interactive::Yes,
 926        )
 927    }
 928
 929    pub fn build_command_with_options(
 930        &self,
 931        program: Option<String>,
 932        args: &[String],
 933        env: &HashMap<String, String>,
 934        working_dir: Option<String>,
 935        port_forward: Option<(u16, String, u16)>,
 936        interactive: Interactive,
 937    ) -> Result<CommandTemplate> {
 938        let Some(connection) = self.remote_connection() else {
 939            return Err(anyhow!("no remote connection"));
 940        };
 941        connection.build_command(program, args, env, working_dir, port_forward, interactive)
 942    }
 943
 944    pub fn build_forward_ports_command(
 945        &self,
 946        forwards: Vec<(u16, String, u16)>,
 947    ) -> Result<CommandTemplate> {
 948        let Some(connection) = self.remote_connection() else {
 949            return Err(anyhow!("no remote connection"));
 950        };
 951        connection.build_forward_ports_command(forwards)
 952    }
 953
 954    pub fn upload_directory(
 955        &self,
 956        src_path: PathBuf,
 957        dest_path: RemotePathBuf,
 958        cx: &App,
 959    ) -> Task<Result<()>> {
 960        let Some(connection) = self.remote_connection() else {
 961            return Task::ready(Err(anyhow!("no remote connection")));
 962        };
 963        connection.upload_directory(src_path, dest_path, cx)
 964    }
 965
 966    pub fn proto_client(&self) -> AnyProtoClient {
 967        self.client.clone().into()
 968    }
 969
 970    pub fn connection_options(&self) -> RemoteConnectionOptions {
 971        self.connection_options.clone()
 972    }
 973
 974    pub fn connection(&self) -> Option<Arc<dyn RemoteConnection>> {
 975        if let State::Connected {
 976            remote_connection, ..
 977        } = self.state.as_ref()?
 978        {
 979            Some(remote_connection.clone())
 980        } else {
 981            None
 982        }
 983    }
 984
 985    pub fn connection_state(&self) -> ConnectionState {
 986        self.state
 987            .as_ref()
 988            .map(ConnectionState::from)
 989            .unwrap_or(ConnectionState::Disconnected)
 990    }
 991
 992    pub fn is_disconnected(&self) -> bool {
 993        self.connection_state() == ConnectionState::Disconnected
 994    }
 995
 996    pub fn path_style(&self) -> PathStyle {
 997        self.path_style
 998    }
 999
1000    /// Forcibly disconnects from the remote server by killing the underlying connection.
1001    /// This will trigger the reconnection logic if reconnection attempts remain.
1002    /// Useful for testing reconnection behavior in real environments.
1003    pub fn force_disconnect(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
1004        let Some(connection) = self.remote_connection() else {
1005            return Task::ready(Err(anyhow!("no active remote connection to disconnect")));
1006        };
1007
1008        log::info!("force_disconnect: killing remote connection");
1009
1010        cx.spawn(async move |_, _| {
1011            connection.kill().await?;
1012            Ok(())
1013        })
1014    }
1015
1016    /// Simulates a timeout by pausing heartbeat responses.
1017    /// This will cause heartbeat failures and eventually trigger reconnection
1018    /// after MAX_MISSED_HEARTBEATS are missed.
1019    /// Useful for testing timeout behavior in real environments.
1020    pub fn force_heartbeat_timeout(&mut self, attempts: usize, cx: &mut Context<Self>) {
1021        log::info!("force_heartbeat_timeout: triggering heartbeat failure state");
1022
1023        if let Some(State::Connected {
1024            remote_connection,
1025            delegate,
1026            multiplex_task,
1027            heartbeat_task,
1028        }) = self.state.take()
1029        {
1030            self.set_state(
1031                if attempts == 0 {
1032                    State::HeartbeatMissed {
1033                        missed_heartbeats: MAX_MISSED_HEARTBEATS,
1034                        remote_connection,
1035                        delegate,
1036                        multiplex_task,
1037                        heartbeat_task,
1038                    }
1039                } else {
1040                    State::ReconnectFailed {
1041                        remote_connection,
1042                        delegate,
1043                        error: anyhow!("forced heartbeat timeout"),
1044                        attempts,
1045                    }
1046                },
1047                cx,
1048            );
1049
1050            self.reconnect(cx)
1051                .context("failed to start reconnect after forced timeout")
1052                .log_err();
1053        } else {
1054            log::warn!("force_heartbeat_timeout: not in Connected state, ignoring");
1055        }
1056    }
1057
1058    #[cfg(any(test, feature = "test-support"))]
1059    pub fn simulate_disconnect(&self, client_cx: &mut App) -> Task<()> {
1060        let opts = self.connection_options();
1061        client_cx.spawn(async move |cx| {
1062            let connection = cx.update_global(|c: &mut ConnectionPool, _| {
1063                if let Some(ConnectionPoolEntry::Connected(c)) = c.connections.get(&opts) {
1064                    if let Some(connection) = c.upgrade() {
1065                        connection
1066                    } else {
1067                        panic!("connection was dropped")
1068                    }
1069                } else {
1070                    panic!("missing test connection")
1071                }
1072            });
1073
1074            connection.simulate_disconnect(cx);
1075        })
1076    }
1077
1078    /// Creates a mock connection pair for testing.
1079    ///
1080    /// This is the recommended way to create mock remote connections for tests.
1081    /// It returns the `MockConnectionOptions` (which can be passed to create a
1082    /// `HeadlessProject`), an `AnyProtoClient` for the server side and a
1083    /// `ConnectGuard` for the client side which blocks the connection from
1084    /// being established until dropped.
1085    ///
1086    /// # Example
1087    /// ```ignore
1088    /// let (opts, server_session, connect_guard) = RemoteClient::fake_server(cx, server_cx);
1089    /// // Set up HeadlessProject with server_session...
1090    /// drop(connect_guard);
1091    /// let client = RemoteClient::fake_client(opts, cx).await;
1092    /// ```
1093    #[cfg(any(test, feature = "test-support"))]
1094    pub fn fake_server(
1095        client_cx: &mut gpui::TestAppContext,
1096        server_cx: &mut gpui::TestAppContext,
1097    ) -> (RemoteConnectionOptions, AnyProtoClient, ConnectGuard) {
1098        use crate::transport::mock::MockConnection;
1099        let (opts, server_client, connect_guard) = MockConnection::new(client_cx, server_cx);
1100        (opts.into(), server_client, connect_guard)
1101    }
1102
1103    /// Creates a `RemoteClient` connected to a mock server.
1104    ///
1105    /// Call `fake_server` first to get the connection options, set up the
1106    /// `HeadlessProject` with the server session, then call this method
1107    /// to create the client.
1108    #[cfg(any(test, feature = "test-support"))]
1109    pub async fn connect_mock(
1110        opts: RemoteConnectionOptions,
1111        client_cx: &mut gpui::TestAppContext,
1112    ) -> Entity<Self> {
1113        assert!(matches!(opts, RemoteConnectionOptions::Mock(..)));
1114        use crate::transport::mock::MockDelegate;
1115        let (_tx, rx) = oneshot::channel();
1116        let mut cx = client_cx.to_async();
1117        let connection = connect(opts, Arc::new(MockDelegate), &mut cx)
1118            .await
1119            .unwrap();
1120        client_cx
1121            .update(|cx| {
1122                Self::new(
1123                    ConnectionIdentifier::setup(),
1124                    connection,
1125                    rx,
1126                    Arc::new(MockDelegate),
1127                    cx,
1128                )
1129            })
1130            .await
1131            .unwrap()
1132            .unwrap()
1133    }
1134
1135    fn remote_connection(&self) -> Option<Arc<dyn RemoteConnection>> {
1136        self.state
1137            .as_ref()
1138            .and_then(|state| state.remote_connection())
1139    }
1140}
1141
/// An entry in the [`ConnectionPool`], tracking one set of connection options.
///
/// Both variants hold weak handles, so an entry on its own keeps neither the
/// connection attempt nor the connection alive.
enum ConnectionPoolEntry {
    /// A connection attempt is (or was) in flight; holds a weak handle to the
    /// shared connect task.
    Connecting(WeakShared<Task<Result<Arc<dyn RemoteConnection>, Arc<anyhow::Error>>>>),
    /// A connection was established; holds a weak handle to it.
    Connected(Weak<dyn RemoteConnection>),
}
1146
/// Tracks in-flight and established remote connections, keyed by their
/// connection options, so that callers asking for the same options can share
/// one connection (see `ConnectionPool::connect`).
#[derive(Default)]
struct ConnectionPool {
    /// One entry per set of connection options that is connecting or connected.
    connections: HashMap<RemoteConnectionOptions, ConnectionPoolEntry>,
}
1151
// Marks the pool as a gpui global; it is accessed through `update_global`.
impl Global for ConnectionPool {}
1153
1154impl ConnectionPool {
1155    fn connect(
1156        &mut self,
1157        opts: RemoteConnectionOptions,
1158        delegate: Arc<dyn RemoteClientDelegate>,
1159        cx: &mut App,
1160    ) -> Shared<Task<Result<Arc<dyn RemoteConnection>, Arc<anyhow::Error>>>> {
1161        let connection = self.connections.get(&opts);
1162        match connection {
1163            Some(ConnectionPoolEntry::Connecting(task)) => {
1164                if let Some(task) = task.upgrade() {
1165                    log::debug!("Connecting task is still alive");
1166                    cx.spawn(async move |cx| {
1167                        delegate.set_status(Some("Waiting for existing connection attempt"), cx)
1168                    })
1169                    .detach();
1170                    return task;
1171                }
1172                log::debug!("Connecting task is dead, removing it and restarting a connection");
1173                self.connections.remove(&opts);
1174            }
1175            Some(ConnectionPoolEntry::Connected(remote)) => {
1176                if let Some(remote) = remote.upgrade()
1177                    && !remote.has_been_killed()
1178                {
1179                    log::debug!("Connection is still alive");
1180                    return Task::ready(Ok(remote)).shared();
1181                }
1182                log::debug!("Connection is dead, removing it and restarting a connection");
1183                self.connections.remove(&opts);
1184            }
1185            None => {
1186                log::debug!("No existing connection found, starting a new one");
1187            }
1188        }
1189
1190        let task = cx
1191            .spawn({
1192                let opts = opts.clone();
1193                let delegate = delegate.clone();
1194                async move |cx| {
1195                    let connection = match opts.clone() {
1196                        RemoteConnectionOptions::Ssh(opts) => {
1197                            SshRemoteConnection::new(opts, delegate, cx)
1198                                .await
1199                                .map(|connection| Arc::new(connection) as Arc<dyn RemoteConnection>)
1200                        }
1201                        RemoteConnectionOptions::Wsl(opts) => {
1202                            WslRemoteConnection::new(opts, delegate, cx)
1203                                .await
1204                                .map(|connection| Arc::new(connection) as Arc<dyn RemoteConnection>)
1205                        }
1206                        RemoteConnectionOptions::Docker(opts) => {
1207                            DockerExecConnection::new(opts, delegate, cx)
1208                                .await
1209                                .map(|connection| Arc::new(connection) as Arc<dyn RemoteConnection>)
1210                        }
1211                        #[cfg(any(test, feature = "test-support"))]
1212                        RemoteConnectionOptions::Mock(opts) => match cx.update(|cx| {
1213                            cx.default_global::<crate::transport::mock::MockConnectionRegistry>()
1214                                .take(&opts)
1215                        }) {
1216                            Some(connection) => Ok(connection.await as Arc<dyn RemoteConnection>),
1217                            None => Err(anyhow!(
1218                                "Mock connection not found. Call MockConnection::new() first."
1219                            )),
1220                        },
1221                    };
1222
1223                    cx.update_global(|pool: &mut Self, _| {
1224                        debug_assert!(matches!(
1225                            pool.connections.get(&opts),
1226                            Some(ConnectionPoolEntry::Connecting(_))
1227                        ));
1228                        match connection {
1229                            Ok(connection) => {
1230                                pool.connections.insert(
1231                                    opts.clone(),
1232                                    ConnectionPoolEntry::Connected(Arc::downgrade(&connection)),
1233                                );
1234                                Ok(connection)
1235                            }
1236                            Err(error) => {
1237                                pool.connections.remove(&opts);
1238                                Err(Arc::new(error))
1239                            }
1240                        }
1241                    })
1242                }
1243            })
1244            .shared();
1245        if let Some(task) = task.downgrade() {
1246            self.connections
1247                .insert(opts.clone(), ConnectionPoolEntry::Connecting(task));
1248        }
1249        task
1250    }
1251}
1252
/// Options describing how to reach a remote, one variant per supported
/// transport. Values are hashable and comparable, and serve as the key of the
/// connection pool.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum RemoteConnectionOptions {
    /// Connect over SSH.
    Ssh(SshConnectionOptions),
    /// Connect to a WSL distro.
    Wsl(WslConnectionOptions),
    /// Connect to a Docker (or Podman) container.
    Docker(DockerConnectionOptions),
    /// Connect to an in-process mock; only available in tests.
    #[cfg(any(test, feature = "test-support"))]
    Mock(crate::transport::mock::MockConnectionOptions),
}
1261
1262impl RemoteConnectionOptions {
1263    pub fn display_name(&self) -> String {
1264        match self {
1265            RemoteConnectionOptions::Ssh(opts) => opts.host.to_string(),
1266            RemoteConnectionOptions::Wsl(opts) => opts.distro_name.clone(),
1267            RemoteConnectionOptions::Docker(opts) => {
1268                if opts.use_podman {
1269                    format!("[podman] {}", opts.name)
1270                } else {
1271                    opts.name.clone()
1272                }
1273            }
1274            #[cfg(any(test, feature = "test-support"))]
1275            RemoteConnectionOptions::Mock(opts) => format!("mock-{}", opts.id),
1276        }
1277    }
1278}
1279
1280impl From<SshConnectionOptions> for RemoteConnectionOptions {
1281    fn from(opts: SshConnectionOptions) -> Self {
1282        RemoteConnectionOptions::Ssh(opts)
1283    }
1284}
1285
1286impl From<WslConnectionOptions> for RemoteConnectionOptions {
1287    fn from(opts: WslConnectionOptions) -> Self {
1288        RemoteConnectionOptions::Wsl(opts)
1289    }
1290}
1291
#[cfg(any(test, feature = "test-support"))]
impl From<crate::transport::mock::MockConnectionOptions> for RemoteConnectionOptions {
    /// Wraps mock options in the `Mock` variant (tests only).
    fn from(mock_options: crate::transport::mock::MockConnectionOptions) -> Self {
        Self::Mock(mock_options)
    }
}
1298
// Only compiled on Windows. The action lives in the `workspace` namespace and
// is declared with `no_json` and `no_register`.
#[cfg(target_os = "windows")]
/// Open a wsl path (\\wsl.localhost\<distro>\path)
#[derive(Debug, Clone, PartialEq, Eq, gpui::Action)]
#[action(namespace = workspace, no_json, no_register)]
pub struct OpenWslPath {
    // WSL connection options identifying the target distro.
    pub distro: WslConnectionOptions,
    // Paths to open. NOTE(review): presumably paths inside the distro — confirm
    // against the action's handler.
    pub paths: Vec<PathBuf>,
}
1307
/// Transport-agnostic interface to an established remote connection.
///
/// Implementations are handled as `Arc<dyn RemoteConnection>`; the connection
/// pool creates them for the SSH, WSL, Docker and (in tests) mock transports.
#[async_trait(?Send)]
pub trait RemoteConnection: Send + Sync {
    /// Starts the proxy for this connection, wiring it to the given envelope
    /// channels, and returns the task driving it.
    ///
    /// NOTE(review): the `i32` result appears to be the proxy's exit code
    /// (compare `RemoteClient::monitor`) — confirm against the implementations.
    fn start_proxy(
        &self,
        unique_identifier: String,
        reconnect: bool,
        incoming_tx: UnboundedSender<Envelope>,
        outgoing_rx: UnboundedReceiver<Envelope>,
        connection_activity_tx: Sender<()>,
        delegate: Arc<dyn RemoteClientDelegate>,
        cx: &mut AsyncApp,
    ) -> Task<Result<i32>>;
    /// Uploads the directory at `src_path` to `dest_path`.
    /// NOTE(review): presumably local-to-remote — confirm.
    fn upload_directory(
        &self,
        src_path: PathBuf,
        dest_path: RemotePathBuf,
        cx: &App,
    ) -> Task<Result<()>>;
    /// Kills the underlying connection.
    async fn kill(&self) -> Result<()>;
    /// Reports whether the connection has been killed. The connection pool
    /// does not reuse a connection for which this returns `true`.
    fn has_been_killed(&self) -> bool;
    /// Defaults to `false`; implementations may override.
    /// NOTE(review): presumably whether the remote shares the local network
    /// interface — confirm.
    fn shares_network_interface(&self) -> bool {
        false
    }
    /// Builds the command template used to run `program` with `args`, `env`
    /// and the remaining options over this connection.
    fn build_command(
        &self,
        program: Option<String>,
        args: &[String],
        env: &HashMap<String, String>,
        working_dir: Option<String>,
        port_forward: Option<(u16, String, u16)>,
        interactive: Interactive,
    ) -> Result<CommandTemplate>;
    /// Builds the command template that sets up the given port forwards.
    fn build_forward_ports_command(
        &self,
        forwards: Vec<(u16, String, u16)>,
    ) -> Result<CommandTemplate>;
    /// Returns the options this connection was created from.
    fn connection_options(&self) -> RemoteConnectionOptions;
    /// Returns the path style used by the remote.
    fn path_style(&self) -> PathStyle;
    /// Returns the shell associated with this connection.
    fn shell(&self) -> String;
    /// Returns the remote's default system shell.
    fn default_system_shell(&self) -> String;
    /// Reports whether WSL interop is available for this connection.
    fn has_wsl_interop(&self) -> bool;

    /// Test-only hook for simulating a dropped connection; does nothing by
    /// default.
    #[cfg(any(test, feature = "test-support"))]
    fn simulate_disconnect(&self, _: &AsyncApp) {}
}
1353
/// Pending requests, keyed by the id of the request message. Each entry holds
/// the sender used to deliver the response envelope to the requester, paired
/// with a second sender that the message loop waits on until the requester has
/// taken the response.
type ResponseChannels = Mutex<HashMap<MessageId, oneshot::Sender<(Envelope, oneshot::Sender<()>)>>>;
1355
/// A value that can be set at most once and awaited by any number of waiters.
struct Signal<T> {
    /// Sender half; taken (leaving `None`) by the first call to `set`.
    tx: Mutex<Option<oneshot::Sender<T>>>,
    /// Shared task resolving to the value once set, or to `None` if the
    /// sender is dropped without sending.
    rx: Shared<Task<Option<T>>>,
}
1360
1361impl<T: Send + Clone + 'static> Signal<T> {
1362    pub fn new(cx: &App) -> Self {
1363        let (tx, rx) = oneshot::channel();
1364
1365        let task = cx
1366            .background_executor()
1367            .spawn(async move { rx.await.ok() })
1368            .shared();
1369
1370        Self {
1371            tx: Mutex::new(Some(tx)),
1372            rx: task,
1373        }
1374    }
1375
1376    fn set(&self, value: T) {
1377        if let Some(tx) = self.tx.lock().take() {
1378            let _ = tx.send(value);
1379        }
1380    }
1381
1382    fn wait(&self) -> Shared<Task<Option<T>>> {
1383        self.rx.clone()
1384    }
1385}
1386
/// Client side of the envelope protocol spoken over a pair of channels:
/// assigns message ids, buffers sent messages until they are acknowledged,
/// routes responses to pending requests and dispatches other incoming
/// messages to registered handlers.
pub(crate) struct ChannelClient {
    /// Id to assign to the next outgoing envelope.
    next_message_id: AtomicU32,
    /// Sender for outgoing envelopes; replaced on `reconnect`.
    outgoing_tx: Mutex<mpsc::UnboundedSender<Envelope>>,
    /// Envelopes sent through `send_buffered`, kept until the peer
    /// acknowledges them via `ack_id`.
    buffer: Mutex<VecDeque<Envelope>>,
    /// Requests still waiting for their response.
    response_channels: ResponseChannels,
    /// Handlers for incoming messages that are not responses.
    message_handlers: Mutex<ProtoMessageHandlerSet>,
    /// Id of the most recently received regular message; sent back to the
    /// peer as `ack_id` on outgoing envelopes.
    max_received: AtomicU32,
    /// Label used as a prefix in log messages.
    name: &'static str,
    /// Task running the incoming-message loop; replaced on `reconnect`.
    task: Mutex<Task<Result<()>>>,
    /// Set once a `RemoteStarted` message is received from the peer.
    remote_started: Signal<()>,
    /// Flag supplied at construction.
    /// NOTE(review): presumably mirrors the connection's WSL interop
    /// capability — confirm where it is read.
    has_wsl_interop: bool,
    /// Background executor, used for the timeouts in `ping` and `resync`.
    executor: BackgroundExecutor,
}
1400
1401impl ChannelClient {
1402    pub(crate) fn new(
1403        incoming_rx: mpsc::UnboundedReceiver<Envelope>,
1404        outgoing_tx: mpsc::UnboundedSender<Envelope>,
1405        cx: &App,
1406        name: &'static str,
1407        has_wsl_interop: bool,
1408    ) -> Arc<Self> {
1409        Arc::new_cyclic(|this| Self {
1410            outgoing_tx: Mutex::new(outgoing_tx),
1411            next_message_id: AtomicU32::new(0),
1412            max_received: AtomicU32::new(0),
1413            response_channels: ResponseChannels::default(),
1414            message_handlers: Default::default(),
1415            buffer: Mutex::new(VecDeque::new()),
1416            name,
1417            executor: cx.background_executor().clone(),
1418            task: Mutex::new(Self::start_handling_messages(
1419                this.clone(),
1420                incoming_rx,
1421                &cx.to_async(),
1422            )),
1423            remote_started: Signal::new(cx),
1424            has_wsl_interop,
1425        })
1426    }
1427
1428    fn wait_for_remote_started(&self) -> Shared<Task<Option<()>>> {
1429        self.remote_started.wait()
1430    }
1431
1432    fn start_handling_messages(
1433        this: Weak<Self>,
1434        mut incoming_rx: mpsc::UnboundedReceiver<Envelope>,
1435        cx: &AsyncApp,
1436    ) -> Task<Result<()>> {
1437        cx.spawn(async move |cx| {
1438            if let Some(this) = this.upgrade() {
1439                let envelope = proto::RemoteStarted {}.into_envelope(0, None, None);
1440                this.outgoing_tx.lock().unbounded_send(envelope).ok();
1441            };
1442
1443            let peer_id = PeerId { owner_id: 0, id: 0 };
1444            while let Some(incoming) = incoming_rx.next().await {
1445                let Some(this) = this.upgrade() else {
1446                    return anyhow::Ok(());
1447                };
1448                if let Some(ack_id) = incoming.ack_id {
1449                    let mut buffer = this.buffer.lock();
1450                    while buffer.front().is_some_and(|msg| msg.id <= ack_id) {
1451                        buffer.pop_front();
1452                    }
1453                }
1454                if let Some(proto::envelope::Payload::FlushBufferedMessages(_)) = &incoming.payload
1455                {
1456                    log::debug!(
1457                        "{}:remote message received. name:FlushBufferedMessages",
1458                        this.name
1459                    );
1460                    {
1461                        let buffer = this.buffer.lock();
1462                        for envelope in buffer.iter() {
1463                            this.outgoing_tx
1464                                .lock()
1465                                .unbounded_send(envelope.clone())
1466                                .ok();
1467                        }
1468                    }
1469                    let mut envelope = proto::Ack {}.into_envelope(0, Some(incoming.id), None);
1470                    envelope.id = this.next_message_id.fetch_add(1, SeqCst);
1471                    this.outgoing_tx.lock().unbounded_send(envelope).ok();
1472                    continue;
1473                }
1474
1475                if let Some(proto::envelope::Payload::RemoteStarted(_)) = &incoming.payload {
1476                    this.remote_started.set(());
1477                    let mut envelope = proto::Ack {}.into_envelope(0, Some(incoming.id), None);
1478                    envelope.id = this.next_message_id.fetch_add(1, SeqCst);
1479                    this.outgoing_tx.lock().unbounded_send(envelope).ok();
1480                    continue;
1481                }
1482
1483                this.max_received.store(incoming.id, SeqCst);
1484
1485                if let Some(request_id) = incoming.responding_to {
1486                    let request_id = MessageId(request_id);
1487                    let sender = this.response_channels.lock().remove(&request_id);
1488                    if let Some(sender) = sender {
1489                        let (tx, rx) = oneshot::channel();
1490                        if incoming.payload.is_some() {
1491                            sender.send((incoming, tx)).ok();
1492                        }
1493                        rx.await.ok();
1494                    }
1495                } else if let Some(envelope) =
1496                    build_typed_envelope(peer_id, Instant::now(), incoming)
1497                {
1498                    let type_name = envelope.payload_type_name();
1499                    let message_id = envelope.message_id();
1500                    if let Some(future) = ProtoMessageHandlerSet::handle_message(
1501                        &this.message_handlers,
1502                        envelope,
1503                        this.clone().into(),
1504                        cx.clone(),
1505                    ) {
1506                        log::debug!("{}:remote message received. name:{type_name}", this.name);
1507                        cx.foreground_executor()
1508                            .spawn(async move {
1509                                match future.await {
1510                                    Ok(_) => {
1511                                        log::debug!(
1512                                            "{}:remote message handled. name:{type_name}",
1513                                            this.name
1514                                        );
1515                                    }
1516                                    Err(error) => {
1517                                        log::error!(
1518                                            "{}:error handling message. type:{}, error:{:#}",
1519                                            this.name,
1520                                            type_name,
1521                                            format!("{error:#}").lines().fold(
1522                                                String::new(),
1523                                                |mut message, line| {
1524                                                    if !message.is_empty() {
1525                                                        message.push(' ');
1526                                                    }
1527                                                    message.push_str(line);
1528                                                    message
1529                                                }
1530                                            )
1531                                        );
1532                                    }
1533                                }
1534                            })
1535                            .detach()
1536                    } else {
1537                        log::error!("{}:unhandled remote message name:{type_name}", this.name);
1538                        if let Err(e) = AnyProtoClient::from(this.clone()).send_response(
1539                            message_id,
1540                            anyhow::anyhow!("no handler registered for {type_name}").to_proto(),
1541                        ) {
1542                            log::error!(
1543                                "{}:error sending error response for {type_name}:{e:#}",
1544                                this.name
1545                            );
1546                        }
1547                    }
1548                }
1549            }
1550            anyhow::Ok(())
1551        })
1552    }
1553
1554    pub(crate) fn reconnect(
1555        self: &Arc<Self>,
1556        incoming_rx: UnboundedReceiver<Envelope>,
1557        outgoing_tx: UnboundedSender<Envelope>,
1558        cx: &AsyncApp,
1559    ) {
1560        *self.outgoing_tx.lock() = outgoing_tx;
1561        *self.task.lock() = Self::start_handling_messages(Arc::downgrade(self), incoming_rx, cx);
1562    }
1563
1564    fn request<T: RequestMessage>(
1565        &self,
1566        payload: T,
1567    ) -> impl 'static + Future<Output = Result<T::Response>> {
1568        self.request_internal(payload, true)
1569    }
1570
1571    fn request_internal<T: RequestMessage>(
1572        &self,
1573        payload: T,
1574        use_buffer: bool,
1575    ) -> impl 'static + Future<Output = Result<T::Response>> {
1576        log::debug!("remote request start. name:{}", T::NAME);
1577        let response =
1578            self.request_dynamic(payload.into_envelope(0, None, None), T::NAME, use_buffer);
1579        async move {
1580            let response = response.await?;
1581            log::debug!("remote request finish. name:{}", T::NAME);
1582            T::Response::from_envelope(response).context("received a response of the wrong type")
1583        }
1584    }
1585
1586    async fn resync(&self, timeout: Duration) -> Result<()> {
1587        smol::future::or(
1588            async {
1589                self.request_internal(proto::FlushBufferedMessages {}, false)
1590                    .await?;
1591
1592                for envelope in self.buffer.lock().iter() {
1593                    self.outgoing_tx
1594                        .lock()
1595                        .unbounded_send(envelope.clone())
1596                        .ok();
1597                }
1598                Ok(())
1599            },
1600            async {
1601                self.executor.timer(timeout).await;
1602                anyhow::bail!("Timed out resyncing remote client")
1603            },
1604        )
1605        .await
1606    }
1607
1608    async fn ping(&self, timeout: Duration) -> Result<()> {
1609        smol::future::or(
1610            async {
1611                self.request(proto::Ping {}).await?;
1612                Ok(())
1613            },
1614            async {
1615                self.executor.timer(timeout).await;
1616                anyhow::bail!("Timed out pinging remote client")
1617            },
1618        )
1619        .await
1620    }
1621
1622    fn send<T: EnvelopedMessage>(&self, payload: T) -> Result<()> {
1623        log::debug!("remote send name:{}", T::NAME);
1624        self.send_dynamic(payload.into_envelope(0, None, None))
1625    }
1626
1627    fn request_dynamic(
1628        &self,
1629        mut envelope: proto::Envelope,
1630        type_name: &'static str,
1631        use_buffer: bool,
1632    ) -> impl 'static + Future<Output = Result<proto::Envelope>> {
1633        envelope.id = self.next_message_id.fetch_add(1, SeqCst);
1634        let (tx, rx) = oneshot::channel();
1635        let mut response_channels_lock = self.response_channels.lock();
1636        response_channels_lock.insert(MessageId(envelope.id), tx);
1637        drop(response_channels_lock);
1638
1639        let result = if use_buffer {
1640            self.send_buffered(envelope)
1641        } else {
1642            self.send_unbuffered(envelope)
1643        };
1644        async move {
1645            if let Err(error) = &result {
1646                log::error!("failed to send message: {error}");
1647                anyhow::bail!("failed to send message: {error}");
1648            }
1649
1650            let response = rx.await.context("connection lost")?.0;
1651            if let Some(proto::envelope::Payload::Error(error)) = &response.payload {
1652                return Err(RpcError::from_proto(error, type_name));
1653            }
1654            Ok(response)
1655        }
1656    }
1657
1658    pub fn send_dynamic(&self, mut envelope: proto::Envelope) -> Result<()> {
1659        envelope.id = self.next_message_id.fetch_add(1, SeqCst);
1660        self.send_buffered(envelope)
1661    }
1662
1663    fn send_buffered(&self, mut envelope: proto::Envelope) -> Result<()> {
1664        envelope.ack_id = Some(self.max_received.load(SeqCst));
1665        self.buffer.lock().push_back(envelope.clone());
1666        // ignore errors on send (happen while we're reconnecting)
1667        // assume that the global "disconnected" overlay is sufficient.
1668        self.outgoing_tx.lock().unbounded_send(envelope).ok();
1669        Ok(())
1670    }
1671
1672    fn send_unbuffered(&self, mut envelope: proto::Envelope) -> Result<()> {
1673        envelope.ack_id = Some(self.max_received.load(SeqCst));
1674        self.outgoing_tx.lock().unbounded_send(envelope).ok();
1675        Ok(())
1676    }
1677}
1678
1679impl ProtoClient for ChannelClient {
1680    fn request(
1681        &self,
1682        envelope: proto::Envelope,
1683        request_type: &'static str,
1684    ) -> BoxFuture<'static, Result<proto::Envelope>> {
1685        self.request_dynamic(envelope, request_type, true).boxed()
1686    }
1687
1688    fn send(&self, envelope: proto::Envelope, _message_type: &'static str) -> Result<()> {
1689        self.send_dynamic(envelope)
1690    }
1691
1692    fn send_response(&self, envelope: Envelope, _message_type: &'static str) -> anyhow::Result<()> {
1693        self.send_dynamic(envelope)
1694    }
1695
1696    fn message_handler_set(&self) -> &Mutex<ProtoMessageHandlerSet> {
1697        &self.message_handlers
1698    }
1699
1700    fn is_via_collab(&self) -> bool {
1701        false
1702    }
1703
1704    fn has_wsl_interop(&self) -> bool {
1705        self.has_wsl_interop
1706    }
1707}