remote_client.rs

   1use crate::{
   2    SshConnectionOptions,
   3    protocol::MessageId,
   4    proxy::ProxyLaunchError,
   5    transport::{
   6        docker::{DockerConnectionOptions, DockerExecConnection},
   7        ssh::SshRemoteConnection,
   8        wsl::{WslConnectionOptions, WslRemoteConnection},
   9    },
  10};
  11use anyhow::{Context as _, Result, anyhow};
  12use askpass::EncryptedPassword;
  13use async_trait::async_trait;
  14use collections::HashMap;
  15use futures::{
  16    Future, FutureExt as _, StreamExt as _,
  17    channel::{
  18        mpsc::{self, Sender, UnboundedReceiver, UnboundedSender},
  19        oneshot,
  20    },
  21    future::{BoxFuture, Shared},
  22    select, select_biased,
  23};
  24use gpui::{
  25    App, AppContext as _, AsyncApp, BackgroundExecutor, BorrowAppContext, Context, Entity,
  26    EventEmitter, FutureExt, Global, Task, WeakEntity,
  27};
  28use parking_lot::Mutex;
  29
  30use release_channel::ReleaseChannel;
  31use rpc::{
  32    AnyProtoClient, ErrorExt, ProtoClient, ProtoMessageHandlerSet, RpcError,
  33    proto::{self, Envelope, EnvelopedMessage, PeerId, RequestMessage, build_typed_envelope},
  34};
  35use semver::Version;
  36use std::{
  37    collections::VecDeque,
  38    fmt,
  39    ops::ControlFlow,
  40    path::PathBuf,
  41    sync::{
  42        Arc, Weak,
  43        atomic::{AtomicU32, AtomicU64, Ordering::SeqCst},
  44    },
  45    time::{Duration, Instant},
  46};
  47use util::{
  48    ResultExt,
  49    paths::{PathStyle, RemotePathBuf},
  50};
  51
  52#[derive(Copy, Clone, Debug, PartialEq, Eq)]
  53pub enum RemoteOs {
  54    Linux,
  55    MacOs,
  56    Windows,
  57}
  58
  59impl RemoteOs {
  60    pub fn as_str(&self) -> &'static str {
  61        match self {
  62            RemoteOs::Linux => "linux",
  63            RemoteOs::MacOs => "macos",
  64            RemoteOs::Windows => "windows",
  65        }
  66    }
  67
  68    pub fn is_windows(&self) -> bool {
  69        matches!(self, RemoteOs::Windows)
  70    }
  71}
  72
  73impl std::fmt::Display for RemoteOs {
  74    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
  75        f.write_str(self.as_str())
  76    }
  77}
  78
  79#[derive(Copy, Clone, Debug, PartialEq, Eq)]
  80pub enum RemoteArch {
  81    X86_64,
  82    Aarch64,
  83}
  84
  85impl RemoteArch {
  86    pub fn as_str(&self) -> &'static str {
  87        match self {
  88            RemoteArch::X86_64 => "x86_64",
  89            RemoteArch::Aarch64 => "aarch64",
  90        }
  91    }
  92}
  93
  94impl std::fmt::Display for RemoteArch {
  95    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
  96        f.write_str(self.as_str())
  97    }
  98}
  99
 100#[derive(Copy, Clone, Debug)]
 101pub struct RemotePlatform {
 102    pub os: RemoteOs,
 103    pub arch: RemoteArch,
 104}
 105
 106#[derive(Clone, Debug)]
 107pub struct CommandTemplate {
 108    pub program: String,
 109    pub args: Vec<String>,
 110    pub env: HashMap<String, String>,
 111}
 112
 113pub trait RemoteClientDelegate: Send + Sync {
 114    fn ask_password(
 115        &self,
 116        prompt: String,
 117        tx: oneshot::Sender<EncryptedPassword>,
 118        cx: &mut AsyncApp,
 119    );
 120    fn get_download_url(
 121        &self,
 122        platform: RemotePlatform,
 123        release_channel: ReleaseChannel,
 124        version: Option<Version>,
 125        cx: &mut AsyncApp,
 126    ) -> Task<Result<Option<String>>>;
 127    fn download_server_binary_locally(
 128        &self,
 129        platform: RemotePlatform,
 130        release_channel: ReleaseChannel,
 131        version: Option<Version>,
 132        cx: &mut AsyncApp,
 133    ) -> Task<Result<PathBuf>>;
 134    fn set_status(&self, status: Option<&str>, cx: &mut AsyncApp);
 135}
 136
 137const MAX_MISSED_HEARTBEATS: usize = 5;
 138const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
 139const HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(5);
 140const INITIAL_CONNECTION_TIMEOUT: Duration =
 141    Duration::from_secs(if cfg!(debug_assertions) { 5 } else { 60 });
 142
 143const MAX_RECONNECT_ATTEMPTS: usize = 3;
 144
 145enum State {
 146    Connecting,
 147    Connected {
 148        remote_connection: Arc<dyn RemoteConnection>,
 149        delegate: Arc<dyn RemoteClientDelegate>,
 150
 151        multiplex_task: Task<Result<()>>,
 152        heartbeat_task: Task<Result<()>>,
 153    },
 154    HeartbeatMissed {
 155        missed_heartbeats: usize,
 156
 157        remote_connection: Arc<dyn RemoteConnection>,
 158        delegate: Arc<dyn RemoteClientDelegate>,
 159
 160        multiplex_task: Task<Result<()>>,
 161        heartbeat_task: Task<Result<()>>,
 162    },
 163    Reconnecting,
 164    ReconnectFailed {
 165        remote_connection: Arc<dyn RemoteConnection>,
 166        delegate: Arc<dyn RemoteClientDelegate>,
 167
 168        error: anyhow::Error,
 169        attempts: usize,
 170    },
 171    ReconnectExhausted,
 172    ServerNotRunning,
 173}
 174
 175impl fmt::Display for State {
 176    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
 177        match self {
 178            Self::Connecting => write!(f, "connecting"),
 179            Self::Connected { .. } => write!(f, "connected"),
 180            Self::Reconnecting => write!(f, "reconnecting"),
 181            Self::ReconnectFailed { .. } => write!(f, "reconnect failed"),
 182            Self::ReconnectExhausted => write!(f, "reconnect exhausted"),
 183            Self::HeartbeatMissed { .. } => write!(f, "heartbeat missed"),
 184            Self::ServerNotRunning { .. } => write!(f, "server not running"),
 185        }
 186    }
 187}
 188
 189impl State {
 190    fn remote_connection(&self) -> Option<Arc<dyn RemoteConnection>> {
 191        match self {
 192            Self::Connected {
 193                remote_connection, ..
 194            } => Some(remote_connection.clone()),
 195            Self::HeartbeatMissed {
 196                remote_connection, ..
 197            } => Some(remote_connection.clone()),
 198            Self::ReconnectFailed {
 199                remote_connection, ..
 200            } => Some(remote_connection.clone()),
 201            _ => None,
 202        }
 203    }
 204
 205    fn can_reconnect(&self) -> bool {
 206        match self {
 207            Self::Connected { .. }
 208            | Self::HeartbeatMissed { .. }
 209            | Self::ReconnectFailed { .. } => true,
 210            State::Connecting
 211            | State::Reconnecting
 212            | State::ReconnectExhausted
 213            | State::ServerNotRunning => false,
 214        }
 215    }
 216
 217    fn is_reconnect_failed(&self) -> bool {
 218        matches!(self, Self::ReconnectFailed { .. })
 219    }
 220
 221    fn is_reconnect_exhausted(&self) -> bool {
 222        matches!(self, Self::ReconnectExhausted { .. })
 223    }
 224
 225    fn is_server_not_running(&self) -> bool {
 226        matches!(self, Self::ServerNotRunning)
 227    }
 228
 229    fn is_reconnecting(&self) -> bool {
 230        matches!(self, Self::Reconnecting { .. })
 231    }
 232
 233    fn heartbeat_recovered(self) -> Self {
 234        match self {
 235            Self::HeartbeatMissed {
 236                remote_connection,
 237                delegate,
 238                multiplex_task,
 239                heartbeat_task,
 240                ..
 241            } => Self::Connected {
 242                remote_connection: remote_connection,
 243                delegate,
 244                multiplex_task,
 245                heartbeat_task,
 246            },
 247            _ => self,
 248        }
 249    }
 250
 251    fn heartbeat_missed(self) -> Self {
 252        match self {
 253            Self::Connected {
 254                remote_connection,
 255                delegate,
 256                multiplex_task,
 257                heartbeat_task,
 258            } => Self::HeartbeatMissed {
 259                missed_heartbeats: 1,
 260                remote_connection,
 261                delegate,
 262                multiplex_task,
 263                heartbeat_task,
 264            },
 265            Self::HeartbeatMissed {
 266                missed_heartbeats,
 267                remote_connection,
 268                delegate,
 269                multiplex_task,
 270                heartbeat_task,
 271            } => Self::HeartbeatMissed {
 272                missed_heartbeats: missed_heartbeats + 1,
 273                remote_connection,
 274                delegate,
 275                multiplex_task,
 276                heartbeat_task,
 277            },
 278            _ => self,
 279        }
 280    }
 281}
 282
 283/// The state of the ssh connection.
 284#[derive(Clone, Copy, Debug, PartialEq, Eq)]
 285pub enum ConnectionState {
 286    Connecting,
 287    Connected,
 288    HeartbeatMissed,
 289    Reconnecting,
 290    Disconnected,
 291}
 292
 293impl From<&State> for ConnectionState {
 294    fn from(value: &State) -> Self {
 295        match value {
 296            State::Connecting => Self::Connecting,
 297            State::Connected { .. } => Self::Connected,
 298            State::Reconnecting | State::ReconnectFailed { .. } => Self::Reconnecting,
 299            State::HeartbeatMissed { .. } => Self::HeartbeatMissed,
 300            State::ReconnectExhausted => Self::Disconnected,
 301            State::ServerNotRunning => Self::Disconnected,
 302        }
 303    }
 304}
 305
 306pub struct RemoteClient {
 307    client: Arc<ChannelClient>,
 308    unique_identifier: String,
 309    connection_options: RemoteConnectionOptions,
 310    path_style: PathStyle,
 311    state: Option<State>,
 312}
 313
 314#[derive(Debug)]
 315pub enum RemoteClientEvent {
 316    Disconnected,
 317}
 318
 319impl EventEmitter<RemoteClientEvent> for RemoteClient {}
 320
 321/// Identifies the socket on the remote server so that reconnects
 322/// can re-join the same project.
 323pub enum ConnectionIdentifier {
 324    Setup(u64),
 325    Workspace(i64),
 326}
 327
 328static NEXT_ID: AtomicU64 = AtomicU64::new(1);
 329
 330impl ConnectionIdentifier {
 331    pub fn setup() -> Self {
 332        Self::Setup(NEXT_ID.fetch_add(1, SeqCst))
 333    }
 334
 335    // This string gets used in a socket name, and so must be relatively short.
 336    // The total length of:
 337    //   /home/{username}/.local/share/zed/server_state/{name}/stdout.sock
 338    // Must be less than about 100 characters
 339    //   https://unix.stackexchange.com/questions/367008/why-is-socket-path-length-limited-to-a-hundred-chars
 340    // So our strings should be at most 20 characters or so.
 341    fn to_string(&self, cx: &App) -> String {
 342        let identifier_prefix = match ReleaseChannel::global(cx) {
 343            ReleaseChannel::Stable => "".to_string(),
 344            release_channel => format!("{}-", release_channel.dev_name()),
 345        };
 346        match self {
 347            Self::Setup(setup_id) => format!("{identifier_prefix}setup-{setup_id}"),
 348            Self::Workspace(workspace_id) => {
 349                format!("{identifier_prefix}workspace-{workspace_id}",)
 350            }
 351        }
 352    }
 353}
 354
 355pub async fn connect(
 356    connection_options: RemoteConnectionOptions,
 357    delegate: Arc<dyn RemoteClientDelegate>,
 358    cx: &mut AsyncApp,
 359) -> Result<Arc<dyn RemoteConnection>> {
 360    cx.update(|cx| {
 361        cx.update_default_global(|pool: &mut ConnectionPool, cx| {
 362            pool.connect(connection_options.clone(), delegate.clone(), cx)
 363        })
 364    })
 365    .await
 366    .map_err(|e| e.cloned())
 367}
 368
 369impl RemoteClient {
 370    pub fn new(
 371        unique_identifier: ConnectionIdentifier,
 372        remote_connection: Arc<dyn RemoteConnection>,
 373        cancellation: oneshot::Receiver<()>,
 374        delegate: Arc<dyn RemoteClientDelegate>,
 375        cx: &mut App,
 376    ) -> Task<Result<Option<Entity<Self>>>> {
 377        let unique_identifier = unique_identifier.to_string(cx);
 378        cx.spawn(async move |cx| {
 379            let success = Box::pin(async move {
 380                let (outgoing_tx, outgoing_rx) = mpsc::unbounded::<Envelope>();
 381                let (incoming_tx, incoming_rx) = mpsc::unbounded::<Envelope>();
 382                let (connection_activity_tx, connection_activity_rx) = mpsc::channel::<()>(1);
 383
 384                let client = cx.update(|cx| {
 385                    ChannelClient::new(
 386                        incoming_rx,
 387                        outgoing_tx,
 388                        cx,
 389                        "client",
 390                        remote_connection.has_wsl_interop(),
 391                    )
 392                });
 393
 394                let path_style = remote_connection.path_style();
 395                let this = cx.new(|_| Self {
 396                    client: client.clone(),
 397                    unique_identifier: unique_identifier.clone(),
 398                    connection_options: remote_connection.connection_options(),
 399                    path_style,
 400                    state: Some(State::Connecting),
 401                });
 402
 403                let io_task = remote_connection.start_proxy(
 404                    unique_identifier,
 405                    false,
 406                    incoming_tx,
 407                    outgoing_rx,
 408                    connection_activity_tx,
 409                    delegate.clone(),
 410                    cx,
 411                );
 412
 413                let ready = client
 414                    .wait_for_remote_started()
 415                    .with_timeout(INITIAL_CONNECTION_TIMEOUT, cx.background_executor())
 416                    .await;
 417                match ready {
 418                    Ok(Some(_)) => {}
 419                    Ok(None) => {
 420                        let mut error = "remote client exited before becoming ready".to_owned();
 421                        if let Some(status) = io_task.now_or_never() {
 422                            match status {
 423                                Ok(exit_code) => {
 424                                    error.push_str(&format!(", exit_code={exit_code:?}"))
 425                                }
 426                                Err(e) => error.push_str(&format!(", error={e:?}")),
 427                            }
 428                        }
 429                        let error = anyhow::anyhow!("{error}");
 430                        log::error!("failed to establish connection: {}", error);
 431                        return Err(error);
 432                    }
 433                    Err(_) => {
 434                        let mut error =
 435                            "remote client did not become ready within the timeout".to_owned();
 436                        if let Some(status) = io_task.now_or_never() {
 437                            match status {
 438                                Ok(exit_code) => {
 439                                    error.push_str(&format!(", exit_code={exit_code:?}"))
 440                                }
 441                                Err(e) => error.push_str(&format!(", error={e:?}")),
 442                            }
 443                        }
 444                        let error = anyhow::anyhow!("{error}");
 445                        log::error!("failed to establish connection: {}", error);
 446                        return Err(error);
 447                    }
 448                }
 449                let multiplex_task = Self::monitor(this.downgrade(), io_task, cx);
 450                if let Err(error) = client.ping(HEARTBEAT_TIMEOUT).await {
 451                    log::error!("failed to establish connection: {}", error);
 452                    return Err(error);
 453                }
 454
 455                let heartbeat_task = Self::heartbeat(this.downgrade(), connection_activity_rx, cx);
 456
 457                this.update(cx, |this, _| {
 458                    this.state = Some(State::Connected {
 459                        remote_connection,
 460                        delegate,
 461                        multiplex_task,
 462                        heartbeat_task,
 463                    });
 464                });
 465
 466                Ok(Some(this))
 467            });
 468
 469            select! {
 470                _ = cancellation.fuse() => {
 471                    Ok(None)
 472                }
 473                result = success.fuse() =>  result
 474            }
 475        })
 476    }
 477
 478    pub fn proto_client_from_channels(
 479        incoming_rx: mpsc::UnboundedReceiver<Envelope>,
 480        outgoing_tx: mpsc::UnboundedSender<Envelope>,
 481        cx: &App,
 482        name: &'static str,
 483        has_wsl_interop: bool,
 484    ) -> AnyProtoClient {
 485        ChannelClient::new(incoming_rx, outgoing_tx, cx, name, has_wsl_interop).into()
 486    }
 487
 488    pub fn shutdown_processes<T: RequestMessage>(
 489        &mut self,
 490        shutdown_request: Option<T>,
 491        executor: BackgroundExecutor,
 492    ) -> Option<impl Future<Output = ()> + use<T>> {
 493        let state = self.state.take()?;
 494        log::info!("shutting down ssh processes");
 495
 496        let State::Connected {
 497            multiplex_task,
 498            heartbeat_task,
 499            remote_connection,
 500            delegate,
 501        } = state
 502        else {
 503            return None;
 504        };
 505
 506        let client = self.client.clone();
 507
 508        Some(async move {
 509            if let Some(shutdown_request) = shutdown_request {
 510                client.send(shutdown_request).log_err();
 511                // We wait 50ms instead of waiting for a response, because
 512                // waiting for a response would require us to wait on the main thread
 513                // which we want to avoid in an `on_app_quit` callback.
 514                executor.timer(Duration::from_millis(50)).await;
 515            }
 516
 517            // Drop `multiplex_task` because it owns our remote_connection_proxy_process, which is a
 518            // child of master_process.
 519            drop(multiplex_task);
 520            // Now drop the rest of state, which kills master process.
 521            drop(heartbeat_task);
 522            drop(remote_connection);
 523            drop(delegate);
 524        })
 525    }
 526
 527    fn reconnect(&mut self, cx: &mut Context<Self>) -> Result<()> {
 528        let can_reconnect = self
 529            .state
 530            .as_ref()
 531            .map(|state| state.can_reconnect())
 532            .unwrap_or(false);
 533        if !can_reconnect {
 534            log::info!("aborting reconnect, because not in state that allows reconnecting");
 535            let error = if let Some(state) = self.state.as_ref() {
 536                format!("invalid state, cannot reconnect while in state {state}")
 537            } else {
 538                "no state set".to_string()
 539            };
 540            anyhow::bail!(error);
 541        }
 542
 543        let state = self.state.take().unwrap();
 544        let (attempts, remote_connection, delegate) = match state {
 545            State::Connected {
 546                remote_connection,
 547                delegate,
 548                multiplex_task,
 549                heartbeat_task,
 550            }
 551            | State::HeartbeatMissed {
 552                remote_connection,
 553                delegate,
 554                multiplex_task,
 555                heartbeat_task,
 556                ..
 557            } => {
 558                drop(multiplex_task);
 559                drop(heartbeat_task);
 560                (0, remote_connection, delegate)
 561            }
 562            State::ReconnectFailed {
 563                attempts,
 564                remote_connection,
 565                delegate,
 566                ..
 567            } => (attempts, remote_connection, delegate),
 568            State::Connecting
 569            | State::Reconnecting
 570            | State::ReconnectExhausted
 571            | State::ServerNotRunning => unreachable!(),
 572        };
 573
 574        let attempts = attempts + 1;
 575        if attempts > MAX_RECONNECT_ATTEMPTS {
 576            log::error!(
 577                "Failed to reconnect to after {} attempts, giving up",
 578                MAX_RECONNECT_ATTEMPTS
 579            );
 580            self.set_state(State::ReconnectExhausted, cx);
 581            return Ok(());
 582        }
 583
 584        self.set_state(State::Reconnecting, cx);
 585
 586        log::info!(
 587            "Trying to reconnect to remote server... Attempt {}",
 588            attempts
 589        );
 590
 591        let unique_identifier = self.unique_identifier.clone();
 592        let client = self.client.clone();
 593        let reconnect_task = cx.spawn(async move |this, cx| {
 594            macro_rules! failed {
 595                ($error:expr, $attempts:expr, $remote_connection:expr, $delegate:expr) => {
 596                    delegate.set_status(Some(&format!("{error:#}", error = $error)), cx);
 597                    return State::ReconnectFailed {
 598                        error: anyhow!($error),
 599                        attempts: $attempts,
 600                        remote_connection: $remote_connection,
 601                        delegate: $delegate,
 602                    };
 603                };
 604            }
 605
 606            if let Err(error) = remote_connection
 607                .kill()
 608                .await
 609                .context("Failed to kill remote_connection process")
 610            {
 611                failed!(error, attempts, remote_connection, delegate);
 612            };
 613
 614            let connection_options = remote_connection.connection_options();
 615
 616            let (outgoing_tx, outgoing_rx) = mpsc::unbounded::<Envelope>();
 617            let (incoming_tx, incoming_rx) = mpsc::unbounded::<Envelope>();
 618            let (connection_activity_tx, connection_activity_rx) = mpsc::channel::<()>(1);
 619
 620            let (remote_connection, io_task) = match async {
 621                let remote_connection = cx
 622                    .update_global(|pool: &mut ConnectionPool, cx| {
 623                        pool.connect(connection_options, delegate.clone(), cx)
 624                    })
 625                    .await
 626                    .map_err(|error| error.cloned())?;
 627
 628                let io_task = remote_connection.start_proxy(
 629                    unique_identifier,
 630                    true,
 631                    incoming_tx,
 632                    outgoing_rx,
 633                    connection_activity_tx,
 634                    delegate.clone(),
 635                    cx,
 636                );
 637                anyhow::Ok((remote_connection, io_task))
 638            }
 639            .await
 640            {
 641                Ok((remote_connection, io_task)) => (remote_connection, io_task),
 642                Err(error) => {
 643                    failed!(error, attempts, remote_connection, delegate);
 644                }
 645            };
 646
 647            let multiplex_task = Self::monitor(this.clone(), io_task, cx);
 648            client.reconnect(incoming_rx, outgoing_tx, cx);
 649
 650            if let Err(error) = client.resync(HEARTBEAT_TIMEOUT).await {
 651                failed!(error, attempts, remote_connection, delegate);
 652            };
 653
 654            State::Connected {
 655                remote_connection: remote_connection,
 656                delegate,
 657                multiplex_task,
 658                heartbeat_task: Self::heartbeat(this.clone(), connection_activity_rx, cx),
 659            }
 660        });
 661
 662        cx.spawn(async move |this, cx| {
 663            let new_state = reconnect_task.await;
 664            this.update(cx, |this, cx| {
 665                this.try_set_state(cx, |old_state| {
 666                    if old_state.is_reconnecting() {
 667                        match &new_state {
 668                            State::Connecting
 669                            | State::Reconnecting
 670                            | State::HeartbeatMissed { .. }
 671                            | State::ServerNotRunning => {}
 672                            State::Connected { .. } => {
 673                                log::info!("Successfully reconnected");
 674                            }
 675                            State::ReconnectFailed {
 676                                error, attempts, ..
 677                            } => {
 678                                log::error!(
 679                                    "Reconnect attempt {} failed: {:?}. Starting new attempt...",
 680                                    attempts,
 681                                    error
 682                                );
 683                            }
 684                            State::ReconnectExhausted => {
 685                                log::error!("Reconnect attempt failed and all attempts exhausted");
 686                            }
 687                        }
 688                        Some(new_state)
 689                    } else {
 690                        None
 691                    }
 692                });
 693
 694                if this.state_is(State::is_reconnect_failed) {
 695                    this.reconnect(cx)
 696                } else if this.state_is(State::is_reconnect_exhausted) {
 697                    Ok(())
 698                } else {
 699                    log::debug!("State has transition from Reconnecting into new state while attempting reconnect.");
 700                    Ok(())
 701                }
 702            })
 703        })
 704        .detach_and_log_err(cx);
 705
 706        Ok(())
 707    }
 708
 709    fn heartbeat(
 710        this: WeakEntity<Self>,
 711        mut connection_activity_rx: mpsc::Receiver<()>,
 712        cx: &mut AsyncApp,
 713    ) -> Task<Result<()>> {
 714        let Ok(client) = this.read_with(cx, |this, _| this.client.clone()) else {
 715            return Task::ready(Err(anyhow!("remote_connectionRemoteClient lost")));
 716        };
 717
 718        cx.spawn(async move |cx| {
 719            let mut missed_heartbeats = 0;
 720
 721            let keepalive_timer = cx.background_executor().timer(HEARTBEAT_INTERVAL).fuse();
 722            futures::pin_mut!(keepalive_timer);
 723
 724            loop {
 725                select_biased! {
 726                    result = connection_activity_rx.next().fuse() => {
 727                        if result.is_none() {
 728                            log::warn!("remote heartbeat: connection activity channel has been dropped. stopping.");
 729                            return Ok(());
 730                        }
 731
 732                        if missed_heartbeats != 0 {
 733                            missed_heartbeats = 0;
 734                            let _ =this.update(cx, |this, cx| {
 735                                this.handle_heartbeat_result(missed_heartbeats, cx)
 736                            })?;
 737                        }
 738                    }
 739                    _ = keepalive_timer => {
 740                        log::debug!("Sending heartbeat to server...");
 741
 742                        let result = select_biased! {
 743                            _ = connection_activity_rx.next().fuse() => {
 744                                Ok(())
 745                            }
 746                            ping_result = client.ping(HEARTBEAT_TIMEOUT).fuse() => {
 747                                ping_result
 748                            }
 749                        };
 750
 751                        if result.is_err() {
 752                            missed_heartbeats += 1;
 753                            log::warn!(
 754                                "No heartbeat from server after {:?}. Missed heartbeat {} out of {}.",
 755                                HEARTBEAT_TIMEOUT,
 756                                missed_heartbeats,
 757                                MAX_MISSED_HEARTBEATS
 758                            );
 759                        } else if missed_heartbeats != 0 {
 760                            missed_heartbeats = 0;
 761                        } else {
 762                            continue;
 763                        }
 764
 765                        let result = this.update(cx, |this, cx| {
 766                            this.handle_heartbeat_result(missed_heartbeats, cx)
 767                        })?;
 768                        if result.is_break() {
 769                            return Ok(());
 770                        }
 771                    }
 772                }
 773
 774                keepalive_timer.set(cx.background_executor().timer(HEARTBEAT_INTERVAL).fuse());
 775            }
 776        })
 777    }
 778
 779    fn handle_heartbeat_result(
 780        &mut self,
 781        missed_heartbeats: usize,
 782        cx: &mut Context<Self>,
 783    ) -> ControlFlow<()> {
 784        let state = self.state.take().unwrap();
 785        let next_state = if missed_heartbeats > 0 {
 786            state.heartbeat_missed()
 787        } else {
 788            state.heartbeat_recovered()
 789        };
 790
 791        self.set_state(next_state, cx);
 792
 793        if missed_heartbeats >= MAX_MISSED_HEARTBEATS {
 794            log::error!(
 795                "Missed last {} heartbeats. Reconnecting...",
 796                missed_heartbeats
 797            );
 798
 799            self.reconnect(cx)
 800                .context("failed to start reconnect process after missing heartbeats")
 801                .log_err();
 802            ControlFlow::Break(())
 803        } else {
 804            ControlFlow::Continue(())
 805        }
 806    }
 807
 808    fn monitor(
 809        this: WeakEntity<Self>,
 810        io_task: Task<Result<i32>>,
 811        cx: &AsyncApp,
 812    ) -> Task<Result<()>> {
 813        cx.spawn(async move |cx| {
 814            let result = io_task.await;
 815
 816            match result {
 817                Ok(exit_code) => {
 818                    if let Some(error) = ProxyLaunchError::from_exit_code(exit_code) {
 819                        match error {
 820                            ProxyLaunchError::ServerNotRunning => {
 821                                log::error!("failed to reconnect because server is not running");
 822                                this.update(cx, |this, cx| {
 823                                    this.set_state(State::ServerNotRunning, cx);
 824                                })?;
 825                            }
 826                        }
 827                    } else if exit_code > 0 {
 828                        log::error!("proxy process terminated unexpectedly");
 829                        this.update(cx, |this, cx| {
 830                            this.reconnect(cx).ok();
 831                        })?;
 832                    }
 833                }
 834                Err(error) => {
 835                    log::warn!(
 836                        "remote io task died with error: {:?}. reconnecting...",
 837                        error
 838                    );
 839                    this.update(cx, |this, cx| {
 840                        this.reconnect(cx).ok();
 841                    })?;
 842                }
 843            }
 844
 845            Ok(())
 846        })
 847    }
 848
 849    fn state_is(&self, check: impl FnOnce(&State) -> bool) -> bool {
 850        self.state.as_ref().is_some_and(check)
 851    }
 852
 853    fn try_set_state(&mut self, cx: &mut Context<Self>, map: impl FnOnce(&State) -> Option<State>) {
 854        let new_state = self.state.as_ref().and_then(map);
 855        if let Some(new_state) = new_state {
 856            self.state.replace(new_state);
 857            cx.notify();
 858        }
 859    }
 860
 861    fn set_state(&mut self, state: State, cx: &mut Context<Self>) {
 862        log::info!("setting state to '{}'", &state);
 863
 864        let is_reconnect_exhausted = state.is_reconnect_exhausted();
 865        let is_server_not_running = state.is_server_not_running();
 866        self.state.replace(state);
 867
 868        if is_reconnect_exhausted || is_server_not_running {
 869            cx.emit(RemoteClientEvent::Disconnected);
 870        }
 871        cx.notify();
 872    }
 873
 874    pub fn shell(&self) -> Option<String> {
 875        Some(self.remote_connection()?.shell())
 876    }
 877
 878    pub fn default_system_shell(&self) -> Option<String> {
 879        Some(self.remote_connection()?.default_system_shell())
 880    }
 881
 882    pub fn shares_network_interface(&self) -> bool {
 883        self.remote_connection()
 884            .map_or(false, |connection| connection.shares_network_interface())
 885    }
 886
 887    pub fn build_command(
 888        &self,
 889        program: Option<String>,
 890        args: &[String],
 891        env: &HashMap<String, String>,
 892        working_dir: Option<String>,
 893        port_forward: Option<(u16, String, u16)>,
 894    ) -> Result<CommandTemplate> {
 895        let Some(connection) = self.remote_connection() else {
 896            return Err(anyhow!("no remote connection"));
 897        };
 898        connection.build_command(program, args, env, working_dir, port_forward)
 899    }
 900
 901    pub fn build_forward_ports_command(
 902        &self,
 903        forwards: Vec<(u16, String, u16)>,
 904    ) -> Result<CommandTemplate> {
 905        let Some(connection) = self.remote_connection() else {
 906            return Err(anyhow!("no remote connection"));
 907        };
 908        connection.build_forward_ports_command(forwards)
 909    }
 910
 911    pub fn upload_directory(
 912        &self,
 913        src_path: PathBuf,
 914        dest_path: RemotePathBuf,
 915        cx: &App,
 916    ) -> Task<Result<()>> {
 917        let Some(connection) = self.remote_connection() else {
 918            return Task::ready(Err(anyhow!("no remote connection")));
 919        };
 920        connection.upload_directory(src_path, dest_path, cx)
 921    }
 922
 923    pub fn proto_client(&self) -> AnyProtoClient {
 924        self.client.clone().into()
 925    }
 926
    /// Returns a clone of the stored `connection_options`.
    pub fn connection_options(&self) -> RemoteConnectionOptions {
        self.connection_options.clone()
    }
 930
 931    pub fn connection(&self) -> Option<Arc<dyn RemoteConnection>> {
 932        if let State::Connected {
 933            remote_connection, ..
 934        } = self.state.as_ref()?
 935        {
 936            Some(remote_connection.clone())
 937        } else {
 938            None
 939        }
 940    }
 941
 942    pub fn connection_state(&self) -> ConnectionState {
 943        self.state
 944            .as_ref()
 945            .map(ConnectionState::from)
 946            .unwrap_or(ConnectionState::Disconnected)
 947    }
 948
 949    pub fn is_disconnected(&self) -> bool {
 950        self.connection_state() == ConnectionState::Disconnected
 951    }
 952
    /// Returns the stored `path_style`.
    pub fn path_style(&self) -> PathStyle {
        self.path_style
    }
 956
 957    #[cfg(any(test, feature = "test-support"))]
 958    pub fn simulate_disconnect(&self, client_cx: &mut App) -> Task<()> {
 959        let opts = self.connection_options();
 960        client_cx.spawn(async move |cx| {
 961            let connection = cx.update_global(|c: &mut ConnectionPool, _| {
 962                if let Some(ConnectionPoolEntry::Connected(c)) = c.connections.get(&opts) {
 963                    if let Some(connection) = c.upgrade() {
 964                        connection
 965                    } else {
 966                        panic!("connection was dropped")
 967                    }
 968                } else {
 969                    panic!("missing test connection")
 970                }
 971            });
 972
 973            connection.simulate_disconnect(cx);
 974        })
 975    }
 976
 977    /// Creates a mock connection pair for testing.
 978    ///
 979    /// This is the recommended way to create mock remote connections for tests.
 980    /// It returns both the `MockConnectionOptions` (which can be passed to create
 981    /// a `HeadlessProject`) and an `AnyProtoClient` for the server side.
 982    ///
 983    /// # Example
 984    /// ```ignore
 985    /// let (opts, server_session) = RemoteClient::fake_server(cx, server_cx);
 986    /// // Set up HeadlessProject with server_session...
 987    /// let client = RemoteClient::fake_client(opts, cx).await;
 988    /// ```
 989    #[cfg(any(test, feature = "test-support"))]
 990    pub fn fake_server(
 991        client_cx: &mut gpui::TestAppContext,
 992        server_cx: &mut gpui::TestAppContext,
 993    ) -> (RemoteConnectionOptions, AnyProtoClient) {
 994        use crate::transport::mock::MockConnection;
 995        let (opts, server_client) = MockConnection::new(client_cx, server_cx);
 996        (opts.into(), server_client)
 997    }
 998
 999    /// Creates a `RemoteClient` connected to a mock server.
1000    ///
1001    /// Call `fake_server` first to get the connection options, set up the
1002    /// `HeadlessProject` with the server session, then call this method
1003    /// to create the client.
1004    #[cfg(any(test, feature = "test-support"))]
1005    pub async fn fake_client(
1006        opts: RemoteConnectionOptions,
1007        client_cx: &mut gpui::TestAppContext,
1008    ) -> Entity<Self> {
1009        use crate::transport::mock::MockDelegate;
1010        let (_tx, rx) = oneshot::channel();
1011        let mut cx = client_cx.to_async();
1012        let connection = connect(opts, Arc::new(MockDelegate), &mut cx)
1013            .await
1014            .unwrap();
1015        client_cx
1016            .update(|cx| {
1017                Self::new(
1018                    ConnectionIdentifier::setup(),
1019                    connection,
1020                    rx,
1021                    Arc::new(MockDelegate),
1022                    cx,
1023                )
1024            })
1025            .await
1026            .unwrap()
1027            .unwrap()
1028    }
1029
1030    fn remote_connection(&self) -> Option<Arc<dyn RemoteConnection>> {
1031        self.state
1032            .as_ref()
1033            .and_then(|state| state.remote_connection())
1034    }
1035}
1036
/// One entry of the `ConnectionPool`, stored per set of connection options.
enum ConnectionPoolEntry {
    /// A connection attempt is still in flight; the shared task can be cloned
    /// and awaited by every caller asking for the same options.
    Connecting(Shared<Task<Result<Arc<dyn RemoteConnection>, Arc<anyhow::Error>>>>),
    /// A connection was established. Only a weak reference is stored, so the
    /// entry has to be upgraded before the connection can be reused.
    Connected(Weak<dyn RemoteConnection>),
}
1041
/// Pool of remote connections keyed by the options used to create them.
#[derive(Default)]
struct ConnectionPool {
    // In-flight and established connections, one entry per distinct options value.
    connections: HashMap<RemoteConnectionOptions, ConnectionPoolEntry>,
}
1046
// Marks `ConnectionPool` as a gpui global; this file reaches it through `update_global`.
impl Global for ConnectionPool {}
1048
1049impl ConnectionPool {
1050    pub fn connect(
1051        &mut self,
1052        opts: RemoteConnectionOptions,
1053        delegate: Arc<dyn RemoteClientDelegate>,
1054        cx: &mut App,
1055    ) -> Shared<Task<Result<Arc<dyn RemoteConnection>, Arc<anyhow::Error>>>> {
1056        let connection = self.connections.get(&opts);
1057        match connection {
1058            Some(ConnectionPoolEntry::Connecting(task)) => {
1059                delegate.set_status(
1060                    Some("Waiting for existing connection attempt"),
1061                    &mut cx.to_async(),
1062                );
1063                return task.clone();
1064            }
1065            Some(ConnectionPoolEntry::Connected(remote)) => {
1066                if let Some(remote) = remote.upgrade()
1067                    && !remote.has_been_killed()
1068                {
1069                    return Task::ready(Ok(remote)).shared();
1070                }
1071                self.connections.remove(&opts);
1072            }
1073            None => {}
1074        }
1075
1076        let task = cx
1077            .spawn({
1078                let opts = opts.clone();
1079                let delegate = delegate.clone();
1080                async move |cx| {
1081                    let connection = match opts.clone() {
1082                        RemoteConnectionOptions::Ssh(opts) => {
1083                            SshRemoteConnection::new(opts, delegate, cx)
1084                                .await
1085                                .map(|connection| Arc::new(connection) as Arc<dyn RemoteConnection>)
1086                        }
1087                        RemoteConnectionOptions::Wsl(opts) => {
1088                            WslRemoteConnection::new(opts, delegate, cx)
1089                                .await
1090                                .map(|connection| Arc::new(connection) as Arc<dyn RemoteConnection>)
1091                        }
1092                        RemoteConnectionOptions::Docker(opts) => {
1093                            DockerExecConnection::new(opts, delegate, cx)
1094                                .await
1095                                .map(|connection| Arc::new(connection) as Arc<dyn RemoteConnection>)
1096                        }
1097                        #[cfg(any(test, feature = "test-support"))]
1098                        RemoteConnectionOptions::Mock(opts) => {
1099                            cx.update(|cx| {
1100                                cx.default_global::<crate::transport::mock::MockConnectionRegistry>()
1101                                    .take(&opts)
1102                                    .ok_or_else(|| anyhow!(
1103                                        "Mock connection not found. Call MockConnection::new() first."
1104                                    ))
1105                                    .map(|connection| connection as Arc<dyn RemoteConnection>)
1106                            })
1107                        }
1108                    };
1109
1110                    cx.update_global(|pool: &mut Self, _| {
1111                        debug_assert!(matches!(
1112                            pool.connections.get(&opts),
1113                            Some(ConnectionPoolEntry::Connecting(_))
1114                        ));
1115                        match connection {
1116                            Ok(connection) => {
1117                                pool.connections.insert(
1118                                    opts.clone(),
1119                                    ConnectionPoolEntry::Connected(Arc::downgrade(&connection)),
1120                                );
1121                                Ok(connection)
1122                            }
1123                            Err(error) => {
1124                                pool.connections.remove(&opts);
1125                                Err(Arc::new(error))
1126                            }
1127                        }
1128                    })
1129                }
1130            })
1131            .shared();
1132
1133        self.connections
1134            .insert(opts.clone(), ConnectionPoolEntry::Connecting(task.clone()));
1135        task
1136    }
1137}
1138
/// Options describing how to reach a remote, one variant per transport.
///
/// The type is hashable and comparable; `ConnectionPool` uses it as its map key.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum RemoteConnectionOptions {
    /// Connect through the SSH transport.
    Ssh(SshConnectionOptions),
    /// Connect through the WSL transport.
    Wsl(WslConnectionOptions),
    /// Connect through the Docker transport.
    Docker(DockerConnectionOptions),
    /// Mock transport, only available in tests / with `test-support`.
    #[cfg(any(test, feature = "test-support"))]
    Mock(crate::transport::mock::MockConnectionOptions),
}
1147
1148impl RemoteConnectionOptions {
1149    pub fn display_name(&self) -> String {
1150        match self {
1151            RemoteConnectionOptions::Ssh(opts) => opts.host.to_string(),
1152            RemoteConnectionOptions::Wsl(opts) => opts.distro_name.clone(),
1153            RemoteConnectionOptions::Docker(opts) => opts.name.clone(),
1154            #[cfg(any(test, feature = "test-support"))]
1155            RemoteConnectionOptions::Mock(opts) => format!("mock-{}", opts.id),
1156        }
1157    }
1158}
1159
1160impl From<SshConnectionOptions> for RemoteConnectionOptions {
1161    fn from(opts: SshConnectionOptions) -> Self {
1162        RemoteConnectionOptions::Ssh(opts)
1163    }
1164}
1165
1166impl From<WslConnectionOptions> for RemoteConnectionOptions {
1167    fn from(opts: WslConnectionOptions) -> Self {
1168        RemoteConnectionOptions::Wsl(opts)
1169    }
1170}
1171
/// Wraps mock options in the `Mock` variant (tests / `test-support` only).
#[cfg(any(test, feature = "test-support"))]
impl From<crate::transport::mock::MockConnectionOptions> for RemoteConnectionOptions {
    fn from(mock_options: crate::transport::mock::MockConnectionOptions) -> Self {
        Self::Mock(mock_options)
    }
}
1178
// Windows-only action type. The `///` line below is deliberately left
// unchanged: the `gpui::Action` derive may consume it — TODO confirm.
#[cfg(target_os = "windows")]
/// Open a wsl path (\\wsl.localhost\<distro>\path)
#[derive(Debug, Clone, PartialEq, Eq, gpui::Action)]
#[action(namespace = workspace, no_json, no_register)]
pub struct OpenWslPath {
    // Connection options of the WSL distro the paths belong to.
    pub distro: WslConnectionOptions,
    // Paths to open; presumably paths inside that distro — verify against callers.
    pub paths: Vec<PathBuf>,
}
1187
/// Transport-level connection to a remote.
///
/// In this file it is implemented by the SSH, WSL and Docker transports, plus
/// a mock transport in tests. Connections are shared as
/// `Arc<dyn RemoteConnection>` and pooled by `ConnectionPool`.
#[async_trait(?Send)]
pub trait RemoteConnection: Send + Sync {
    /// Starts the proxy and returns the task driving it.
    ///
    /// `RemoteClient::monitor` treats the task's `Ok` value as the proxy's
    /// exit code. Presumably `incoming_tx` / `outgoing_rx` carry the envelopes
    /// received from / sent to the remote, and `reconnect` marks a
    /// re-established session — verify against the implementations.
    fn start_proxy(
        &self,
        unique_identifier: String,
        reconnect: bool,
        incoming_tx: UnboundedSender<Envelope>,
        outgoing_rx: UnboundedReceiver<Envelope>,
        connection_activity_tx: Sender<()>,
        delegate: Arc<dyn RemoteClientDelegate>,
        cx: &mut AsyncApp,
    ) -> Task<Result<i32>>;
    /// Uploads the directory `src_path` to `dest_path`; the returned task
    /// reports success or failure.
    fn upload_directory(
        &self,
        src_path: PathBuf,
        dest_path: RemotePathBuf,
        cx: &App,
    ) -> Task<Result<()>>;
    /// Kills the connection.
    async fn kill(&self) -> Result<()>;
    /// Whether the connection has been killed. `ConnectionPool::connect` does
    /// not reuse a pooled connection for which this returns `true`.
    fn has_been_killed(&self) -> bool;
    /// Whether the connection shares a network interface; defaults to `false`.
    fn shares_network_interface(&self) -> bool {
        false
    }
    /// Builds the command template for `program` with the given arguments,
    /// environment, working directory and optional port forward.
    fn build_command(
        &self,
        program: Option<String>,
        args: &[String],
        env: &HashMap<String, String>,
        working_dir: Option<String>,
        port_forward: Option<(u16, String, u16)>,
    ) -> Result<CommandTemplate>;
    /// Builds the command template for the given port forwards.
    fn build_forward_ports_command(
        &self,
        forwards: Vec<(u16, String, u16)>,
    ) -> Result<CommandTemplate>;
    /// The options associated with this connection.
    fn connection_options(&self) -> RemoteConnectionOptions;
    /// The path style associated with this connection.
    fn path_style(&self) -> PathStyle;
    /// The shell reported by this connection.
    fn shell(&self) -> String;
    /// The default system shell reported by this connection.
    fn default_system_shell(&self) -> String;
    /// Whether the connection reports WSL interop.
    fn has_wsl_interop(&self) -> bool;

    /// Test hook for simulating a disconnect; the default does nothing.
    #[cfg(any(test, feature = "test-support"))]
    fn simulate_disconnect(&self, _: &AsyncApp) {}
}
1232
// Pending requests by message id. Each sender delivers the response envelope
// together with a second oneshot sender that the message loop waits on.
type ResponseChannels = Mutex<HashMap<MessageId, oneshot::Sender<(Envelope, oneshot::Sender<()>)>>>;
1234
/// A value that can be set at most once and awaited by any number of waiters.
struct Signal<T> {
    // Sending half; taken (leaving `None`) by the first call to `set`.
    tx: Mutex<Option<oneshot::Sender<T>>>,
    // Shared task resolving to the value, or to `None` if the sender was dropped unused.
    rx: Shared<Task<Option<T>>>,
}
1239
1240impl<T: Send + Clone + 'static> Signal<T> {
1241    pub fn new(cx: &App) -> Self {
1242        let (tx, rx) = oneshot::channel();
1243
1244        let task = cx
1245            .background_executor()
1246            .spawn(async move { rx.await.ok() })
1247            .shared();
1248
1249        Self {
1250            tx: Mutex::new(Some(tx)),
1251            rx: task,
1252        }
1253    }
1254
1255    fn set(&self, value: T) {
1256        if let Some(tx) = self.tx.lock().take() {
1257            let _ = tx.send(value);
1258        }
1259    }
1260
1261    fn wait(&self) -> Shared<Task<Option<T>>> {
1262        self.rx.clone()
1263    }
1264}
1265
/// Proto client that exchanges envelopes with the remote over a pair of
/// unbounded channels.
pub(crate) struct ChannelClient {
    // Source of ids for outgoing envelopes (incremented per message).
    next_message_id: AtomicU32,
    // Sending half of the outgoing channel; swapped out by `reconnect`.
    outgoing_tx: Mutex<mpsc::UnboundedSender<Envelope>>,
    // Buffered outgoing envelopes, removed once the remote acknowledges their id.
    buffer: Mutex<VecDeque<Envelope>>,
    // Requests still waiting for a response, keyed by message id.
    response_channels: ResponseChannels,
    // Handlers for incoming messages that are not responses.
    message_handlers: Mutex<ProtoMessageHandlerSet>,
    // Id of the most recently received envelope; sent back as `ack_id`.
    max_received: AtomicU32,
    // Prefix used in this client's log messages.
    name: &'static str,
    // Task running the incoming-message loop; replaced by `reconnect`.
    task: Mutex<Task<Result<()>>>,
    // Set when a `RemoteStarted` envelope has been received.
    remote_started: Signal<()>,
    // Value returned from `ProtoClient::has_wsl_interop`.
    has_wsl_interop: bool,
}
1278
1279impl ChannelClient {
1280    pub(crate) fn new(
1281        incoming_rx: mpsc::UnboundedReceiver<Envelope>,
1282        outgoing_tx: mpsc::UnboundedSender<Envelope>,
1283        cx: &App,
1284        name: &'static str,
1285        has_wsl_interop: bool,
1286    ) -> Arc<Self> {
1287        Arc::new_cyclic(|this| Self {
1288            outgoing_tx: Mutex::new(outgoing_tx),
1289            next_message_id: AtomicU32::new(0),
1290            max_received: AtomicU32::new(0),
1291            response_channels: ResponseChannels::default(),
1292            message_handlers: Default::default(),
1293            buffer: Mutex::new(VecDeque::new()),
1294            name,
1295            task: Mutex::new(Self::start_handling_messages(
1296                this.clone(),
1297                incoming_rx,
1298                &cx.to_async(),
1299            )),
1300            remote_started: Signal::new(cx),
1301            has_wsl_interop,
1302        })
1303    }
1304
    /// Returns a handle on the `remote_started` signal, which is set when a
    /// `RemoteStarted` envelope is received.
    fn wait_for_remote_started(&self) -> Shared<Task<Option<()>>> {
        self.remote_started.wait()
    }
1308
1309    fn start_handling_messages(
1310        this: Weak<Self>,
1311        mut incoming_rx: mpsc::UnboundedReceiver<Envelope>,
1312        cx: &AsyncApp,
1313    ) -> Task<Result<()>> {
1314        cx.spawn(async move |cx| {
1315            if let Some(this) = this.upgrade() {
1316                let envelope = proto::RemoteStarted {}.into_envelope(0, None, None);
1317                this.outgoing_tx.lock().unbounded_send(envelope).ok();
1318            };
1319
1320            let peer_id = PeerId { owner_id: 0, id: 0 };
1321            while let Some(incoming) = incoming_rx.next().await {
1322                let Some(this) = this.upgrade() else {
1323                    return anyhow::Ok(());
1324                };
1325                if let Some(ack_id) = incoming.ack_id {
1326                    let mut buffer = this.buffer.lock();
1327                    while buffer.front().is_some_and(|msg| msg.id <= ack_id) {
1328                        buffer.pop_front();
1329                    }
1330                }
1331                if let Some(proto::envelope::Payload::FlushBufferedMessages(_)) = &incoming.payload
1332                {
1333                    log::debug!(
1334                        "{}:remote message received. name:FlushBufferedMessages",
1335                        this.name
1336                    );
1337                    {
1338                        let buffer = this.buffer.lock();
1339                        for envelope in buffer.iter() {
1340                            this.outgoing_tx
1341                                .lock()
1342                                .unbounded_send(envelope.clone())
1343                                .ok();
1344                        }
1345                    }
1346                    let mut envelope = proto::Ack {}.into_envelope(0, Some(incoming.id), None);
1347                    envelope.id = this.next_message_id.fetch_add(1, SeqCst);
1348                    this.outgoing_tx.lock().unbounded_send(envelope).ok();
1349                    continue;
1350                }
1351
1352                if let Some(proto::envelope::Payload::RemoteStarted(_)) = &incoming.payload {
1353                    this.remote_started.set(());
1354                    let mut envelope = proto::Ack {}.into_envelope(0, Some(incoming.id), None);
1355                    envelope.id = this.next_message_id.fetch_add(1, SeqCst);
1356                    this.outgoing_tx.lock().unbounded_send(envelope).ok();
1357                    continue;
1358                }
1359
1360                this.max_received.store(incoming.id, SeqCst);
1361
1362                if let Some(request_id) = incoming.responding_to {
1363                    let request_id = MessageId(request_id);
1364                    let sender = this.response_channels.lock().remove(&request_id);
1365                    if let Some(sender) = sender {
1366                        let (tx, rx) = oneshot::channel();
1367                        if incoming.payload.is_some() {
1368                            sender.send((incoming, tx)).ok();
1369                        }
1370                        rx.await.ok();
1371                    }
1372                } else if let Some(envelope) =
1373                    build_typed_envelope(peer_id, Instant::now(), incoming)
1374                {
1375                    let type_name = envelope.payload_type_name();
1376                    let message_id = envelope.message_id();
1377                    if let Some(future) = ProtoMessageHandlerSet::handle_message(
1378                        &this.message_handlers,
1379                        envelope,
1380                        this.clone().into(),
1381                        cx.clone(),
1382                    ) {
1383                        log::debug!("{}:remote message received. name:{type_name}", this.name);
1384                        cx.foreground_executor()
1385                            .spawn(async move {
1386                                match future.await {
1387                                    Ok(_) => {
1388                                        log::debug!(
1389                                            "{}:remote message handled. name:{type_name}",
1390                                            this.name
1391                                        );
1392                                    }
1393                                    Err(error) => {
1394                                        log::error!(
1395                                            "{}:error handling message. type:{}, error:{:#}",
1396                                            this.name,
1397                                            type_name,
1398                                            format!("{error:#}").lines().fold(
1399                                                String::new(),
1400                                                |mut message, line| {
1401                                                    if !message.is_empty() {
1402                                                        message.push(' ');
1403                                                    }
1404                                                    message.push_str(line);
1405                                                    message
1406                                                }
1407                                            )
1408                                        );
1409                                    }
1410                                }
1411                            })
1412                            .detach()
1413                    } else {
1414                        log::error!("{}:unhandled remote message name:{type_name}", this.name);
1415                        if let Err(e) = AnyProtoClient::from(this.clone()).send_response(
1416                            message_id,
1417                            anyhow::anyhow!("no handler registered for {type_name}").to_proto(),
1418                        ) {
1419                            log::error!(
1420                                "{}:error sending error response for {type_name}:{e:#}",
1421                                this.name
1422                            );
1423                        }
1424                    }
1425                }
1426            }
1427            anyhow::Ok(())
1428        })
1429    }
1430
1431    pub(crate) fn reconnect(
1432        self: &Arc<Self>,
1433        incoming_rx: UnboundedReceiver<Envelope>,
1434        outgoing_tx: UnboundedSender<Envelope>,
1435        cx: &AsyncApp,
1436    ) {
1437        *self.outgoing_tx.lock() = outgoing_tx;
1438        *self.task.lock() = Self::start_handling_messages(Arc::downgrade(self), incoming_rx, cx);
1439    }
1440
    /// Sends `payload` as a buffered request and resolves to its typed
    /// response (delegates to `request_internal` with `use_buffer = true`).
    fn request<T: RequestMessage>(
        &self,
        payload: T,
    ) -> impl 'static + Future<Output = Result<T::Response>> {
        self.request_internal(payload, true)
    }
1447
1448    fn request_internal<T: RequestMessage>(
1449        &self,
1450        payload: T,
1451        use_buffer: bool,
1452    ) -> impl 'static + Future<Output = Result<T::Response>> {
1453        log::debug!("remote request start. name:{}", T::NAME);
1454        let response =
1455            self.request_dynamic(payload.into_envelope(0, None, None), T::NAME, use_buffer);
1456        async move {
1457            let response = response.await?;
1458            log::debug!("remote request finish. name:{}", T::NAME);
1459            T::Response::from_envelope(response).context("received a response of the wrong type")
1460        }
1461    }
1462
1463    async fn resync(&self, timeout: Duration) -> Result<()> {
1464        smol::future::or(
1465            async {
1466                self.request_internal(proto::FlushBufferedMessages {}, false)
1467                    .await?;
1468
1469                for envelope in self.buffer.lock().iter() {
1470                    self.outgoing_tx
1471                        .lock()
1472                        .unbounded_send(envelope.clone())
1473                        .ok();
1474                }
1475                Ok(())
1476            },
1477            async {
1478                smol::Timer::after(timeout).await;
1479                anyhow::bail!("Timed out resyncing remote client")
1480            },
1481        )
1482        .await
1483    }
1484
1485    async fn ping(&self, timeout: Duration) -> Result<()> {
1486        smol::future::or(
1487            async {
1488                self.request(proto::Ping {}).await?;
1489                Ok(())
1490            },
1491            async {
1492                smol::Timer::after(timeout).await;
1493                anyhow::bail!("Timed out pinging remote client")
1494            },
1495        )
1496        .await
1497    }
1498
1499    fn send<T: EnvelopedMessage>(&self, payload: T) -> Result<()> {
1500        log::debug!("remote send name:{}", T::NAME);
1501        self.send_dynamic(payload.into_envelope(0, None, None))
1502    }
1503
1504    fn request_dynamic(
1505        &self,
1506        mut envelope: proto::Envelope,
1507        type_name: &'static str,
1508        use_buffer: bool,
1509    ) -> impl 'static + Future<Output = Result<proto::Envelope>> {
1510        envelope.id = self.next_message_id.fetch_add(1, SeqCst);
1511        let (tx, rx) = oneshot::channel();
1512        let mut response_channels_lock = self.response_channels.lock();
1513        response_channels_lock.insert(MessageId(envelope.id), tx);
1514        drop(response_channels_lock);
1515
1516        let result = if use_buffer {
1517            self.send_buffered(envelope)
1518        } else {
1519            self.send_unbuffered(envelope)
1520        };
1521        async move {
1522            if let Err(error) = &result {
1523                log::error!("failed to send message: {error}");
1524                anyhow::bail!("failed to send message: {error}");
1525            }
1526
1527            let response = rx.await.context("connection lost")?.0;
1528            if let Some(proto::envelope::Payload::Error(error)) = &response.payload {
1529                return Err(RpcError::from_proto(error, type_name));
1530            }
1531            Ok(response)
1532        }
1533    }
1534
1535    pub fn send_dynamic(&self, mut envelope: proto::Envelope) -> Result<()> {
1536        envelope.id = self.next_message_id.fetch_add(1, SeqCst);
1537        self.send_buffered(envelope)
1538    }
1539
1540    fn send_buffered(&self, mut envelope: proto::Envelope) -> Result<()> {
1541        envelope.ack_id = Some(self.max_received.load(SeqCst));
1542        self.buffer.lock().push_back(envelope.clone());
1543        // ignore errors on send (happen while we're reconnecting)
1544        // assume that the global "disconnected" overlay is sufficient.
1545        self.outgoing_tx.lock().unbounded_send(envelope).ok();
1546        Ok(())
1547    }
1548
1549    fn send_unbuffered(&self, mut envelope: proto::Envelope) -> Result<()> {
1550        envelope.ack_id = Some(self.max_received.load(SeqCst));
1551        self.outgoing_tx.lock().unbounded_send(envelope).ok();
1552        Ok(())
1553    }
1554}
1555
1556impl ProtoClient for ChannelClient {
1557    fn request(
1558        &self,
1559        envelope: proto::Envelope,
1560        request_type: &'static str,
1561    ) -> BoxFuture<'static, Result<proto::Envelope>> {
1562        self.request_dynamic(envelope, request_type, true).boxed()
1563    }
1564
1565    fn send(&self, envelope: proto::Envelope, _message_type: &'static str) -> Result<()> {
1566        self.send_dynamic(envelope)
1567    }
1568
1569    fn send_response(&self, envelope: Envelope, _message_type: &'static str) -> anyhow::Result<()> {
1570        self.send_dynamic(envelope)
1571    }
1572
1573    fn message_handler_set(&self) -> &Mutex<ProtoMessageHandlerSet> {
1574        &self.message_handlers
1575    }
1576
1577    fn is_via_collab(&self) -> bool {
1578        false
1579    }
1580
1581    fn has_wsl_interop(&self) -> bool {
1582        self.has_wsl_interop
1583    }
1584}