remote_client.rs

   1#[cfg(any(test, feature = "test-support"))]
   2use crate::transport::mock::ConnectGuard;
   3use crate::{
   4    SshConnectionOptions,
   5    protocol::MessageId,
   6    proxy::ProxyLaunchError,
   7    transport::{
   8        docker::{DockerConnectionOptions, DockerExecConnection},
   9        ssh::SshRemoteConnection,
  10        wsl::{WslConnectionOptions, WslRemoteConnection},
  11    },
  12};
  13use anyhow::{Context as _, Result, anyhow};
  14use askpass::EncryptedPassword;
  15use async_trait::async_trait;
  16use collections::HashMap;
  17use futures::{
  18    Future, FutureExt as _, StreamExt as _,
  19    channel::{
  20        mpsc::{self, Sender, UnboundedReceiver, UnboundedSender},
  21        oneshot,
  22    },
  23    future::{BoxFuture, Shared},
  24    select, select_biased,
  25};
  26use gpui::{
  27    App, AppContext as _, AsyncApp, BackgroundExecutor, BorrowAppContext, Context, Entity,
  28    EventEmitter, FutureExt, Global, Task, WeakEntity,
  29};
  30use parking_lot::Mutex;
  31
  32use release_channel::ReleaseChannel;
  33use rpc::{
  34    AnyProtoClient, ErrorExt, ProtoClient, ProtoMessageHandlerSet, RpcError,
  35    proto::{self, Envelope, EnvelopedMessage, PeerId, RequestMessage, build_typed_envelope},
  36};
  37use semver::Version;
  38use std::{
  39    collections::VecDeque,
  40    fmt,
  41    ops::ControlFlow,
  42    path::PathBuf,
  43    sync::{
  44        Arc, Weak,
  45        atomic::{AtomicU32, AtomicU64, Ordering::SeqCst},
  46    },
  47    time::{Duration, Instant},
  48};
  49use util::{
  50    ResultExt,
  51    paths::{PathStyle, RemotePathBuf},
  52};
  53
  54#[derive(Copy, Clone, Debug, PartialEq, Eq)]
  55pub enum RemoteOs {
  56    Linux,
  57    MacOs,
  58    Windows,
  59}
  60
  61impl RemoteOs {
  62    pub fn as_str(&self) -> &'static str {
  63        match self {
  64            RemoteOs::Linux => "linux",
  65            RemoteOs::MacOs => "macos",
  66            RemoteOs::Windows => "windows",
  67        }
  68    }
  69
  70    pub fn is_windows(&self) -> bool {
  71        matches!(self, RemoteOs::Windows)
  72    }
  73}
  74
  75impl std::fmt::Display for RemoteOs {
  76    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
  77        f.write_str(self.as_str())
  78    }
  79}
  80
  81#[derive(Copy, Clone, Debug, PartialEq, Eq)]
  82pub enum RemoteArch {
  83    X86_64,
  84    Aarch64,
  85}
  86
  87impl RemoteArch {
  88    pub fn as_str(&self) -> &'static str {
  89        match self {
  90            RemoteArch::X86_64 => "x86_64",
  91            RemoteArch::Aarch64 => "aarch64",
  92        }
  93    }
  94}
  95
  96impl std::fmt::Display for RemoteArch {
  97    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
  98        f.write_str(self.as_str())
  99    }
 100}
 101
/// Operating system and CPU architecture of a remote host.
///
/// Passed to the [`RemoteClientDelegate`] download hooks.
#[derive(Copy, Clone, Debug)]
pub struct RemotePlatform {
    /// Operating system of the remote.
    pub os: RemoteOs,
    /// CPU architecture of the remote.
    pub arch: RemoteArch,
}
 107
/// A command described by its program, arguments and environment variables.
///
/// This is the value produced by `RemoteClient::build_command`.
#[derive(Clone, Debug)]
pub struct CommandTemplate {
    /// Program to execute.
    pub program: String,
    /// Arguments passed to `program`.
    pub args: Vec<String>,
    /// Environment variables, keyed by variable name.
    pub env: HashMap<String, String>,
}
 114
/// Hooks supplied by the code that owns a remote client.
///
/// The delegate is handed to `ConnectionPool::connect` and to
/// `RemoteConnection::start_proxy`, and `RemoteClient::reconnect` uses
/// [`set_status`](Self::set_status) to report a failed attempt.
pub trait RemoteClientDelegate: Send + Sync {
    /// Asks for a password, showing `prompt`.
    ///
    /// NOTE(review): presumably the implementation answers by sending the
    /// password on `tx` — confirm against implementors.
    fn ask_password(
        &self,
        prompt: String,
        tx: oneshot::Sender<EncryptedPassword>,
        cx: &mut AsyncApp,
    );
    /// Resolves a download URL for the server binary matching `platform`,
    /// `release_channel` and (optionally) `version`.
    ///
    /// NOTE(review): the meaning of `Ok(None)` is not visible in this file —
    /// presumably "no URL available"; confirm against implementors.
    fn get_download_url(
        &self,
        platform: RemotePlatform,
        release_channel: ReleaseChannel,
        version: Option<Version>,
        cx: &mut AsyncApp,
    ) -> Task<Result<Option<String>>>;
    /// Obtains the server binary for `platform`, `release_channel` and
    /// (optionally) `version`, resolving to a path.
    ///
    /// NOTE(review): presumably the path is on the local machine, as the
    /// method name suggests — confirm against implementors.
    fn download_server_binary_locally(
        &self,
        platform: RemotePlatform,
        release_channel: ReleaseChannel,
        version: Option<Version>,
        cx: &mut AsyncApp,
    ) -> Task<Result<PathBuf>>;
    /// Reports a status message. `RemoteClient::reconnect` passes the
    /// formatted error of a failed reconnect attempt here.
    ///
    /// NOTE(review): `None` presumably clears the status — confirm.
    fn set_status(&self, status: Option<&str>, cx: &mut AsyncApp);
}
 138
/// Number of consecutive missed heartbeats after which a reconnect is started.
const MAX_MISSED_HEARTBEATS: usize = 5;
/// Delay before a heartbeat ping is sent; the timer is re-armed after every
/// observed connection activity and after every ping.
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
/// How long to wait for the reply to a ping (and to the resync sent after a
/// reconnect).
const HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(5);
/// How long `RemoteClient::new` waits for the remote to report that it has
/// started: 5 seconds in builds with debug assertions, 60 seconds otherwise.
const INITIAL_CONNECTION_TIMEOUT: Duration =
    Duration::from_secs(if cfg!(debug_assertions) { 5 } else { 60 });

/// Maximum number of reconnect attempts before the client gives up and moves
/// to the "reconnect exhausted" state.
pub const MAX_RECONNECT_ATTEMPTS: usize = 3;
 146
/// Internal connection state machine of a [`RemoteClient`].
///
/// The variants that represent a live connection own the connection handle
/// and the background tasks, so replacing the state drops those tasks.
enum State {
    /// Initial state, set when the client entity is created and before the
    /// remote has been confirmed as ready.
    Connecting,
    /// The connection is established.
    Connected {
        remote_connection: Arc<dyn RemoteConnection>,
        delegate: Arc<dyn RemoteClientDelegate>,

        /// Task created by `RemoteClient::monitor`, which awaits the proxy I/O task.
        multiplex_task: Task<Result<()>>,
        /// Task created by `RemoteClient::heartbeat`.
        heartbeat_task: Task<Result<()>>,
    },
    /// At least one heartbeat was missed, but fewer than the reconnect threshold.
    HeartbeatMissed {
        /// Number of heartbeats missed in a row.
        missed_heartbeats: usize,

        remote_connection: Arc<dyn RemoteConnection>,
        delegate: Arc<dyn RemoteClientDelegate>,

        multiplex_task: Task<Result<()>>,
        heartbeat_task: Task<Result<()>>,
    },
    /// A reconnect attempt is in flight.
    Reconnecting,
    /// The most recent reconnect attempt failed; another attempt may follow.
    ReconnectFailed {
        remote_connection: Arc<dyn RemoteConnection>,
        delegate: Arc<dyn RemoteClientDelegate>,

        /// Why the attempt failed.
        error: anyhow::Error,
        /// Number of attempts made so far.
        attempts: usize,
    },
    /// More than `MAX_RECONNECT_ATTEMPTS` attempts were made; the client gave up.
    ReconnectExhausted,
    /// The proxy exited with the code that maps to
    /// `ProxyLaunchError::ServerNotRunning`.
    ServerNotRunning,
}
 176
 177impl fmt::Display for State {
 178    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
 179        match self {
 180            Self::Connecting => write!(f, "connecting"),
 181            Self::Connected { .. } => write!(f, "connected"),
 182            Self::Reconnecting => write!(f, "reconnecting"),
 183            Self::ReconnectFailed { .. } => write!(f, "reconnect failed"),
 184            Self::ReconnectExhausted => write!(f, "reconnect exhausted"),
 185            Self::HeartbeatMissed { .. } => write!(f, "heartbeat missed"),
 186            Self::ServerNotRunning { .. } => write!(f, "server not running"),
 187        }
 188    }
 189}
 190
 191impl State {
 192    fn remote_connection(&self) -> Option<Arc<dyn RemoteConnection>> {
 193        match self {
 194            Self::Connected {
 195                remote_connection, ..
 196            } => Some(remote_connection.clone()),
 197            Self::HeartbeatMissed {
 198                remote_connection, ..
 199            } => Some(remote_connection.clone()),
 200            Self::ReconnectFailed {
 201                remote_connection, ..
 202            } => Some(remote_connection.clone()),
 203            _ => None,
 204        }
 205    }
 206
 207    fn can_reconnect(&self) -> bool {
 208        match self {
 209            Self::Connected { .. }
 210            | Self::HeartbeatMissed { .. }
 211            | Self::ReconnectFailed { .. } => true,
 212            State::Connecting
 213            | State::Reconnecting
 214            | State::ReconnectExhausted
 215            | State::ServerNotRunning => false,
 216        }
 217    }
 218
 219    fn is_reconnect_failed(&self) -> bool {
 220        matches!(self, Self::ReconnectFailed { .. })
 221    }
 222
 223    fn is_reconnect_exhausted(&self) -> bool {
 224        matches!(self, Self::ReconnectExhausted { .. })
 225    }
 226
 227    fn is_server_not_running(&self) -> bool {
 228        matches!(self, Self::ServerNotRunning)
 229    }
 230
 231    fn is_reconnecting(&self) -> bool {
 232        matches!(self, Self::Reconnecting { .. })
 233    }
 234
 235    fn heartbeat_recovered(self) -> Self {
 236        match self {
 237            Self::HeartbeatMissed {
 238                remote_connection,
 239                delegate,
 240                multiplex_task,
 241                heartbeat_task,
 242                ..
 243            } => Self::Connected {
 244                remote_connection,
 245                delegate,
 246                multiplex_task,
 247                heartbeat_task,
 248            },
 249            _ => self,
 250        }
 251    }
 252
 253    fn heartbeat_missed(self) -> Self {
 254        match self {
 255            Self::Connected {
 256                remote_connection,
 257                delegate,
 258                multiplex_task,
 259                heartbeat_task,
 260            } => Self::HeartbeatMissed {
 261                missed_heartbeats: 1,
 262                remote_connection,
 263                delegate,
 264                multiplex_task,
 265                heartbeat_task,
 266            },
 267            Self::HeartbeatMissed {
 268                missed_heartbeats,
 269                remote_connection,
 270                delegate,
 271                multiplex_task,
 272                heartbeat_task,
 273            } => Self::HeartbeatMissed {
 274                missed_heartbeats: missed_heartbeats + 1,
 275                remote_connection,
 276                delegate,
 277                multiplex_task,
 278                heartbeat_task,
 279            },
 280            _ => self,
 281        }
 282    }
 283}
 284
/// The externally visible state of the remote connection.
///
/// This is a data-free summary of the internal `State`; see the
/// `From<&State>` impl for the mapping.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ConnectionState {
    /// The initial connection is being established.
    Connecting,
    /// The connection is established.
    Connected,
    /// One or more heartbeats were missed; no reconnect has started yet.
    HeartbeatMissed,
    /// A reconnect is in progress, or an attempt failed and may be retried.
    Reconnecting,
    /// Reconnecting was given up, or the remote server is not running.
    Disconnected,
}
 294
 295impl From<&State> for ConnectionState {
 296    fn from(value: &State) -> Self {
 297        match value {
 298            State::Connecting => Self::Connecting,
 299            State::Connected { .. } => Self::Connected,
 300            State::Reconnecting | State::ReconnectFailed { .. } => Self::Reconnecting,
 301            State::HeartbeatMissed { .. } => Self::HeartbeatMissed,
 302            State::ReconnectExhausted => Self::Disconnected,
 303            State::ServerNotRunning => Self::Disconnected,
 304        }
 305    }
 306}
 307
/// Client side of a connection to a remote server, together with its
/// connection state machine.
pub struct RemoteClient {
    /// Message channel to the remote; kept across reconnects, which swap its
    /// underlying channels via `ChannelClient::reconnect`.
    client: Arc<ChannelClient>,
    /// String form of the `ConnectionIdentifier`; passed to `start_proxy` on
    /// the initial connect and on every reconnect.
    unique_identifier: String,
    /// Options reported by the connection this client was created from.
    connection_options: RemoteConnectionOptions,
    /// Path style reported by the connection this client was created from.
    path_style: PathStyle,
    /// Current state. `None` while a transition has taken the state out, and
    /// after `shutdown_processes` has consumed it.
    state: Option<State>,
}
 315
/// Events emitted by a [`RemoteClient`] entity.
#[derive(Debug)]
pub enum RemoteClientEvent {
    /// Emitted by `RemoteClient::set_state` when the new state is
    /// "reconnect exhausted" or "server not running".
    Disconnected,
}

impl EventEmitter<RemoteClientEvent> for RemoteClient {}
 322
/// Identifies the socket on the remote server so that reconnects
/// can re-join the same project.
pub enum ConnectionIdentifier {
    /// Identifier drawn from a process-wide counter; see
    /// [`ConnectionIdentifier::setup`].
    Setup(u64),
    /// Identifier carrying a workspace id.
    Workspace(i64),
}

/// Source of the ids handed out by `ConnectionIdentifier::setup`; starts at 1.
static NEXT_ID: AtomicU64 = AtomicU64::new(1);
 331
 332impl ConnectionIdentifier {
 333    pub fn setup() -> Self {
 334        Self::Setup(NEXT_ID.fetch_add(1, SeqCst))
 335    }
 336
 337    // This string gets used in a socket name, and so must be relatively short.
 338    // The total length of:
 339    //   /home/{username}/.local/share/zed/server_state/{name}/stdout.sock
 340    // Must be less than about 100 characters
 341    //   https://unix.stackexchange.com/questions/367008/why-is-socket-path-length-limited-to-a-hundred-chars
 342    // So our strings should be at most 20 characters or so.
 343    fn to_string(&self, cx: &App) -> String {
 344        let identifier_prefix = match ReleaseChannel::global(cx) {
 345            ReleaseChannel::Stable => "".to_string(),
 346            release_channel => format!("{}-", release_channel.dev_name()),
 347        };
 348        match self {
 349            Self::Setup(setup_id) => format!("{identifier_prefix}setup-{setup_id}"),
 350            Self::Workspace(workspace_id) => {
 351                format!("{identifier_prefix}workspace-{workspace_id}",)
 352            }
 353        }
 354    }
 355}
 356
 357pub async fn connect(
 358    connection_options: RemoteConnectionOptions,
 359    delegate: Arc<dyn RemoteClientDelegate>,
 360    cx: &mut AsyncApp,
 361) -> Result<Arc<dyn RemoteConnection>> {
 362    cx.update(|cx| {
 363        cx.update_default_global(|pool: &mut ConnectionPool, cx| {
 364            pool.connect(connection_options.clone(), delegate.clone(), cx)
 365        })
 366    })
 367    .await
 368    .map_err(|e| e.cloned())
 369}
 370
 371impl RemoteClient {
 372    pub fn new(
 373        unique_identifier: ConnectionIdentifier,
 374        remote_connection: Arc<dyn RemoteConnection>,
 375        cancellation: oneshot::Receiver<()>,
 376        delegate: Arc<dyn RemoteClientDelegate>,
 377        cx: &mut App,
 378    ) -> Task<Result<Option<Entity<Self>>>> {
 379        let unique_identifier = unique_identifier.to_string(cx);
 380        cx.spawn(async move |cx| {
 381            let success = Box::pin(async move {
 382                let (outgoing_tx, outgoing_rx) = mpsc::unbounded::<Envelope>();
 383                let (incoming_tx, incoming_rx) = mpsc::unbounded::<Envelope>();
 384                let (connection_activity_tx, connection_activity_rx) = mpsc::channel::<()>(1);
 385
 386                let client = cx.update(|cx| {
 387                    ChannelClient::new(
 388                        incoming_rx,
 389                        outgoing_tx,
 390                        cx,
 391                        "client",
 392                        remote_connection.has_wsl_interop(),
 393                    )
 394                });
 395
 396                let path_style = remote_connection.path_style();
 397                let this = cx.new(|_| Self {
 398                    client: client.clone(),
 399                    unique_identifier: unique_identifier.clone(),
 400                    connection_options: remote_connection.connection_options(),
 401                    path_style,
 402                    state: Some(State::Connecting),
 403                });
 404
 405                let io_task = remote_connection.start_proxy(
 406                    unique_identifier,
 407                    false,
 408                    incoming_tx,
 409                    outgoing_rx,
 410                    connection_activity_tx,
 411                    delegate.clone(),
 412                    cx,
 413                );
 414
 415                let ready = client
 416                    .wait_for_remote_started()
 417                    .with_timeout(INITIAL_CONNECTION_TIMEOUT, cx.background_executor())
 418                    .await;
 419                match ready {
 420                    Ok(Some(_)) => {}
 421                    Ok(None) => {
 422                        let mut error = "remote client exited before becoming ready".to_owned();
 423                        if let Some(status) = io_task.now_or_never() {
 424                            match status {
 425                                Ok(exit_code) => {
 426                                    error.push_str(&format!(", exit_code={exit_code:?}"))
 427                                }
 428                                Err(e) => error.push_str(&format!(", error={e:?}")),
 429                            }
 430                        }
 431                        let error = anyhow::anyhow!("{error}");
 432                        log::error!("failed to establish connection: {}", error);
 433                        return Err(error);
 434                    }
 435                    Err(_) => {
 436                        let mut error =
 437                            "remote client did not become ready within the timeout".to_owned();
 438                        if let Some(status) = io_task.now_or_never() {
 439                            match status {
 440                                Ok(exit_code) => {
 441                                    error.push_str(&format!(", exit_code={exit_code:?}"))
 442                                }
 443                                Err(e) => error.push_str(&format!(", error={e:?}")),
 444                            }
 445                        }
 446                        let error = anyhow::anyhow!("{error}");
 447                        log::error!("failed to establish connection: {}", error);
 448                        return Err(error);
 449                    }
 450                }
 451                let multiplex_task = Self::monitor(this.downgrade(), io_task, cx);
 452                if let Err(error) = client.ping(HEARTBEAT_TIMEOUT).await {
 453                    log::error!("failed to establish connection: {}", error);
 454                    return Err(error);
 455                }
 456
 457                let heartbeat_task = Self::heartbeat(this.downgrade(), connection_activity_rx, cx);
 458
 459                this.update(cx, |this, _| {
 460                    this.state = Some(State::Connected {
 461                        remote_connection,
 462                        delegate,
 463                        multiplex_task,
 464                        heartbeat_task,
 465                    });
 466                });
 467
 468                Ok(Some(this))
 469            });
 470
 471            select! {
 472                _ = cancellation.fuse() => {
 473                    Ok(None)
 474                }
 475                result = success.fuse() =>  result
 476            }
 477        })
 478    }
 479
 480    pub fn proto_client_from_channels(
 481        incoming_rx: mpsc::UnboundedReceiver<Envelope>,
 482        outgoing_tx: mpsc::UnboundedSender<Envelope>,
 483        cx: &App,
 484        name: &'static str,
 485        has_wsl_interop: bool,
 486    ) -> AnyProtoClient {
 487        ChannelClient::new(incoming_rx, outgoing_tx, cx, name, has_wsl_interop).into()
 488    }
 489
    /// Consumes the current state and, if the client was `Connected`, returns
    /// a future that shuts the remote processes down.
    ///
    /// The future optionally sends `shutdown_request`, waits 50ms, and then
    /// drops the monitor task, the heartbeat task, the connection and the
    /// delegate, in that order.
    ///
    /// Returns `None` when there is no state or the state is anything other
    /// than `Connected`. The state is taken out of `self` before that check,
    /// so in the non-`Connected` case it is dropped immediately and
    /// `self.state` is left as `None`.
    pub fn shutdown_processes<T: RequestMessage>(
        &mut self,
        shutdown_request: Option<T>,
        executor: BackgroundExecutor,
    ) -> Option<impl Future<Output = ()> + use<T>> {
        let state = self.state.take()?;
        log::info!("shutting down remote processes");

        let State::Connected {
            multiplex_task,
            heartbeat_task,
            remote_connection,
            delegate,
        } = state
        else {
            return None;
        };

        let client = self.client.clone();

        Some(async move {
            if let Some(shutdown_request) = shutdown_request {
                // A failure to send is only logged.
                client.send(shutdown_request).log_err();
                // We wait 50ms instead of waiting for a response, because
                // waiting for a response would require us to wait on the main thread
                // which we want to avoid in an `on_app_quit` callback.
                executor.timer(Duration::from_millis(50)).await;
            }

            // Drop `multiplex_task` because it owns our remote_connection_proxy_process, which is a
            // child of master_process.
            drop(multiplex_task);
            // Now drop the rest of state, which kills master process.
            drop(heartbeat_task);
            drop(remote_connection);
            drop(delegate);
        })
    }
 528
    /// Starts one reconnect attempt.
    ///
    /// Allowed only from `Connected`, `HeartbeatMissed` and `ReconnectFailed`.
    /// The first two drop their background tasks and start counting attempts
    /// from zero; `ReconnectFailed` carries its attempt count forward. Once
    /// the count exceeds `MAX_RECONNECT_ATTEMPTS` the state becomes
    /// `ReconnectExhausted` and no attempt is started.
    ///
    /// Otherwise the state becomes `Reconnecting` and two tasks are spawned:
    /// one performs the attempt and produces the next state, the other
    /// installs that state (only if the client is still `Reconnecting`) and,
    /// if the attempt failed, calls `reconnect` again.
    ///
    /// # Errors
    ///
    /// Fails if the current state does not allow reconnecting. Failures of
    /// the attempt itself are not returned; they are reflected in the state.
    fn reconnect(&mut self, cx: &mut Context<Self>) -> Result<()> {
        let can_reconnect = self
            .state
            .as_ref()
            .map(|state| state.can_reconnect())
            .unwrap_or(false);
        if !can_reconnect {
            let state = if let Some(state) = self.state.as_ref() {
                state.to_string()
            } else {
                "no state set".to_string()
            };
            log::info!(
                "aborting reconnect, because not in state that allows reconnecting: {state}"
            );
            anyhow::bail!(
                "aborting reconnect, because not in state that allows reconnecting: {state}"
            );
        }

        // `can_reconnect` returned true above, so a state is present.
        let state = self.state.take().unwrap();
        let (attempts, remote_connection, delegate) = match state {
            State::Connected {
                remote_connection,
                delegate,
                multiplex_task,
                heartbeat_task,
            }
            | State::HeartbeatMissed {
                remote_connection,
                delegate,
                multiplex_task,
                heartbeat_task,
                ..
            } => {
                // Stop monitoring and heartbeating the old connection.
                drop(multiplex_task);
                drop(heartbeat_task);
                (0, remote_connection, delegate)
            }
            State::ReconnectFailed {
                attempts,
                remote_connection,
                delegate,
                ..
            } => (attempts, remote_connection, delegate),
            // Excluded by the `can_reconnect` check above.
            State::Connecting
            | State::Reconnecting
            | State::ReconnectExhausted
            | State::ServerNotRunning => unreachable!(),
        };

        let attempts = attempts + 1;
        if attempts > MAX_RECONNECT_ATTEMPTS {
            log::error!(
                "Failed to reconnect to after {} attempts, giving up",
                MAX_RECONNECT_ATTEMPTS
            );
            self.set_state(State::ReconnectExhausted, cx);
            return Ok(());
        }

        self.set_state(State::Reconnecting, cx);

        log::info!(
            "Trying to reconnect to remote server... Attempt {}",
            attempts
        );

        let unique_identifier = self.unique_identifier.clone();
        let client = self.client.clone();
        let reconnect_task = cx.spawn(async move |this, cx| {
            // Reports the error through the delegate and ends the task with a
            // `ReconnectFailed` state that keeps the given connection and delegate.
            macro_rules! failed {
                ($error:expr, $attempts:expr, $remote_connection:expr, $delegate:expr) => {
                    delegate.set_status(Some(&format!("{error:#}", error = $error)), cx);
                    return State::ReconnectFailed {
                        error: anyhow!($error),
                        attempts: $attempts,
                        remote_connection: $remote_connection,
                        delegate: $delegate,
                    };
                };
            }

            // Tear down the old connection before opening a new one.
            if let Err(error) = remote_connection
                .kill()
                .await
                .context("Failed to kill remote_connection process")
            {
                failed!(error, attempts, remote_connection, delegate);
            };

            let connection_options = remote_connection.connection_options();

            // Fresh channels for the new proxy; the existing `client` is
            // re-pointed at them below.
            let (outgoing_tx, outgoing_rx) = mpsc::unbounded::<Envelope>();
            let (incoming_tx, incoming_rx) = mpsc::unbounded::<Envelope>();
            let (connection_activity_tx, connection_activity_rx) = mpsc::channel::<()>(1);

            let (remote_connection, io_task) = match async {
                let remote_connection = cx
                    .update_global(|pool: &mut ConnectionPool, cx| {
                        pool.connect(connection_options, delegate.clone(), cx)
                    })
                    .await
                    .map_err(|error| error.cloned())?;

                // `true` is passed here where the initial connect in `new`
                // passes `false`.
                let io_task = remote_connection.start_proxy(
                    unique_identifier,
                    true,
                    incoming_tx,
                    outgoing_rx,
                    connection_activity_tx,
                    delegate.clone(),
                    cx,
                );
                anyhow::Ok((remote_connection, io_task))
            }
            .await
            {
                Ok((remote_connection, io_task)) => (remote_connection, io_task),
                Err(error) => {
                    // On failure the old connection handle is kept in the state.
                    failed!(error, attempts, remote_connection, delegate);
                }
            };

            let multiplex_task = Self::monitor(this.clone(), io_task, cx);
            client.reconnect(incoming_rx, outgoing_tx, cx);

            if let Err(error) = client.resync(HEARTBEAT_TIMEOUT).await {
                failed!(error, attempts, remote_connection, delegate);
            };

            State::Connected {
                remote_connection,
                delegate,
                multiplex_task,
                heartbeat_task: Self::heartbeat(this.clone(), connection_activity_rx, cx),
            }
        });

        cx.spawn(async move |this, cx| {
            let new_state = reconnect_task.await;
            this.update(cx, |this, cx| {
                // Install the result only if nothing else changed the state
                // while the attempt was running.
                this.try_set_state(cx, |old_state| {
                    if old_state.is_reconnecting() {
                        match &new_state {
                            State::Connecting
                            | State::Reconnecting
                            | State::HeartbeatMissed { .. }
                            | State::ServerNotRunning => {}
                            State::Connected { .. } => {
                                log::info!("Successfully reconnected");
                            }
                            State::ReconnectFailed {
                                error, attempts, ..
                            } => {
                                log::error!(
                                    "Reconnect attempt {} failed: {:?}. Starting new attempt...",
                                    attempts,
                                    error
                                );
                            }
                            State::ReconnectExhausted => {
                                log::error!("Reconnect attempt failed and all attempts exhausted");
                            }
                        }
                        Some(new_state)
                    } else {
                        None
                    }
                });

                // A failed attempt immediately triggers the next one; the
                // attempt limit is enforced at the top of `reconnect`.
                if this.state_is(State::is_reconnect_failed) {
                    this.reconnect(cx)
                } else if this.state_is(State::is_reconnect_exhausted) {
                    Ok(())
                } else {
                    log::debug!("State has transition from Reconnecting into new state while attempting reconnect.");
                    Ok(())
                }
            })
        })
        .detach_and_log_err(cx);

        Ok(())
    }
 714
 715    fn heartbeat(
 716        this: WeakEntity<Self>,
 717        mut connection_activity_rx: mpsc::Receiver<()>,
 718        cx: &mut AsyncApp,
 719    ) -> Task<Result<()>> {
 720        let Ok(client) = this.read_with(cx, |this, _| this.client.clone()) else {
 721            return Task::ready(Err(anyhow!("remote_connectionRemoteClient lost")));
 722        };
 723
 724        cx.spawn(async move |cx| {
 725            let mut missed_heartbeats = 0;
 726
 727            let keepalive_timer = cx.background_executor().timer(HEARTBEAT_INTERVAL).fuse();
 728            futures::pin_mut!(keepalive_timer);
 729
 730            loop {
 731                select_biased! {
 732                    result = connection_activity_rx.next().fuse() => {
 733                        if result.is_none() {
 734                            log::warn!("remote heartbeat: connection activity channel has been dropped. stopping.");
 735                            return Ok(());
 736                        }
 737
 738                        if missed_heartbeats != 0 {
 739                            missed_heartbeats = 0;
 740                            let _ =this.update(cx, |this, cx| {
 741                                this.handle_heartbeat_result(missed_heartbeats, cx)
 742                            })?;
 743                        }
 744                    }
 745                    _ = keepalive_timer => {
 746                        log::debug!("Sending heartbeat to server...");
 747
 748                        let result = select_biased! {
 749                            _ = connection_activity_rx.next().fuse() => {
 750                                Ok(())
 751                            }
 752                            ping_result = client.ping(HEARTBEAT_TIMEOUT).fuse() => {
 753                                ping_result
 754                            }
 755                        };
 756
 757                        if result.is_err() {
 758                            missed_heartbeats += 1;
 759                            log::warn!(
 760                                "No heartbeat from server after {:?}. Missed heartbeat {} out of {}.",
 761                                HEARTBEAT_TIMEOUT,
 762                                missed_heartbeats,
 763                                MAX_MISSED_HEARTBEATS
 764                            );
 765                        } else if missed_heartbeats != 0 {
 766                            missed_heartbeats = 0;
 767                        } else {
 768                            continue;
 769                        }
 770
 771                        let result = this.update(cx, |this, cx| {
 772                            this.handle_heartbeat_result(missed_heartbeats, cx)
 773                        })?;
 774                        if result.is_break() {
 775                            return Ok(());
 776                        }
 777                    }
 778                }
 779
 780                keepalive_timer.set(cx.background_executor().timer(HEARTBEAT_INTERVAL).fuse());
 781            }
 782        })
 783    }
 784
 785    fn handle_heartbeat_result(
 786        &mut self,
 787        missed_heartbeats: usize,
 788        cx: &mut Context<Self>,
 789    ) -> ControlFlow<()> {
 790        let state = self.state.take().unwrap();
 791        let next_state = if missed_heartbeats > 0 {
 792            state.heartbeat_missed()
 793        } else {
 794            state.heartbeat_recovered()
 795        };
 796
 797        self.set_state(next_state, cx);
 798
 799        if missed_heartbeats >= MAX_MISSED_HEARTBEATS {
 800            log::error!(
 801                "Missed last {} heartbeats. Reconnecting...",
 802                missed_heartbeats
 803            );
 804
 805            self.reconnect(cx)
 806                .context("failed to start reconnect process after missing heartbeats")
 807                .log_err();
 808            ControlFlow::Break(())
 809        } else {
 810            ControlFlow::Continue(())
 811        }
 812    }
 813
 814    fn monitor(
 815        this: WeakEntity<Self>,
 816        io_task: Task<Result<i32>>,
 817        cx: &AsyncApp,
 818    ) -> Task<Result<()>> {
 819        cx.spawn(async move |cx| {
 820            let result = io_task.await;
 821
 822            match result {
 823                Ok(exit_code) => {
 824                    if let Some(error) = ProxyLaunchError::from_exit_code(exit_code) {
 825                        match error {
 826                            ProxyLaunchError::ServerNotRunning => {
 827                                log::error!("failed to reconnect because server is not running");
 828                                this.update(cx, |this, cx| {
 829                                    this.set_state(State::ServerNotRunning, cx);
 830                                })?;
 831                            }
 832                        }
 833                    } else if exit_code > 0 {
 834                        log::error!("proxy process terminated unexpectedly");
 835                        this.update(cx, |this, cx| {
 836                            this.reconnect(cx).ok();
 837                        })?;
 838                    }
 839                }
 840                Err(error) => {
 841                    log::warn!(
 842                        "remote io task died with error: {:?}. reconnecting...",
 843                        error
 844                    );
 845                    this.update(cx, |this, cx| {
 846                        this.reconnect(cx).ok();
 847                    })?;
 848                }
 849            }
 850
 851            Ok(())
 852        })
 853    }
 854
 855    fn state_is(&self, check: impl FnOnce(&State) -> bool) -> bool {
 856        self.state.as_ref().is_some_and(check)
 857    }
 858
 859    fn try_set_state(&mut self, cx: &mut Context<Self>, map: impl FnOnce(&State) -> Option<State>) {
 860        let new_state = self.state.as_ref().and_then(map);
 861        if let Some(new_state) = new_state {
 862            self.state.replace(new_state);
 863            cx.notify();
 864        }
 865    }
 866
 867    fn set_state(&mut self, state: State, cx: &mut Context<Self>) {
 868        log::info!("setting state to '{}'", &state);
 869
 870        let is_reconnect_exhausted = state.is_reconnect_exhausted();
 871        let is_server_not_running = state.is_server_not_running();
 872        self.state.replace(state);
 873
 874        if is_reconnect_exhausted || is_server_not_running {
 875            cx.emit(RemoteClientEvent::Disconnected);
 876        }
 877        cx.notify();
 878    }
 879
 880    pub fn shell(&self) -> Option<String> {
 881        Some(self.remote_connection()?.shell())
 882    }
 883
 884    pub fn default_system_shell(&self) -> Option<String> {
 885        Some(self.remote_connection()?.default_system_shell())
 886    }
 887
 888    pub fn shares_network_interface(&self) -> bool {
 889        self.remote_connection()
 890            .map_or(false, |connection| connection.shares_network_interface())
 891    }
 892
 893    pub fn build_command(
 894        &self,
 895        program: Option<String>,
 896        args: &[String],
 897        env: &HashMap<String, String>,
 898        working_dir: Option<String>,
 899        port_forward: Option<(u16, String, u16)>,
 900    ) -> Result<CommandTemplate> {
 901        let Some(connection) = self.remote_connection() else {
 902            return Err(anyhow!("no remote connection"));
 903        };
 904        connection.build_command(program, args, env, working_dir, port_forward)
 905    }
 906
 907    pub fn build_forward_ports_command(
 908        &self,
 909        forwards: Vec<(u16, String, u16)>,
 910    ) -> Result<CommandTemplate> {
 911        let Some(connection) = self.remote_connection() else {
 912            return Err(anyhow!("no remote connection"));
 913        };
 914        connection.build_forward_ports_command(forwards)
 915    }
 916
 917    pub fn upload_directory(
 918        &self,
 919        src_path: PathBuf,
 920        dest_path: RemotePathBuf,
 921        cx: &App,
 922    ) -> Task<Result<()>> {
 923        let Some(connection) = self.remote_connection() else {
 924            return Task::ready(Err(anyhow!("no remote connection")));
 925        };
 926        connection.upload_directory(src_path, dest_path, cx)
 927    }
 928
 929    pub fn proto_client(&self) -> AnyProtoClient {
 930        self.client.clone().into()
 931    }
 932
 933    pub fn connection_options(&self) -> RemoteConnectionOptions {
 934        self.connection_options.clone()
 935    }
 936
 937    pub fn connection(&self) -> Option<Arc<dyn RemoteConnection>> {
 938        if let State::Connected {
 939            remote_connection, ..
 940        } = self.state.as_ref()?
 941        {
 942            Some(remote_connection.clone())
 943        } else {
 944            None
 945        }
 946    }
 947
 948    pub fn connection_state(&self) -> ConnectionState {
 949        self.state
 950            .as_ref()
 951            .map(ConnectionState::from)
 952            .unwrap_or(ConnectionState::Disconnected)
 953    }
 954
 955    pub fn is_disconnected(&self) -> bool {
 956        self.connection_state() == ConnectionState::Disconnected
 957    }
 958
 959    pub fn path_style(&self) -> PathStyle {
 960        self.path_style
 961    }
 962
 963    /// Forcibly disconnects from the remote server by killing the underlying connection.
 964    /// This will trigger the reconnection logic if reconnection attempts remain.
 965    /// Useful for testing reconnection behavior in real environments.
 966    pub fn force_disconnect(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
 967        let Some(connection) = self.remote_connection() else {
 968            return Task::ready(Err(anyhow!("no active remote connection to disconnect")));
 969        };
 970
 971        log::info!("force_disconnect: killing remote connection");
 972
 973        cx.spawn(async move |_, _| {
 974            connection.kill().await?;
 975            Ok(())
 976        })
 977    }
 978
 979    /// Simulates a timeout by pausing heartbeat responses.
 980    /// This will cause heartbeat failures and eventually trigger reconnection
 981    /// after MAX_MISSED_HEARTBEATS are missed.
 982    /// Useful for testing timeout behavior in real environments.
 983    pub fn force_heartbeat_timeout(&mut self, attempts: usize, cx: &mut Context<Self>) {
 984        log::info!("force_heartbeat_timeout: triggering heartbeat failure state");
 985
 986        if let Some(State::Connected {
 987            remote_connection,
 988            delegate,
 989            multiplex_task,
 990            heartbeat_task,
 991        }) = self.state.take()
 992        {
 993            self.set_state(
 994                if attempts == 0 {
 995                    State::HeartbeatMissed {
 996                        missed_heartbeats: MAX_MISSED_HEARTBEATS,
 997                        remote_connection,
 998                        delegate,
 999                        multiplex_task,
1000                        heartbeat_task,
1001                    }
1002                } else {
1003                    State::ReconnectFailed {
1004                        remote_connection,
1005                        delegate,
1006                        error: anyhow!("forced heartbeat timeout"),
1007                        attempts,
1008                    }
1009                },
1010                cx,
1011            );
1012
1013            self.reconnect(cx)
1014                .context("failed to start reconnect after forced timeout")
1015                .log_err();
1016        } else {
1017            log::warn!("force_heartbeat_timeout: not in Connected state, ignoring");
1018        }
1019    }
1020
1021    #[cfg(any(test, feature = "test-support"))]
1022    pub fn simulate_disconnect(&self, client_cx: &mut App) -> Task<()> {
1023        let opts = self.connection_options();
1024        client_cx.spawn(async move |cx| {
1025            let connection = cx.update_global(|c: &mut ConnectionPool, _| {
1026                if let Some(ConnectionPoolEntry::Connected(c)) = c.connections.get(&opts) {
1027                    if let Some(connection) = c.upgrade() {
1028                        connection
1029                    } else {
1030                        panic!("connection was dropped")
1031                    }
1032                } else {
1033                    panic!("missing test connection")
1034                }
1035            });
1036
1037            connection.simulate_disconnect(cx);
1038        })
1039    }
1040
1041    /// Creates a mock connection pair for testing.
1042    ///
1043    /// This is the recommended way to create mock remote connections for tests.
1044    /// It returns the `MockConnectionOptions` (which can be passed to create a
1045    /// `HeadlessProject`), an `AnyProtoClient` for the server side and a
1046    /// `ConnectGuard` for the client side which blocks the connection from
1047    /// being established until dropped.
1048    ///
1049    /// # Example
1050    /// ```ignore
1051    /// let (opts, server_session, connect_guard) = RemoteClient::fake_server(cx, server_cx);
1052    /// // Set up HeadlessProject with server_session...
1053    /// drop(connect_guard);
1054    /// let client = RemoteClient::fake_client(opts, cx).await;
1055    /// ```
1056    #[cfg(any(test, feature = "test-support"))]
1057    pub fn fake_server(
1058        client_cx: &mut gpui::TestAppContext,
1059        server_cx: &mut gpui::TestAppContext,
1060    ) -> (RemoteConnectionOptions, AnyProtoClient, ConnectGuard) {
1061        use crate::transport::mock::MockConnection;
1062        let (opts, server_client, connect_guard) = MockConnection::new(client_cx, server_cx);
1063        (opts.into(), server_client, connect_guard)
1064    }
1065
1066    /// Creates a `RemoteClient` connected to a mock server.
1067    ///
1068    /// Call `fake_server` first to get the connection options, set up the
1069    /// `HeadlessProject` with the server session, then call this method
1070    /// to create the client.
1071    #[cfg(any(test, feature = "test-support"))]
1072    pub async fn connect_mock(
1073        opts: RemoteConnectionOptions,
1074        client_cx: &mut gpui::TestAppContext,
1075    ) -> Entity<Self> {
1076        assert!(matches!(opts, RemoteConnectionOptions::Mock(..)));
1077        use crate::transport::mock::MockDelegate;
1078        let (_tx, rx) = oneshot::channel();
1079        let mut cx = client_cx.to_async();
1080        let connection = connect(opts, Arc::new(MockDelegate), &mut cx)
1081            .await
1082            .unwrap();
1083        client_cx
1084            .update(|cx| {
1085                Self::new(
1086                    ConnectionIdentifier::setup(),
1087                    connection,
1088                    rx,
1089                    Arc::new(MockDelegate),
1090                    cx,
1091                )
1092            })
1093            .await
1094            .unwrap()
1095            .unwrap()
1096    }
1097
1098    fn remote_connection(&self) -> Option<Arc<dyn RemoteConnection>> {
1099        self.state
1100            .as_ref()
1101            .and_then(|state| state.remote_connection())
1102    }
1103}
1104
/// One slot of the [`ConnectionPool`], keyed by connection options.
enum ConnectionPoolEntry {
    /// A connection attempt is in flight; the shared task can be awaited by
    /// every caller that asks for the same options.
    Connecting(Shared<Task<Result<Arc<dyn RemoteConnection>, Arc<anyhow::Error>>>>),
    /// A connection was established. Only a weak reference is kept, so the
    /// pool entry itself does not keep the connection alive.
    Connected(Weak<dyn RemoteConnection>),
}
1109
/// Global registry of remote connections, indexed by the options used to
/// create them.
#[derive(Default)]
struct ConnectionPool {
    /// In-flight and established connections, one entry per distinct options value.
    connections: HashMap<RemoteConnectionOptions, ConnectionPoolEntry>,
}

// Marks the pool as a gpui global so it can be reached through `update_global`.
impl Global for ConnectionPool {}
1116
impl ConnectionPool {
    /// Returns a shared task resolving to a connection for `opts`, reusing
    /// existing work where possible.
    ///
    /// * If an attempt for the same options is already in flight, the delegate
    ///   is told so and that attempt's task is returned.
    /// * If a previously established connection is still alive and has not
    ///   been killed, an already-resolved task for it is returned.
    /// * Otherwise any stale entry is removed, a new connection task is
    ///   spawned and registered as `Connecting`, and that task is returned.
    ///
    /// When the spawned task finishes it rewrites the pool entry: `Connected`
    /// (holding a weak reference) on success, removed on failure.
    fn connect(
        &mut self,
        opts: RemoteConnectionOptions,
        delegate: Arc<dyn RemoteClientDelegate>,
        cx: &mut App,
    ) -> Shared<Task<Result<Arc<dyn RemoteConnection>, Arc<anyhow::Error>>>> {
        let connection = self.connections.get(&opts);
        match connection {
            Some(ConnectionPoolEntry::Connecting(task)) => {
                delegate.set_status(
                    Some("Waiting for existing connection attempt"),
                    &mut cx.to_async(),
                );
                return task.clone();
            }
            Some(ConnectionPoolEntry::Connected(remote)) => {
                if let Some(remote) = remote.upgrade()
                    && !remote.has_been_killed()
                {
                    return Task::ready(Ok(remote)).shared();
                }
                // The pooled connection is gone or was killed: forget it and
                // fall through to establish a fresh one.
                self.connections.remove(&opts);
            }
            None => {}
        }

        let task = cx
            .spawn({
                let opts = opts.clone();
                let delegate = delegate.clone();
                async move |cx| {
                    // Pick the transport matching the options variant.
                    let connection = match opts.clone() {
                        RemoteConnectionOptions::Ssh(opts) => {
                            SshRemoteConnection::new(opts, delegate, cx)
                                .await
                                .map(|connection| Arc::new(connection) as Arc<dyn RemoteConnection>)
                        }
                        RemoteConnectionOptions::Wsl(opts) => {
                            WslRemoteConnection::new(opts, delegate, cx)
                                .await
                                .map(|connection| Arc::new(connection) as Arc<dyn RemoteConnection>)
                        }
                        RemoteConnectionOptions::Docker(opts) => {
                            DockerExecConnection::new(opts, delegate, cx)
                                .await
                                .map(|connection| Arc::new(connection) as Arc<dyn RemoteConnection>)
                        }
                        // Mock connections are taken from the registry global
                        // rather than created here.
                        #[cfg(any(test, feature = "test-support"))]
                        RemoteConnectionOptions::Mock(opts) => match cx.update(|cx| {
                            cx.default_global::<crate::transport::mock::MockConnectionRegistry>()
                                .take(&opts)
                        }) {
                            Some(connection) => Ok(connection.await as Arc<dyn RemoteConnection>),
                            None => Err(anyhow!(
                                "Mock connection not found. Call MockConnection::new() first."
                            )),
                        },
                    };

                    // Record the outcome in the pool; the entry is expected to
                    // still be the `Connecting` placeholder inserted below.
                    cx.update_global(|pool: &mut Self, _| {
                        debug_assert!(matches!(
                            pool.connections.get(&opts),
                            Some(ConnectionPoolEntry::Connecting(_))
                        ));
                        match connection {
                            Ok(connection) => {
                                pool.connections.insert(
                                    opts.clone(),
                                    ConnectionPoolEntry::Connected(Arc::downgrade(&connection)),
                                );
                                Ok(connection)
                            }
                            Err(error) => {
                                pool.connections.remove(&opts);
                                Err(Arc::new(error))
                            }
                        }
                    })
                }
            })
            .shared();

        // Register the in-flight attempt so concurrent callers share it.
        self.connections
            .insert(opts.clone(), ConnectionPoolEntry::Connecting(task.clone()));
        task
    }
}
1205
/// Transport-specific options describing how to reach a remote.
///
/// The type is hashable and comparable, and is used as the key of the
/// connection pool.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum RemoteConnectionOptions {
    /// Connect over SSH.
    Ssh(SshConnectionOptions),
    /// Connect to a WSL distribution.
    Wsl(WslConnectionOptions),
    /// Connect to a Docker container.
    Docker(DockerConnectionOptions),
    /// Connect to an in-process mock (tests only).
    #[cfg(any(test, feature = "test-support"))]
    Mock(crate::transport::mock::MockConnectionOptions),
}
1214
1215impl RemoteConnectionOptions {
1216    pub fn display_name(&self) -> String {
1217        match self {
1218            RemoteConnectionOptions::Ssh(opts) => opts.host.to_string(),
1219            RemoteConnectionOptions::Wsl(opts) => opts.distro_name.clone(),
1220            RemoteConnectionOptions::Docker(opts) => opts.name.clone(),
1221            #[cfg(any(test, feature = "test-support"))]
1222            RemoteConnectionOptions::Mock(opts) => format!("mock-{}", opts.id),
1223        }
1224    }
1225}
1226
1227impl From<SshConnectionOptions> for RemoteConnectionOptions {
1228    fn from(opts: SshConnectionOptions) -> Self {
1229        RemoteConnectionOptions::Ssh(opts)
1230    }
1231}
1232
1233impl From<WslConnectionOptions> for RemoteConnectionOptions {
1234    fn from(opts: WslConnectionOptions) -> Self {
1235        RemoteConnectionOptions::Wsl(opts)
1236    }
1237}
1238
/// Wraps mock options in the `Mock` variant (tests only).
#[cfg(any(test, feature = "test-support"))]
impl From<crate::transport::mock::MockConnectionOptions> for RemoteConnectionOptions {
    fn from(mock_options: crate::transport::mock::MockConnectionOptions) -> Self {
        Self::Mock(mock_options)
    }
}
1245
// Windows-only action carrying a WSL target and the paths to open in it.
// Declared in the `workspace` action namespace, with JSON support and
// automatic registration switched off by the `action` attribute below.
#[cfg(target_os = "windows")]
/// Open a wsl path (\\wsl.localhost\<distro>\path)
#[derive(Debug, Clone, PartialEq, Eq, gpui::Action)]
#[action(namespace = workspace, no_json, no_register)]
pub struct OpenWslPath {
    // Connection options identifying the WSL distribution.
    pub distro: WslConnectionOptions,
    // Paths to open.
    pub paths: Vec<PathBuf>,
}
1254
/// Abstraction over a transport to a remote machine (SSH, WSL, Docker, or a
/// mock in tests).
///
/// Implementations are shared as `Arc<dyn RemoteConnection>` and must be
/// `Send + Sync`; the async methods are declared with `async_trait(?Send)`.
#[async_trait(?Send)]
pub trait RemoteConnection: Send + Sync {
    /// Starts the proxy for this connection and returns a task resolving to
    /// its exit code.
    ///
    /// `incoming_tx` / `outgoing_rx` are the envelope channels in each
    /// direction. NOTE(review): the precise meaning of `unique_identifier`,
    /// `reconnect` and `connection_activity_tx` is defined by the
    /// implementations, which are not visible here.
    fn start_proxy(
        &self,
        unique_identifier: String,
        reconnect: bool,
        incoming_tx: UnboundedSender<Envelope>,
        outgoing_rx: UnboundedReceiver<Envelope>,
        connection_activity_tx: Sender<()>,
        delegate: Arc<dyn RemoteClientDelegate>,
        cx: &mut AsyncApp,
    ) -> Task<Result<i32>>;
    /// Uploads the local directory `src_path` to `dest_path` on the remote.
    fn upload_directory(
        &self,
        src_path: PathBuf,
        dest_path: RemotePathBuf,
        cx: &App,
    ) -> Task<Result<()>>;
    /// Kills the underlying connection.
    async fn kill(&self) -> Result<()>;
    /// Reports whether the connection has been killed; the connection pool
    /// refuses to reuse connections for which this returns `true`.
    fn has_been_killed(&self) -> bool;
    /// Whether the remote shares a network interface with the local machine.
    /// Defaults to `false`.
    fn shares_network_interface(&self) -> bool {
        false
    }
    /// Builds a command template for running `program` with `args` and `env`
    /// on the remote, optionally in `working_dir` and with a port forward.
    /// NOTE(review): the layout of the `port_forward` tuple is not documented
    /// in this file — confirm against the implementations.
    fn build_command(
        &self,
        program: Option<String>,
        args: &[String],
        env: &HashMap<String, String>,
        working_dir: Option<String>,
        port_forward: Option<(u16, String, u16)>,
    ) -> Result<CommandTemplate>;
    /// Builds a command template that sets up the given port forwards.
    fn build_forward_ports_command(
        &self,
        forwards: Vec<(u16, String, u16)>,
    ) -> Result<CommandTemplate>;
    /// Returns the options this connection was created from.
    fn connection_options(&self) -> RemoteConnectionOptions;
    /// Returns the path style used by the remote.
    fn path_style(&self) -> PathStyle;
    /// Returns the shell associated with this connection.
    fn shell(&self) -> String;
    /// Returns the remote's default system shell.
    fn default_system_shell(&self) -> String;
    /// Reports whether WSL interop is available on this connection.
    fn has_wsl_interop(&self) -> bool;

    /// Test hook for simulating a dropped connection. No-op by default.
    #[cfg(any(test, feature = "test-support"))]
    fn simulate_disconnect(&self, _: &AsyncApp) {}
}
1299
/// Pending requests keyed by message id. Each sender delivers the response
/// envelope together with a second sender; the message loop waits on the
/// matching receiver before it handles the next incoming message.
type ResponseChannels = Mutex<HashMap<MessageId, oneshot::Sender<(Envelope, oneshot::Sender<()>)>>>;
1301
/// A value that can be set at most once and awaited by any number of waiters.
struct Signal<T> {
    /// Sending half; taken (left as `None`) by the first `set`.
    tx: Mutex<Option<oneshot::Sender<T>>>,
    /// Shared task yielding `Some(value)` once set, or `None` if the sender
    /// was dropped without sending.
    rx: Shared<Task<Option<T>>>,
}
1306
1307impl<T: Send + Clone + 'static> Signal<T> {
1308    pub fn new(cx: &App) -> Self {
1309        let (tx, rx) = oneshot::channel();
1310
1311        let task = cx
1312            .background_executor()
1313            .spawn(async move { rx.await.ok() })
1314            .shared();
1315
1316        Self {
1317            tx: Mutex::new(Some(tx)),
1318            rx: task,
1319        }
1320    }
1321
1322    fn set(&self, value: T) {
1323        if let Some(tx) = self.tx.lock().take() {
1324            let _ = tx.send(value);
1325        }
1326    }
1327
1328    fn wait(&self) -> Shared<Task<Option<T>>> {
1329        self.rx.clone()
1330    }
1331}
1332
/// Protocol client that exchanges `Envelope`s over a pair of unbounded
/// channels and keeps sent messages buffered until they are acknowledged.
pub(crate) struct ChannelClient {
    /// Source of ids for outgoing envelopes (incremented per message).
    next_message_id: AtomicU32,
    /// Sender for outgoing envelopes; swapped out on reconnect.
    outgoing_tx: Mutex<mpsc::UnboundedSender<Envelope>>,
    /// Envelopes sent through the buffered path that the peer has not yet
    /// acknowledged; entries are dropped as `ack_id`s arrive.
    buffer: Mutex<VecDeque<Envelope>>,
    /// Requests awaiting a response, keyed by message id.
    response_channels: ResponseChannels,
    /// Handlers for incoming messages that are not responses.
    message_handlers: Mutex<ProtoMessageHandlerSet>,
    /// Id of the most recently received regular message; sent to the peer as
    /// `ack_id` on outgoing envelopes.
    max_received: AtomicU32,
    /// Label used as a prefix in log messages.
    name: &'static str,
    /// Task running the incoming-message loop; replaced on reconnect.
    task: Mutex<Task<Result<()>>>,
    /// Set once a `RemoteStarted` message has been received from the peer.
    remote_started: Signal<()>,
    /// Value reported by `ProtoClient::has_wsl_interop`.
    has_wsl_interop: bool,
}
1345
1346impl ChannelClient {
1347    pub(crate) fn new(
1348        incoming_rx: mpsc::UnboundedReceiver<Envelope>,
1349        outgoing_tx: mpsc::UnboundedSender<Envelope>,
1350        cx: &App,
1351        name: &'static str,
1352        has_wsl_interop: bool,
1353    ) -> Arc<Self> {
1354        Arc::new_cyclic(|this| Self {
1355            outgoing_tx: Mutex::new(outgoing_tx),
1356            next_message_id: AtomicU32::new(0),
1357            max_received: AtomicU32::new(0),
1358            response_channels: ResponseChannels::default(),
1359            message_handlers: Default::default(),
1360            buffer: Mutex::new(VecDeque::new()),
1361            name,
1362            task: Mutex::new(Self::start_handling_messages(
1363                this.clone(),
1364                incoming_rx,
1365                &cx.to_async(),
1366            )),
1367            remote_started: Signal::new(cx),
1368            has_wsl_interop,
1369        })
1370    }
1371
1372    fn wait_for_remote_started(&self) -> Shared<Task<Option<()>>> {
1373        self.remote_started.wait()
1374    }
1375
1376    fn start_handling_messages(
1377        this: Weak<Self>,
1378        mut incoming_rx: mpsc::UnboundedReceiver<Envelope>,
1379        cx: &AsyncApp,
1380    ) -> Task<Result<()>> {
1381        cx.spawn(async move |cx| {
1382            if let Some(this) = this.upgrade() {
1383                let envelope = proto::RemoteStarted {}.into_envelope(0, None, None);
1384                this.outgoing_tx.lock().unbounded_send(envelope).ok();
1385            };
1386
1387            let peer_id = PeerId { owner_id: 0, id: 0 };
1388            while let Some(incoming) = incoming_rx.next().await {
1389                let Some(this) = this.upgrade() else {
1390                    return anyhow::Ok(());
1391                };
1392                if let Some(ack_id) = incoming.ack_id {
1393                    let mut buffer = this.buffer.lock();
1394                    while buffer.front().is_some_and(|msg| msg.id <= ack_id) {
1395                        buffer.pop_front();
1396                    }
1397                }
1398                if let Some(proto::envelope::Payload::FlushBufferedMessages(_)) = &incoming.payload
1399                {
1400                    log::debug!(
1401                        "{}:remote message received. name:FlushBufferedMessages",
1402                        this.name
1403                    );
1404                    {
1405                        let buffer = this.buffer.lock();
1406                        for envelope in buffer.iter() {
1407                            this.outgoing_tx
1408                                .lock()
1409                                .unbounded_send(envelope.clone())
1410                                .ok();
1411                        }
1412                    }
1413                    let mut envelope = proto::Ack {}.into_envelope(0, Some(incoming.id), None);
1414                    envelope.id = this.next_message_id.fetch_add(1, SeqCst);
1415                    this.outgoing_tx.lock().unbounded_send(envelope).ok();
1416                    continue;
1417                }
1418
1419                if let Some(proto::envelope::Payload::RemoteStarted(_)) = &incoming.payload {
1420                    this.remote_started.set(());
1421                    let mut envelope = proto::Ack {}.into_envelope(0, Some(incoming.id), None);
1422                    envelope.id = this.next_message_id.fetch_add(1, SeqCst);
1423                    this.outgoing_tx.lock().unbounded_send(envelope).ok();
1424                    continue;
1425                }
1426
1427                this.max_received.store(incoming.id, SeqCst);
1428
1429                if let Some(request_id) = incoming.responding_to {
1430                    let request_id = MessageId(request_id);
1431                    let sender = this.response_channels.lock().remove(&request_id);
1432                    if let Some(sender) = sender {
1433                        let (tx, rx) = oneshot::channel();
1434                        if incoming.payload.is_some() {
1435                            sender.send((incoming, tx)).ok();
1436                        }
1437                        rx.await.ok();
1438                    }
1439                } else if let Some(envelope) =
1440                    build_typed_envelope(peer_id, Instant::now(), incoming)
1441                {
1442                    let type_name = envelope.payload_type_name();
1443                    let message_id = envelope.message_id();
1444                    if let Some(future) = ProtoMessageHandlerSet::handle_message(
1445                        &this.message_handlers,
1446                        envelope,
1447                        this.clone().into(),
1448                        cx.clone(),
1449                    ) {
1450                        log::debug!("{}:remote message received. name:{type_name}", this.name);
1451                        cx.foreground_executor()
1452                            .spawn(async move {
1453                                match future.await {
1454                                    Ok(_) => {
1455                                        log::debug!(
1456                                            "{}:remote message handled. name:{type_name}",
1457                                            this.name
1458                                        );
1459                                    }
1460                                    Err(error) => {
1461                                        log::error!(
1462                                            "{}:error handling message. type:{}, error:{:#}",
1463                                            this.name,
1464                                            type_name,
1465                                            format!("{error:#}").lines().fold(
1466                                                String::new(),
1467                                                |mut message, line| {
1468                                                    if !message.is_empty() {
1469                                                        message.push(' ');
1470                                                    }
1471                                                    message.push_str(line);
1472                                                    message
1473                                                }
1474                                            )
1475                                        );
1476                                    }
1477                                }
1478                            })
1479                            .detach()
1480                    } else {
1481                        log::error!("{}:unhandled remote message name:{type_name}", this.name);
1482                        if let Err(e) = AnyProtoClient::from(this.clone()).send_response(
1483                            message_id,
1484                            anyhow::anyhow!("no handler registered for {type_name}").to_proto(),
1485                        ) {
1486                            log::error!(
1487                                "{}:error sending error response for {type_name}:{e:#}",
1488                                this.name
1489                            );
1490                        }
1491                    }
1492                }
1493            }
1494            anyhow::Ok(())
1495        })
1496    }
1497
1498    pub(crate) fn reconnect(
1499        self: &Arc<Self>,
1500        incoming_rx: UnboundedReceiver<Envelope>,
1501        outgoing_tx: UnboundedSender<Envelope>,
1502        cx: &AsyncApp,
1503    ) {
1504        *self.outgoing_tx.lock() = outgoing_tx;
1505        *self.task.lock() = Self::start_handling_messages(Arc::downgrade(self), incoming_rx, cx);
1506    }
1507
1508    fn request<T: RequestMessage>(
1509        &self,
1510        payload: T,
1511    ) -> impl 'static + Future<Output = Result<T::Response>> {
1512        self.request_internal(payload, true)
1513    }
1514
1515    fn request_internal<T: RequestMessage>(
1516        &self,
1517        payload: T,
1518        use_buffer: bool,
1519    ) -> impl 'static + Future<Output = Result<T::Response>> {
1520        log::debug!("remote request start. name:{}", T::NAME);
1521        let response =
1522            self.request_dynamic(payload.into_envelope(0, None, None), T::NAME, use_buffer);
1523        async move {
1524            let response = response.await?;
1525            log::debug!("remote request finish. name:{}", T::NAME);
1526            T::Response::from_envelope(response).context("received a response of the wrong type")
1527        }
1528    }
1529
1530    async fn resync(&self, timeout: Duration) -> Result<()> {
1531        smol::future::or(
1532            async {
1533                self.request_internal(proto::FlushBufferedMessages {}, false)
1534                    .await?;
1535
1536                for envelope in self.buffer.lock().iter() {
1537                    self.outgoing_tx
1538                        .lock()
1539                        .unbounded_send(envelope.clone())
1540                        .ok();
1541                }
1542                Ok(())
1543            },
1544            async {
1545                smol::Timer::after(timeout).await;
1546                anyhow::bail!("Timed out resyncing remote client")
1547            },
1548        )
1549        .await
1550    }
1551
1552    async fn ping(&self, timeout: Duration) -> Result<()> {
1553        smol::future::or(
1554            async {
1555                self.request(proto::Ping {}).await?;
1556                Ok(())
1557            },
1558            async {
1559                smol::Timer::after(timeout).await;
1560                anyhow::bail!("Timed out pinging remote client")
1561            },
1562        )
1563        .await
1564    }
1565
1566    fn send<T: EnvelopedMessage>(&self, payload: T) -> Result<()> {
1567        log::debug!("remote send name:{}", T::NAME);
1568        self.send_dynamic(payload.into_envelope(0, None, None))
1569    }
1570
1571    fn request_dynamic(
1572        &self,
1573        mut envelope: proto::Envelope,
1574        type_name: &'static str,
1575        use_buffer: bool,
1576    ) -> impl 'static + Future<Output = Result<proto::Envelope>> {
1577        envelope.id = self.next_message_id.fetch_add(1, SeqCst);
1578        let (tx, rx) = oneshot::channel();
1579        let mut response_channels_lock = self.response_channels.lock();
1580        response_channels_lock.insert(MessageId(envelope.id), tx);
1581        drop(response_channels_lock);
1582
1583        let result = if use_buffer {
1584            self.send_buffered(envelope)
1585        } else {
1586            self.send_unbuffered(envelope)
1587        };
1588        async move {
1589            if let Err(error) = &result {
1590                log::error!("failed to send message: {error}");
1591                anyhow::bail!("failed to send message: {error}");
1592            }
1593
1594            let response = rx.await.context("connection lost")?.0;
1595            if let Some(proto::envelope::Payload::Error(error)) = &response.payload {
1596                return Err(RpcError::from_proto(error, type_name));
1597            }
1598            Ok(response)
1599        }
1600    }
1601
1602    pub fn send_dynamic(&self, mut envelope: proto::Envelope) -> Result<()> {
1603        envelope.id = self.next_message_id.fetch_add(1, SeqCst);
1604        self.send_buffered(envelope)
1605    }
1606
1607    fn send_buffered(&self, mut envelope: proto::Envelope) -> Result<()> {
1608        envelope.ack_id = Some(self.max_received.load(SeqCst));
1609        self.buffer.lock().push_back(envelope.clone());
1610        // ignore errors on send (happen while we're reconnecting)
1611        // assume that the global "disconnected" overlay is sufficient.
1612        self.outgoing_tx.lock().unbounded_send(envelope).ok();
1613        Ok(())
1614    }
1615
1616    fn send_unbuffered(&self, mut envelope: proto::Envelope) -> Result<()> {
1617        envelope.ack_id = Some(self.max_received.load(SeqCst));
1618        self.outgoing_tx.lock().unbounded_send(envelope).ok();
1619        Ok(())
1620    }
1621}
1622
1623impl ProtoClient for ChannelClient {
1624    fn request(
1625        &self,
1626        envelope: proto::Envelope,
1627        request_type: &'static str,
1628    ) -> BoxFuture<'static, Result<proto::Envelope>> {
1629        self.request_dynamic(envelope, request_type, true).boxed()
1630    }
1631
1632    fn send(&self, envelope: proto::Envelope, _message_type: &'static str) -> Result<()> {
1633        self.send_dynamic(envelope)
1634    }
1635
1636    fn send_response(&self, envelope: Envelope, _message_type: &'static str) -> anyhow::Result<()> {
1637        self.send_dynamic(envelope)
1638    }
1639
1640    fn message_handler_set(&self) -> &Mutex<ProtoMessageHandlerSet> {
1641        &self.message_handlers
1642    }
1643
1644    fn is_via_collab(&self) -> bool {
1645        false
1646    }
1647
1648    fn has_wsl_interop(&self) -> bool {
1649        self.has_wsl_interop
1650    }
1651}