1#[cfg(any(test, feature = "test-support"))]
2use crate::transport::mock::ConnectGuard;
3use crate::{
4 SshConnectionOptions,
5 protocol::MessageId,
6 proxy::ProxyLaunchError,
7 transport::{
8 docker::{DockerConnectionOptions, DockerExecConnection},
9 ssh::SshRemoteConnection,
10 wsl::{WslConnectionOptions, WslRemoteConnection},
11 },
12};
13use anyhow::{Context as _, Result, anyhow};
14use askpass::EncryptedPassword;
15use async_trait::async_trait;
16use collections::HashMap;
17use futures::{
18 Future, FutureExt as _, StreamExt as _,
19 channel::{
20 mpsc::{self, Sender, UnboundedReceiver, UnboundedSender},
21 oneshot,
22 },
23 future::{BoxFuture, Shared},
24 select, select_biased,
25};
26use gpui::{
27 App, AppContext as _, AsyncApp, BackgroundExecutor, BorrowAppContext, Context, Entity,
28 EventEmitter, FutureExt, Global, Task, WeakEntity,
29};
30use parking_lot::Mutex;
31
32use release_channel::ReleaseChannel;
33use rpc::{
34 AnyProtoClient, ErrorExt, ProtoClient, ProtoMessageHandlerSet, RpcError,
35 proto::{self, Envelope, EnvelopedMessage, PeerId, RequestMessage, build_typed_envelope},
36};
37use semver::Version;
38use std::{
39 collections::VecDeque,
40 fmt,
41 ops::ControlFlow,
42 path::PathBuf,
43 sync::{
44 Arc, Weak,
45 atomic::{AtomicU32, AtomicU64, Ordering::SeqCst},
46 },
47 time::{Duration, Instant},
48};
49use util::{
50 ResultExt,
51 paths::{PathStyle, RemotePathBuf},
52};
53
/// Operating system family of the remote host.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RemoteOs {
    Linux,
    MacOs,
    Windows,
}

impl RemoteOs {
    /// Lower-case identifier for this operating system
    /// (`"linux"`, `"macos"` or `"windows"`).
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Linux => "linux",
            Self::MacOs => "macos",
            Self::Windows => "windows",
        }
    }

    /// Returns `true` only for [`RemoteOs::Windows`].
    pub fn is_windows(&self) -> bool {
        *self == Self::Windows
    }
}

impl std::fmt::Display for RemoteOs {
    /// Writes the same text as [`RemoteOs::as_str`], without applying any
    /// width or padding flags.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let name = self.as_str();
        f.write_str(name)
    }
}
80
/// CPU architecture of the remote host.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RemoteArch {
    X86_64,
    Aarch64,
}

impl RemoteArch {
    /// Lower-case identifier for this architecture (`"x86_64"` or `"aarch64"`).
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::X86_64 => "x86_64",
            Self::Aarch64 => "aarch64",
        }
    }
}

impl std::fmt::Display for RemoteArch {
    /// Writes the same text as [`RemoteArch::as_str`], without applying any
    /// width or padding flags.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let name = self.as_str();
        f.write_str(name)
    }
}
101
102#[derive(Copy, Clone, Debug)]
103pub struct RemotePlatform {
104 pub os: RemoteOs,
105 pub arch: RemoteArch,
106}
107
108#[derive(Clone, Debug)]
109pub struct CommandTemplate {
110 pub program: String,
111 pub args: Vec<String>,
112 pub env: HashMap<String, String>,
113}
114
115pub trait RemoteClientDelegate: Send + Sync {
116 fn ask_password(
117 &self,
118 prompt: String,
119 tx: oneshot::Sender<EncryptedPassword>,
120 cx: &mut AsyncApp,
121 );
122 fn get_download_url(
123 &self,
124 platform: RemotePlatform,
125 release_channel: ReleaseChannel,
126 version: Option<Version>,
127 cx: &mut AsyncApp,
128 ) -> Task<Result<Option<String>>>;
129 fn download_server_binary_locally(
130 &self,
131 platform: RemotePlatform,
132 release_channel: ReleaseChannel,
133 version: Option<Version>,
134 cx: &mut AsyncApp,
135 ) -> Task<Result<PathBuf>>;
136 fn set_status(&self, status: Option<&str>, cx: &mut AsyncApp);
137}
138
/// Number of consecutive missed heartbeats after which a reconnect is started.
const MAX_MISSED_HEARTBEATS: usize = 5;
/// Delay between two heartbeat rounds.
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
/// Timeout passed to the ping / resync requests.
const HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(5);
/// How long the initial connection may take before the remote is considered
/// not ready: 5 seconds in builds with debug assertions, 60 seconds otherwise.
const INITIAL_CONNECTION_TIMEOUT: Duration = if cfg!(debug_assertions) {
    Duration::from_secs(5)
} else {
    Duration::from_secs(60)
};

/// Number of reconnect attempts made before giving up.
const MAX_RECONNECT_ATTEMPTS: usize = 3;
146
147enum State {
148 Connecting,
149 Connected {
150 remote_connection: Arc<dyn RemoteConnection>,
151 delegate: Arc<dyn RemoteClientDelegate>,
152
153 multiplex_task: Task<Result<()>>,
154 heartbeat_task: Task<Result<()>>,
155 },
156 HeartbeatMissed {
157 missed_heartbeats: usize,
158
159 remote_connection: Arc<dyn RemoteConnection>,
160 delegate: Arc<dyn RemoteClientDelegate>,
161
162 multiplex_task: Task<Result<()>>,
163 heartbeat_task: Task<Result<()>>,
164 },
165 Reconnecting,
166 ReconnectFailed {
167 remote_connection: Arc<dyn RemoteConnection>,
168 delegate: Arc<dyn RemoteClientDelegate>,
169
170 error: anyhow::Error,
171 attempts: usize,
172 },
173 ReconnectExhausted,
174 ServerNotRunning,
175}
176
177impl fmt::Display for State {
178 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
179 match self {
180 Self::Connecting => write!(f, "connecting"),
181 Self::Connected { .. } => write!(f, "connected"),
182 Self::Reconnecting => write!(f, "reconnecting"),
183 Self::ReconnectFailed { .. } => write!(f, "reconnect failed"),
184 Self::ReconnectExhausted => write!(f, "reconnect exhausted"),
185 Self::HeartbeatMissed { .. } => write!(f, "heartbeat missed"),
186 Self::ServerNotRunning { .. } => write!(f, "server not running"),
187 }
188 }
189}
190
191impl State {
192 fn remote_connection(&self) -> Option<Arc<dyn RemoteConnection>> {
193 match self {
194 Self::Connected {
195 remote_connection, ..
196 } => Some(remote_connection.clone()),
197 Self::HeartbeatMissed {
198 remote_connection, ..
199 } => Some(remote_connection.clone()),
200 Self::ReconnectFailed {
201 remote_connection, ..
202 } => Some(remote_connection.clone()),
203 _ => None,
204 }
205 }
206
207 fn can_reconnect(&self) -> bool {
208 match self {
209 Self::Connected { .. }
210 | Self::HeartbeatMissed { .. }
211 | Self::ReconnectFailed { .. } => true,
212 State::Connecting
213 | State::Reconnecting
214 | State::ReconnectExhausted
215 | State::ServerNotRunning => false,
216 }
217 }
218
219 fn is_reconnect_failed(&self) -> bool {
220 matches!(self, Self::ReconnectFailed { .. })
221 }
222
223 fn is_reconnect_exhausted(&self) -> bool {
224 matches!(self, Self::ReconnectExhausted { .. })
225 }
226
227 fn is_server_not_running(&self) -> bool {
228 matches!(self, Self::ServerNotRunning)
229 }
230
231 fn is_reconnecting(&self) -> bool {
232 matches!(self, Self::Reconnecting { .. })
233 }
234
235 fn heartbeat_recovered(self) -> Self {
236 match self {
237 Self::HeartbeatMissed {
238 remote_connection,
239 delegate,
240 multiplex_task,
241 heartbeat_task,
242 ..
243 } => Self::Connected {
244 remote_connection: remote_connection,
245 delegate,
246 multiplex_task,
247 heartbeat_task,
248 },
249 _ => self,
250 }
251 }
252
253 fn heartbeat_missed(self) -> Self {
254 match self {
255 Self::Connected {
256 remote_connection,
257 delegate,
258 multiplex_task,
259 heartbeat_task,
260 } => Self::HeartbeatMissed {
261 missed_heartbeats: 1,
262 remote_connection,
263 delegate,
264 multiplex_task,
265 heartbeat_task,
266 },
267 Self::HeartbeatMissed {
268 missed_heartbeats,
269 remote_connection,
270 delegate,
271 multiplex_task,
272 heartbeat_task,
273 } => Self::HeartbeatMissed {
274 missed_heartbeats: missed_heartbeats + 1,
275 remote_connection,
276 delegate,
277 multiplex_task,
278 heartbeat_task,
279 },
280 _ => self,
281 }
282 }
283}
284
/// Coarse, publicly visible state of a remote connection.
///
/// This is the data-free projection of the internal `State` enum; see the
/// `From<&State>` impl for the exact mapping.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnectionState {
    /// The initial connection is being established.
    Connecting,
    /// The connection is established.
    Connected,
    /// One or more heartbeats were missed.
    HeartbeatMissed,
    /// A reconnect is in progress (or is about to be retried).
    Reconnecting,
    /// There is no connection and none is being attempted.
    Disconnected,
}
294
295impl From<&State> for ConnectionState {
296 fn from(value: &State) -> Self {
297 match value {
298 State::Connecting => Self::Connecting,
299 State::Connected { .. } => Self::Connected,
300 State::Reconnecting | State::ReconnectFailed { .. } => Self::Reconnecting,
301 State::HeartbeatMissed { .. } => Self::HeartbeatMissed,
302 State::ReconnectExhausted => Self::Disconnected,
303 State::ServerNotRunning => Self::Disconnected,
304 }
305 }
306}
307
308pub struct RemoteClient {
309 client: Arc<ChannelClient>,
310 unique_identifier: String,
311 connection_options: RemoteConnectionOptions,
312 path_style: PathStyle,
313 state: Option<State>,
314}
315
316#[derive(Debug)]
317pub enum RemoteClientEvent {
318 Disconnected,
319}
320
321impl EventEmitter<RemoteClientEvent> for RemoteClient {}
322
/// Names the socket on the remote server, so that a reconnect can re-join
/// the same project.
pub enum ConnectionIdentifier {
    /// Identifier for a setup connection; the number comes from a
    /// process-wide counter.
    Setup(u64),
    /// Identifier derived from a workspace id.
    Workspace(i64),
}
329
330static NEXT_ID: AtomicU64 = AtomicU64::new(1);
331
332impl ConnectionIdentifier {
333 pub fn setup() -> Self {
334 Self::Setup(NEXT_ID.fetch_add(1, SeqCst))
335 }
336
337 // This string gets used in a socket name, and so must be relatively short.
338 // The total length of:
339 // /home/{username}/.local/share/zed/server_state/{name}/stdout.sock
340 // Must be less than about 100 characters
341 // https://unix.stackexchange.com/questions/367008/why-is-socket-path-length-limited-to-a-hundred-chars
342 // So our strings should be at most 20 characters or so.
343 fn to_string(&self, cx: &App) -> String {
344 let identifier_prefix = match ReleaseChannel::global(cx) {
345 ReleaseChannel::Stable => "".to_string(),
346 release_channel => format!("{}-", release_channel.dev_name()),
347 };
348 match self {
349 Self::Setup(setup_id) => format!("{identifier_prefix}setup-{setup_id}"),
350 Self::Workspace(workspace_id) => {
351 format!("{identifier_prefix}workspace-{workspace_id}",)
352 }
353 }
354 }
355}
356
357pub async fn connect(
358 connection_options: RemoteConnectionOptions,
359 delegate: Arc<dyn RemoteClientDelegate>,
360 cx: &mut AsyncApp,
361) -> Result<Arc<dyn RemoteConnection>> {
362 cx.update(|cx| {
363 cx.update_default_global(|pool: &mut ConnectionPool, cx| {
364 pool.connect(connection_options.clone(), delegate.clone(), cx)
365 })
366 })
367 .await
368 .map_err(|e| e.cloned())
369}
370
371impl RemoteClient {
372 pub fn new(
373 unique_identifier: ConnectionIdentifier,
374 remote_connection: Arc<dyn RemoteConnection>,
375 cancellation: oneshot::Receiver<()>,
376 delegate: Arc<dyn RemoteClientDelegate>,
377 cx: &mut App,
378 ) -> Task<Result<Option<Entity<Self>>>> {
379 let unique_identifier = unique_identifier.to_string(cx);
380 cx.spawn(async move |cx| {
381 let success = Box::pin(async move {
382 let (outgoing_tx, outgoing_rx) = mpsc::unbounded::<Envelope>();
383 let (incoming_tx, incoming_rx) = mpsc::unbounded::<Envelope>();
384 let (connection_activity_tx, connection_activity_rx) = mpsc::channel::<()>(1);
385
386 let client = cx.update(|cx| {
387 ChannelClient::new(
388 incoming_rx,
389 outgoing_tx,
390 cx,
391 "client",
392 remote_connection.has_wsl_interop(),
393 )
394 });
395
396 let path_style = remote_connection.path_style();
397 let this = cx.new(|_| Self {
398 client: client.clone(),
399 unique_identifier: unique_identifier.clone(),
400 connection_options: remote_connection.connection_options(),
401 path_style,
402 state: Some(State::Connecting),
403 });
404
405 let io_task = remote_connection.start_proxy(
406 unique_identifier,
407 false,
408 incoming_tx,
409 outgoing_rx,
410 connection_activity_tx,
411 delegate.clone(),
412 cx,
413 );
414
415 let ready = client
416 .wait_for_remote_started()
417 .with_timeout(INITIAL_CONNECTION_TIMEOUT, cx.background_executor())
418 .await;
419 match ready {
420 Ok(Some(_)) => {}
421 Ok(None) => {
422 let mut error = "remote client exited before becoming ready".to_owned();
423 if let Some(status) = io_task.now_or_never() {
424 match status {
425 Ok(exit_code) => {
426 error.push_str(&format!(", exit_code={exit_code:?}"))
427 }
428 Err(e) => error.push_str(&format!(", error={e:?}")),
429 }
430 }
431 let error = anyhow::anyhow!("{error}");
432 log::error!("failed to establish connection: {}", error);
433 return Err(error);
434 }
435 Err(_) => {
436 let mut error =
437 "remote client did not become ready within the timeout".to_owned();
438 if let Some(status) = io_task.now_or_never() {
439 match status {
440 Ok(exit_code) => {
441 error.push_str(&format!(", exit_code={exit_code:?}"))
442 }
443 Err(e) => error.push_str(&format!(", error={e:?}")),
444 }
445 }
446 let error = anyhow::anyhow!("{error}");
447 log::error!("failed to establish connection: {}", error);
448 return Err(error);
449 }
450 }
451 let multiplex_task = Self::monitor(this.downgrade(), io_task, cx);
452 if let Err(error) = client.ping(HEARTBEAT_TIMEOUT).await {
453 log::error!("failed to establish connection: {}", error);
454 return Err(error);
455 }
456
457 let heartbeat_task = Self::heartbeat(this.downgrade(), connection_activity_rx, cx);
458
459 this.update(cx, |this, _| {
460 this.state = Some(State::Connected {
461 remote_connection,
462 delegate,
463 multiplex_task,
464 heartbeat_task,
465 });
466 });
467
468 Ok(Some(this))
469 });
470
471 select! {
472 _ = cancellation.fuse() => {
473 Ok(None)
474 }
475 result = success.fuse() => result
476 }
477 })
478 }
479
480 pub fn proto_client_from_channels(
481 incoming_rx: mpsc::UnboundedReceiver<Envelope>,
482 outgoing_tx: mpsc::UnboundedSender<Envelope>,
483 cx: &App,
484 name: &'static str,
485 has_wsl_interop: bool,
486 ) -> AnyProtoClient {
487 ChannelClient::new(incoming_rx, outgoing_tx, cx, name, has_wsl_interop).into()
488 }
489
490 pub fn shutdown_processes<T: RequestMessage>(
491 &mut self,
492 shutdown_request: Option<T>,
493 executor: BackgroundExecutor,
494 ) -> Option<impl Future<Output = ()> + use<T>> {
495 let state = self.state.take()?;
496 log::info!("shutting down remote processes");
497
498 let State::Connected {
499 multiplex_task,
500 heartbeat_task,
501 remote_connection,
502 delegate,
503 } = state
504 else {
505 return None;
506 };
507
508 let client = self.client.clone();
509
510 Some(async move {
511 if let Some(shutdown_request) = shutdown_request {
512 client.send(shutdown_request).log_err();
513 // We wait 50ms instead of waiting for a response, because
514 // waiting for a response would require us to wait on the main thread
515 // which we want to avoid in an `on_app_quit` callback.
516 executor.timer(Duration::from_millis(50)).await;
517 }
518
519 // Drop `multiplex_task` because it owns our remote_connection_proxy_process, which is a
520 // child of master_process.
521 drop(multiplex_task);
522 // Now drop the rest of state, which kills master process.
523 drop(heartbeat_task);
524 drop(remote_connection);
525 drop(delegate);
526 })
527 }
528
529 fn reconnect(&mut self, cx: &mut Context<Self>) -> Result<()> {
530 let can_reconnect = self
531 .state
532 .as_ref()
533 .map(|state| state.can_reconnect())
534 .unwrap_or(false);
535 if !can_reconnect {
536 log::info!("aborting reconnect, because not in state that allows reconnecting");
537 let error = if let Some(state) = self.state.as_ref() {
538 format!("invalid state, cannot reconnect while in state {state}")
539 } else {
540 "no state set".to_string()
541 };
542 anyhow::bail!(error);
543 }
544
545 let state = self.state.take().unwrap();
546 let (attempts, remote_connection, delegate) = match state {
547 State::Connected {
548 remote_connection,
549 delegate,
550 multiplex_task,
551 heartbeat_task,
552 }
553 | State::HeartbeatMissed {
554 remote_connection,
555 delegate,
556 multiplex_task,
557 heartbeat_task,
558 ..
559 } => {
560 drop(multiplex_task);
561 drop(heartbeat_task);
562 (0, remote_connection, delegate)
563 }
564 State::ReconnectFailed {
565 attempts,
566 remote_connection,
567 delegate,
568 ..
569 } => (attempts, remote_connection, delegate),
570 State::Connecting
571 | State::Reconnecting
572 | State::ReconnectExhausted
573 | State::ServerNotRunning => unreachable!(),
574 };
575
576 let attempts = attempts + 1;
577 if attempts > MAX_RECONNECT_ATTEMPTS {
578 log::error!(
579 "Failed to reconnect to after {} attempts, giving up",
580 MAX_RECONNECT_ATTEMPTS
581 );
582 self.set_state(State::ReconnectExhausted, cx);
583 return Ok(());
584 }
585
586 self.set_state(State::Reconnecting, cx);
587
588 log::info!(
589 "Trying to reconnect to remote server... Attempt {}",
590 attempts
591 );
592
593 let unique_identifier = self.unique_identifier.clone();
594 let client = self.client.clone();
595 let reconnect_task = cx.spawn(async move |this, cx| {
596 macro_rules! failed {
597 ($error:expr, $attempts:expr, $remote_connection:expr, $delegate:expr) => {
598 delegate.set_status(Some(&format!("{error:#}", error = $error)), cx);
599 return State::ReconnectFailed {
600 error: anyhow!($error),
601 attempts: $attempts,
602 remote_connection: $remote_connection,
603 delegate: $delegate,
604 };
605 };
606 }
607
608 if let Err(error) = remote_connection
609 .kill()
610 .await
611 .context("Failed to kill remote_connection process")
612 {
613 failed!(error, attempts, remote_connection, delegate);
614 };
615
616 let connection_options = remote_connection.connection_options();
617
618 let (outgoing_tx, outgoing_rx) = mpsc::unbounded::<Envelope>();
619 let (incoming_tx, incoming_rx) = mpsc::unbounded::<Envelope>();
620 let (connection_activity_tx, connection_activity_rx) = mpsc::channel::<()>(1);
621
622 let (remote_connection, io_task) = match async {
623 let remote_connection = cx
624 .update_global(|pool: &mut ConnectionPool, cx| {
625 pool.connect(connection_options, delegate.clone(), cx)
626 })
627 .await
628 .map_err(|error| error.cloned())?;
629
630 let io_task = remote_connection.start_proxy(
631 unique_identifier,
632 true,
633 incoming_tx,
634 outgoing_rx,
635 connection_activity_tx,
636 delegate.clone(),
637 cx,
638 );
639 anyhow::Ok((remote_connection, io_task))
640 }
641 .await
642 {
643 Ok((remote_connection, io_task)) => (remote_connection, io_task),
644 Err(error) => {
645 failed!(error, attempts, remote_connection, delegate);
646 }
647 };
648
649 let multiplex_task = Self::monitor(this.clone(), io_task, cx);
650 client.reconnect(incoming_rx, outgoing_tx, cx);
651
652 if let Err(error) = client.resync(HEARTBEAT_TIMEOUT).await {
653 failed!(error, attempts, remote_connection, delegate);
654 };
655
656 State::Connected {
657 remote_connection: remote_connection,
658 delegate,
659 multiplex_task,
660 heartbeat_task: Self::heartbeat(this.clone(), connection_activity_rx, cx),
661 }
662 });
663
664 cx.spawn(async move |this, cx| {
665 let new_state = reconnect_task.await;
666 this.update(cx, |this, cx| {
667 this.try_set_state(cx, |old_state| {
668 if old_state.is_reconnecting() {
669 match &new_state {
670 State::Connecting
671 | State::Reconnecting
672 | State::HeartbeatMissed { .. }
673 | State::ServerNotRunning => {}
674 State::Connected { .. } => {
675 log::info!("Successfully reconnected");
676 }
677 State::ReconnectFailed {
678 error, attempts, ..
679 } => {
680 log::error!(
681 "Reconnect attempt {} failed: {:?}. Starting new attempt...",
682 attempts,
683 error
684 );
685 }
686 State::ReconnectExhausted => {
687 log::error!("Reconnect attempt failed and all attempts exhausted");
688 }
689 }
690 Some(new_state)
691 } else {
692 None
693 }
694 });
695
696 if this.state_is(State::is_reconnect_failed) {
697 this.reconnect(cx)
698 } else if this.state_is(State::is_reconnect_exhausted) {
699 Ok(())
700 } else {
701 log::debug!("State has transition from Reconnecting into new state while attempting reconnect.");
702 Ok(())
703 }
704 })
705 })
706 .detach_and_log_err(cx);
707
708 Ok(())
709 }
710
711 fn heartbeat(
712 this: WeakEntity<Self>,
713 mut connection_activity_rx: mpsc::Receiver<()>,
714 cx: &mut AsyncApp,
715 ) -> Task<Result<()>> {
716 let Ok(client) = this.read_with(cx, |this, _| this.client.clone()) else {
717 return Task::ready(Err(anyhow!("remote_connectionRemoteClient lost")));
718 };
719
720 cx.spawn(async move |cx| {
721 let mut missed_heartbeats = 0;
722
723 let keepalive_timer = cx.background_executor().timer(HEARTBEAT_INTERVAL).fuse();
724 futures::pin_mut!(keepalive_timer);
725
726 loop {
727 select_biased! {
728 result = connection_activity_rx.next().fuse() => {
729 if result.is_none() {
730 log::warn!("remote heartbeat: connection activity channel has been dropped. stopping.");
731 return Ok(());
732 }
733
734 if missed_heartbeats != 0 {
735 missed_heartbeats = 0;
736 let _ =this.update(cx, |this, cx| {
737 this.handle_heartbeat_result(missed_heartbeats, cx)
738 })?;
739 }
740 }
741 _ = keepalive_timer => {
742 log::debug!("Sending heartbeat to server...");
743
744 let result = select_biased! {
745 _ = connection_activity_rx.next().fuse() => {
746 Ok(())
747 }
748 ping_result = client.ping(HEARTBEAT_TIMEOUT).fuse() => {
749 ping_result
750 }
751 };
752
753 if result.is_err() {
754 missed_heartbeats += 1;
755 log::warn!(
756 "No heartbeat from server after {:?}. Missed heartbeat {} out of {}.",
757 HEARTBEAT_TIMEOUT,
758 missed_heartbeats,
759 MAX_MISSED_HEARTBEATS
760 );
761 } else if missed_heartbeats != 0 {
762 missed_heartbeats = 0;
763 } else {
764 continue;
765 }
766
767 let result = this.update(cx, |this, cx| {
768 this.handle_heartbeat_result(missed_heartbeats, cx)
769 })?;
770 if result.is_break() {
771 return Ok(());
772 }
773 }
774 }
775
776 keepalive_timer.set(cx.background_executor().timer(HEARTBEAT_INTERVAL).fuse());
777 }
778 })
779 }
780
781 fn handle_heartbeat_result(
782 &mut self,
783 missed_heartbeats: usize,
784 cx: &mut Context<Self>,
785 ) -> ControlFlow<()> {
786 let state = self.state.take().unwrap();
787 let next_state = if missed_heartbeats > 0 {
788 state.heartbeat_missed()
789 } else {
790 state.heartbeat_recovered()
791 };
792
793 self.set_state(next_state, cx);
794
795 if missed_heartbeats >= MAX_MISSED_HEARTBEATS {
796 log::error!(
797 "Missed last {} heartbeats. Reconnecting...",
798 missed_heartbeats
799 );
800
801 self.reconnect(cx)
802 .context("failed to start reconnect process after missing heartbeats")
803 .log_err();
804 ControlFlow::Break(())
805 } else {
806 ControlFlow::Continue(())
807 }
808 }
809
810 fn monitor(
811 this: WeakEntity<Self>,
812 io_task: Task<Result<i32>>,
813 cx: &AsyncApp,
814 ) -> Task<Result<()>> {
815 cx.spawn(async move |cx| {
816 let result = io_task.await;
817
818 match result {
819 Ok(exit_code) => {
820 if let Some(error) = ProxyLaunchError::from_exit_code(exit_code) {
821 match error {
822 ProxyLaunchError::ServerNotRunning => {
823 log::error!("failed to reconnect because server is not running");
824 this.update(cx, |this, cx| {
825 this.set_state(State::ServerNotRunning, cx);
826 })?;
827 }
828 }
829 } else if exit_code > 0 {
830 log::error!("proxy process terminated unexpectedly");
831 this.update(cx, |this, cx| {
832 this.reconnect(cx).ok();
833 })?;
834 }
835 }
836 Err(error) => {
837 log::warn!(
838 "remote io task died with error: {:?}. reconnecting...",
839 error
840 );
841 this.update(cx, |this, cx| {
842 this.reconnect(cx).ok();
843 })?;
844 }
845 }
846
847 Ok(())
848 })
849 }
850
851 fn state_is(&self, check: impl FnOnce(&State) -> bool) -> bool {
852 self.state.as_ref().is_some_and(check)
853 }
854
855 fn try_set_state(&mut self, cx: &mut Context<Self>, map: impl FnOnce(&State) -> Option<State>) {
856 let new_state = self.state.as_ref().and_then(map);
857 if let Some(new_state) = new_state {
858 self.state.replace(new_state);
859 cx.notify();
860 }
861 }
862
863 fn set_state(&mut self, state: State, cx: &mut Context<Self>) {
864 log::info!("setting state to '{}'", &state);
865
866 let is_reconnect_exhausted = state.is_reconnect_exhausted();
867 let is_server_not_running = state.is_server_not_running();
868 self.state.replace(state);
869
870 if is_reconnect_exhausted || is_server_not_running {
871 cx.emit(RemoteClientEvent::Disconnected);
872 }
873 cx.notify();
874 }
875
876 pub fn shell(&self) -> Option<String> {
877 Some(self.remote_connection()?.shell())
878 }
879
880 pub fn default_system_shell(&self) -> Option<String> {
881 Some(self.remote_connection()?.default_system_shell())
882 }
883
884 pub fn shares_network_interface(&self) -> bool {
885 self.remote_connection()
886 .map_or(false, |connection| connection.shares_network_interface())
887 }
888
889 pub fn build_command(
890 &self,
891 program: Option<String>,
892 args: &[String],
893 env: &HashMap<String, String>,
894 working_dir: Option<String>,
895 port_forward: Option<(u16, String, u16)>,
896 ) -> Result<CommandTemplate> {
897 let Some(connection) = self.remote_connection() else {
898 return Err(anyhow!("no remote connection"));
899 };
900 connection.build_command(program, args, env, working_dir, port_forward)
901 }
902
903 pub fn build_forward_ports_command(
904 &self,
905 forwards: Vec<(u16, String, u16)>,
906 ) -> Result<CommandTemplate> {
907 let Some(connection) = self.remote_connection() else {
908 return Err(anyhow!("no remote connection"));
909 };
910 connection.build_forward_ports_command(forwards)
911 }
912
913 pub fn upload_directory(
914 &self,
915 src_path: PathBuf,
916 dest_path: RemotePathBuf,
917 cx: &App,
918 ) -> Task<Result<()>> {
919 let Some(connection) = self.remote_connection() else {
920 return Task::ready(Err(anyhow!("no remote connection")));
921 };
922 connection.upload_directory(src_path, dest_path, cx)
923 }
924
925 pub fn proto_client(&self) -> AnyProtoClient {
926 self.client.clone().into()
927 }
928
929 pub fn connection_options(&self) -> RemoteConnectionOptions {
930 self.connection_options.clone()
931 }
932
933 pub fn connection(&self) -> Option<Arc<dyn RemoteConnection>> {
934 if let State::Connected {
935 remote_connection, ..
936 } = self.state.as_ref()?
937 {
938 Some(remote_connection.clone())
939 } else {
940 None
941 }
942 }
943
944 pub fn connection_state(&self) -> ConnectionState {
945 self.state
946 .as_ref()
947 .map(ConnectionState::from)
948 .unwrap_or(ConnectionState::Disconnected)
949 }
950
951 pub fn is_disconnected(&self) -> bool {
952 self.connection_state() == ConnectionState::Disconnected
953 }
954
955 pub fn path_style(&self) -> PathStyle {
956 self.path_style
957 }
958
959 #[cfg(any(test, feature = "test-support"))]
960 pub fn simulate_disconnect(&self, client_cx: &mut App) -> Task<()> {
961 let opts = self.connection_options();
962 client_cx.spawn(async move |cx| {
963 let connection = cx.update_global(|c: &mut ConnectionPool, _| {
964 if let Some(ConnectionPoolEntry::Connected(c)) = c.connections.get(&opts) {
965 if let Some(connection) = c.upgrade() {
966 connection
967 } else {
968 panic!("connection was dropped")
969 }
970 } else {
971 panic!("missing test connection")
972 }
973 });
974
975 connection.simulate_disconnect(cx);
976 })
977 }
978
979 /// Creates a mock connection pair for testing.
980 ///
981 /// This is the recommended way to create mock remote connections for tests.
982 /// It returns the `MockConnectionOptions` (which can be passed to create a
983 /// `HeadlessProject`), an `AnyProtoClient` for the server side and a
984 /// `ConnectGuard` for the client side which blocks the connection from
985 /// being established until dropped.
986 ///
987 /// # Example
988 /// ```ignore
989 /// let (opts, server_session, connect_guard) = RemoteClient::fake_server(cx, server_cx);
990 /// // Set up HeadlessProject with server_session...
991 /// drop(connect_guard);
992 /// let client = RemoteClient::fake_client(opts, cx).await;
993 /// ```
994 #[cfg(any(test, feature = "test-support"))]
995 pub fn fake_server(
996 client_cx: &mut gpui::TestAppContext,
997 server_cx: &mut gpui::TestAppContext,
998 ) -> (RemoteConnectionOptions, AnyProtoClient, ConnectGuard) {
999 use crate::transport::mock::MockConnection;
1000 let (opts, server_client, connect_guard) = MockConnection::new(client_cx, server_cx);
1001 (opts.into(), server_client, connect_guard)
1002 }
1003
1004 /// Creates a `RemoteClient` connected to a mock server.
1005 ///
1006 /// Call `fake_server` first to get the connection options, set up the
1007 /// `HeadlessProject` with the server session, then call this method
1008 /// to create the client.
1009 #[cfg(any(test, feature = "test-support"))]
1010 pub async fn connect_mock(
1011 opts: RemoteConnectionOptions,
1012 client_cx: &mut gpui::TestAppContext,
1013 ) -> Entity<Self> {
1014 assert!(matches!(opts, RemoteConnectionOptions::Mock(..)));
1015 use crate::transport::mock::MockDelegate;
1016 let (_tx, rx) = oneshot::channel();
1017 let mut cx = client_cx.to_async();
1018 let connection = connect(opts, Arc::new(MockDelegate), &mut cx)
1019 .await
1020 .unwrap();
1021 client_cx
1022 .update(|cx| {
1023 Self::new(
1024 ConnectionIdentifier::setup(),
1025 connection,
1026 rx,
1027 Arc::new(MockDelegate),
1028 cx,
1029 )
1030 })
1031 .await
1032 .unwrap()
1033 .unwrap()
1034 }
1035
1036 fn remote_connection(&self) -> Option<Arc<dyn RemoteConnection>> {
1037 self.state
1038 .as_ref()
1039 .and_then(|state| state.remote_connection())
1040 }
1041}
1042
1043enum ConnectionPoolEntry {
1044 Connecting(Shared<Task<Result<Arc<dyn RemoteConnection>, Arc<anyhow::Error>>>>),
1045 Connected(Weak<dyn RemoteConnection>),
1046}
1047
1048#[derive(Default)]
1049struct ConnectionPool {
1050 connections: HashMap<RemoteConnectionOptions, ConnectionPoolEntry>,
1051}
1052
1053impl Global for ConnectionPool {}
1054
1055impl ConnectionPool {
1056 fn connect(
1057 &mut self,
1058 opts: RemoteConnectionOptions,
1059 delegate: Arc<dyn RemoteClientDelegate>,
1060 cx: &mut App,
1061 ) -> Shared<Task<Result<Arc<dyn RemoteConnection>, Arc<anyhow::Error>>>> {
1062 let connection = self.connections.get(&opts);
1063 match connection {
1064 Some(ConnectionPoolEntry::Connecting(task)) => {
1065 delegate.set_status(
1066 Some("Waiting for existing connection attempt"),
1067 &mut cx.to_async(),
1068 );
1069 return task.clone();
1070 }
1071 Some(ConnectionPoolEntry::Connected(remote)) => {
1072 if let Some(remote) = remote.upgrade()
1073 && !remote.has_been_killed()
1074 {
1075 return Task::ready(Ok(remote)).shared();
1076 }
1077 self.connections.remove(&opts);
1078 }
1079 None => {}
1080 }
1081
1082 let task = cx
1083 .spawn({
1084 let opts = opts.clone();
1085 let delegate = delegate.clone();
1086 async move |cx| {
1087 let connection = match opts.clone() {
1088 RemoteConnectionOptions::Ssh(opts) => {
1089 SshRemoteConnection::new(opts, delegate, cx)
1090 .await
1091 .map(|connection| Arc::new(connection) as Arc<dyn RemoteConnection>)
1092 }
1093 RemoteConnectionOptions::Wsl(opts) => {
1094 WslRemoteConnection::new(opts, delegate, cx)
1095 .await
1096 .map(|connection| Arc::new(connection) as Arc<dyn RemoteConnection>)
1097 }
1098 RemoteConnectionOptions::Docker(opts) => {
1099 DockerExecConnection::new(opts, delegate, cx)
1100 .await
1101 .map(|connection| Arc::new(connection) as Arc<dyn RemoteConnection>)
1102 }
1103 #[cfg(any(test, feature = "test-support"))]
1104 RemoteConnectionOptions::Mock(opts) => match cx.update(|cx| {
1105 cx.default_global::<crate::transport::mock::MockConnectionRegistry>()
1106 .take(&opts)
1107 }) {
1108 Some(connection) => Ok(connection.await as Arc<dyn RemoteConnection>),
1109 None => Err(anyhow!(
1110 "Mock connection not found. Call MockConnection::new() first."
1111 )),
1112 },
1113 };
1114
1115 cx.update_global(|pool: &mut Self, _| {
1116 debug_assert!(matches!(
1117 pool.connections.get(&opts),
1118 Some(ConnectionPoolEntry::Connecting(_))
1119 ));
1120 match connection {
1121 Ok(connection) => {
1122 pool.connections.insert(
1123 opts.clone(),
1124 ConnectionPoolEntry::Connected(Arc::downgrade(&connection)),
1125 );
1126 Ok(connection)
1127 }
1128 Err(error) => {
1129 pool.connections.remove(&opts);
1130 Err(Arc::new(error))
1131 }
1132 }
1133 })
1134 }
1135 })
1136 .shared();
1137
1138 self.connections
1139 .insert(opts.clone(), ConnectionPoolEntry::Connecting(task.clone()));
1140 task
1141 }
1142}
1143
1144#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1145pub enum RemoteConnectionOptions {
1146 Ssh(SshConnectionOptions),
1147 Wsl(WslConnectionOptions),
1148 Docker(DockerConnectionOptions),
1149 #[cfg(any(test, feature = "test-support"))]
1150 Mock(crate::transport::mock::MockConnectionOptions),
1151}
1152
1153impl RemoteConnectionOptions {
1154 pub fn display_name(&self) -> String {
1155 match self {
1156 RemoteConnectionOptions::Ssh(opts) => opts.host.to_string(),
1157 RemoteConnectionOptions::Wsl(opts) => opts.distro_name.clone(),
1158 RemoteConnectionOptions::Docker(opts) => opts.name.clone(),
1159 #[cfg(any(test, feature = "test-support"))]
1160 RemoteConnectionOptions::Mock(opts) => format!("mock-{}", opts.id),
1161 }
1162 }
1163}
1164
1165impl From<SshConnectionOptions> for RemoteConnectionOptions {
1166 fn from(opts: SshConnectionOptions) -> Self {
1167 RemoteConnectionOptions::Ssh(opts)
1168 }
1169}
1170
1171impl From<WslConnectionOptions> for RemoteConnectionOptions {
1172 fn from(opts: WslConnectionOptions) -> Self {
1173 RemoteConnectionOptions::Wsl(opts)
1174 }
1175}
1176
/// Wraps mock options in the [`RemoteConnectionOptions::Mock`] variant
/// (tests / `test-support` only).
#[cfg(any(test, feature = "test-support"))]
impl From<crate::transport::mock::MockConnectionOptions> for RemoteConnectionOptions {
    fn from(mock_options: crate::transport::mock::MockConnectionOptions) -> Self {
        Self::Mock(mock_options)
    }
}
1183
#[cfg(target_os = "windows")]
/// Open a wsl path (\\wsl.localhost\<distro>\path)
#[derive(Debug, Clone, PartialEq, Eq, gpui::Action)]
#[action(namespace = workspace, no_json, no_register)]
pub struct OpenWslPath {
    // Connection options for the WSL distro the paths belong to.
    pub distro: WslConnectionOptions,
    // Paths to open.
    // NOTE(review): presumably paths inside the distro rather than Windows-side
    // paths — confirm against the action's handler.
    pub paths: Vec<PathBuf>,
}
1192
1193#[async_trait(?Send)]
1194pub trait RemoteConnection: Send + Sync {
1195 fn start_proxy(
1196 &self,
1197 unique_identifier: String,
1198 reconnect: bool,
1199 incoming_tx: UnboundedSender<Envelope>,
1200 outgoing_rx: UnboundedReceiver<Envelope>,
1201 connection_activity_tx: Sender<()>,
1202 delegate: Arc<dyn RemoteClientDelegate>,
1203 cx: &mut AsyncApp,
1204 ) -> Task<Result<i32>>;
1205 fn upload_directory(
1206 &self,
1207 src_path: PathBuf,
1208 dest_path: RemotePathBuf,
1209 cx: &App,
1210 ) -> Task<Result<()>>;
1211 async fn kill(&self) -> Result<()>;
1212 fn has_been_killed(&self) -> bool;
1213 fn shares_network_interface(&self) -> bool {
1214 false
1215 }
1216 fn build_command(
1217 &self,
1218 program: Option<String>,
1219 args: &[String],
1220 env: &HashMap<String, String>,
1221 working_dir: Option<String>,
1222 port_forward: Option<(u16, String, u16)>,
1223 ) -> Result<CommandTemplate>;
1224 fn build_forward_ports_command(
1225 &self,
1226 forwards: Vec<(u16, String, u16)>,
1227 ) -> Result<CommandTemplate>;
1228 fn connection_options(&self) -> RemoteConnectionOptions;
1229 fn path_style(&self) -> PathStyle;
1230 fn shell(&self) -> String;
1231 fn default_system_shell(&self) -> String;
1232 fn has_wsl_interop(&self) -> bool;
1233
1234 #[cfg(any(test, feature = "test-support"))]
1235 fn simulate_disconnect(&self, _: &AsyncApp) {}
1236}
1237
1238type ResponseChannels = Mutex<HashMap<MessageId, oneshot::Sender<(Envelope, oneshot::Sender<()>)>>>;
1239
1240struct Signal<T> {
1241 tx: Mutex<Option<oneshot::Sender<T>>>,
1242 rx: Shared<Task<Option<T>>>,
1243}
1244
1245impl<T: Send + Clone + 'static> Signal<T> {
1246 pub fn new(cx: &App) -> Self {
1247 let (tx, rx) = oneshot::channel();
1248
1249 let task = cx
1250 .background_executor()
1251 .spawn(async move { rx.await.ok() })
1252 .shared();
1253
1254 Self {
1255 tx: Mutex::new(Some(tx)),
1256 rx: task,
1257 }
1258 }
1259
1260 fn set(&self, value: T) {
1261 if let Some(tx) = self.tx.lock().take() {
1262 let _ = tx.send(value);
1263 }
1264 }
1265
1266 fn wait(&self) -> Shared<Task<Option<T>>> {
1267 self.rx.clone()
1268 }
1269}
1270
1271pub(crate) struct ChannelClient {
1272 next_message_id: AtomicU32,
1273 outgoing_tx: Mutex<mpsc::UnboundedSender<Envelope>>,
1274 buffer: Mutex<VecDeque<Envelope>>,
1275 response_channels: ResponseChannels,
1276 message_handlers: Mutex<ProtoMessageHandlerSet>,
1277 max_received: AtomicU32,
1278 name: &'static str,
1279 task: Mutex<Task<Result<()>>>,
1280 remote_started: Signal<()>,
1281 has_wsl_interop: bool,
1282}
1283
1284impl ChannelClient {
1285 pub(crate) fn new(
1286 incoming_rx: mpsc::UnboundedReceiver<Envelope>,
1287 outgoing_tx: mpsc::UnboundedSender<Envelope>,
1288 cx: &App,
1289 name: &'static str,
1290 has_wsl_interop: bool,
1291 ) -> Arc<Self> {
1292 Arc::new_cyclic(|this| Self {
1293 outgoing_tx: Mutex::new(outgoing_tx),
1294 next_message_id: AtomicU32::new(0),
1295 max_received: AtomicU32::new(0),
1296 response_channels: ResponseChannels::default(),
1297 message_handlers: Default::default(),
1298 buffer: Mutex::new(VecDeque::new()),
1299 name,
1300 task: Mutex::new(Self::start_handling_messages(
1301 this.clone(),
1302 incoming_rx,
1303 &cx.to_async(),
1304 )),
1305 remote_started: Signal::new(cx),
1306 has_wsl_interop,
1307 })
1308 }
1309
1310 fn wait_for_remote_started(&self) -> Shared<Task<Option<()>>> {
1311 self.remote_started.wait()
1312 }
1313
1314 fn start_handling_messages(
1315 this: Weak<Self>,
1316 mut incoming_rx: mpsc::UnboundedReceiver<Envelope>,
1317 cx: &AsyncApp,
1318 ) -> Task<Result<()>> {
1319 cx.spawn(async move |cx| {
1320 if let Some(this) = this.upgrade() {
1321 let envelope = proto::RemoteStarted {}.into_envelope(0, None, None);
1322 this.outgoing_tx.lock().unbounded_send(envelope).ok();
1323 };
1324
1325 let peer_id = PeerId { owner_id: 0, id: 0 };
1326 while let Some(incoming) = incoming_rx.next().await {
1327 let Some(this) = this.upgrade() else {
1328 return anyhow::Ok(());
1329 };
1330 if let Some(ack_id) = incoming.ack_id {
1331 let mut buffer = this.buffer.lock();
1332 while buffer.front().is_some_and(|msg| msg.id <= ack_id) {
1333 buffer.pop_front();
1334 }
1335 }
1336 if let Some(proto::envelope::Payload::FlushBufferedMessages(_)) = &incoming.payload
1337 {
1338 log::debug!(
1339 "{}:remote message received. name:FlushBufferedMessages",
1340 this.name
1341 );
1342 {
1343 let buffer = this.buffer.lock();
1344 for envelope in buffer.iter() {
1345 this.outgoing_tx
1346 .lock()
1347 .unbounded_send(envelope.clone())
1348 .ok();
1349 }
1350 }
1351 let mut envelope = proto::Ack {}.into_envelope(0, Some(incoming.id), None);
1352 envelope.id = this.next_message_id.fetch_add(1, SeqCst);
1353 this.outgoing_tx.lock().unbounded_send(envelope).ok();
1354 continue;
1355 }
1356
1357 if let Some(proto::envelope::Payload::RemoteStarted(_)) = &incoming.payload {
1358 this.remote_started.set(());
1359 let mut envelope = proto::Ack {}.into_envelope(0, Some(incoming.id), None);
1360 envelope.id = this.next_message_id.fetch_add(1, SeqCst);
1361 this.outgoing_tx.lock().unbounded_send(envelope).ok();
1362 continue;
1363 }
1364
1365 this.max_received.store(incoming.id, SeqCst);
1366
1367 if let Some(request_id) = incoming.responding_to {
1368 let request_id = MessageId(request_id);
1369 let sender = this.response_channels.lock().remove(&request_id);
1370 if let Some(sender) = sender {
1371 let (tx, rx) = oneshot::channel();
1372 if incoming.payload.is_some() {
1373 sender.send((incoming, tx)).ok();
1374 }
1375 rx.await.ok();
1376 }
1377 } else if let Some(envelope) =
1378 build_typed_envelope(peer_id, Instant::now(), incoming)
1379 {
1380 let type_name = envelope.payload_type_name();
1381 let message_id = envelope.message_id();
1382 if let Some(future) = ProtoMessageHandlerSet::handle_message(
1383 &this.message_handlers,
1384 envelope,
1385 this.clone().into(),
1386 cx.clone(),
1387 ) {
1388 log::debug!("{}:remote message received. name:{type_name}", this.name);
1389 cx.foreground_executor()
1390 .spawn(async move {
1391 match future.await {
1392 Ok(_) => {
1393 log::debug!(
1394 "{}:remote message handled. name:{type_name}",
1395 this.name
1396 );
1397 }
1398 Err(error) => {
1399 log::error!(
1400 "{}:error handling message. type:{}, error:{:#}",
1401 this.name,
1402 type_name,
1403 format!("{error:#}").lines().fold(
1404 String::new(),
1405 |mut message, line| {
1406 if !message.is_empty() {
1407 message.push(' ');
1408 }
1409 message.push_str(line);
1410 message
1411 }
1412 )
1413 );
1414 }
1415 }
1416 })
1417 .detach()
1418 } else {
1419 log::error!("{}:unhandled remote message name:{type_name}", this.name);
1420 if let Err(e) = AnyProtoClient::from(this.clone()).send_response(
1421 message_id,
1422 anyhow::anyhow!("no handler registered for {type_name}").to_proto(),
1423 ) {
1424 log::error!(
1425 "{}:error sending error response for {type_name}:{e:#}",
1426 this.name
1427 );
1428 }
1429 }
1430 }
1431 }
1432 anyhow::Ok(())
1433 })
1434 }
1435
1436 pub(crate) fn reconnect(
1437 self: &Arc<Self>,
1438 incoming_rx: UnboundedReceiver<Envelope>,
1439 outgoing_tx: UnboundedSender<Envelope>,
1440 cx: &AsyncApp,
1441 ) {
1442 *self.outgoing_tx.lock() = outgoing_tx;
1443 *self.task.lock() = Self::start_handling_messages(Arc::downgrade(self), incoming_rx, cx);
1444 }
1445
1446 fn request<T: RequestMessage>(
1447 &self,
1448 payload: T,
1449 ) -> impl 'static + Future<Output = Result<T::Response>> {
1450 self.request_internal(payload, true)
1451 }
1452
1453 fn request_internal<T: RequestMessage>(
1454 &self,
1455 payload: T,
1456 use_buffer: bool,
1457 ) -> impl 'static + Future<Output = Result<T::Response>> {
1458 log::debug!("remote request start. name:{}", T::NAME);
1459 let response =
1460 self.request_dynamic(payload.into_envelope(0, None, None), T::NAME, use_buffer);
1461 async move {
1462 let response = response.await?;
1463 log::debug!("remote request finish. name:{}", T::NAME);
1464 T::Response::from_envelope(response).context("received a response of the wrong type")
1465 }
1466 }
1467
1468 async fn resync(&self, timeout: Duration) -> Result<()> {
1469 smol::future::or(
1470 async {
1471 self.request_internal(proto::FlushBufferedMessages {}, false)
1472 .await?;
1473
1474 for envelope in self.buffer.lock().iter() {
1475 self.outgoing_tx
1476 .lock()
1477 .unbounded_send(envelope.clone())
1478 .ok();
1479 }
1480 Ok(())
1481 },
1482 async {
1483 smol::Timer::after(timeout).await;
1484 anyhow::bail!("Timed out resyncing remote client")
1485 },
1486 )
1487 .await
1488 }
1489
1490 async fn ping(&self, timeout: Duration) -> Result<()> {
1491 smol::future::or(
1492 async {
1493 self.request(proto::Ping {}).await?;
1494 Ok(())
1495 },
1496 async {
1497 smol::Timer::after(timeout).await;
1498 anyhow::bail!("Timed out pinging remote client")
1499 },
1500 )
1501 .await
1502 }
1503
1504 fn send<T: EnvelopedMessage>(&self, payload: T) -> Result<()> {
1505 log::debug!("remote send name:{}", T::NAME);
1506 self.send_dynamic(payload.into_envelope(0, None, None))
1507 }
1508
1509 fn request_dynamic(
1510 &self,
1511 mut envelope: proto::Envelope,
1512 type_name: &'static str,
1513 use_buffer: bool,
1514 ) -> impl 'static + Future<Output = Result<proto::Envelope>> {
1515 envelope.id = self.next_message_id.fetch_add(1, SeqCst);
1516 let (tx, rx) = oneshot::channel();
1517 let mut response_channels_lock = self.response_channels.lock();
1518 response_channels_lock.insert(MessageId(envelope.id), tx);
1519 drop(response_channels_lock);
1520
1521 let result = if use_buffer {
1522 self.send_buffered(envelope)
1523 } else {
1524 self.send_unbuffered(envelope)
1525 };
1526 async move {
1527 if let Err(error) = &result {
1528 log::error!("failed to send message: {error}");
1529 anyhow::bail!("failed to send message: {error}");
1530 }
1531
1532 let response = rx.await.context("connection lost")?.0;
1533 if let Some(proto::envelope::Payload::Error(error)) = &response.payload {
1534 return Err(RpcError::from_proto(error, type_name));
1535 }
1536 Ok(response)
1537 }
1538 }
1539
1540 pub fn send_dynamic(&self, mut envelope: proto::Envelope) -> Result<()> {
1541 envelope.id = self.next_message_id.fetch_add(1, SeqCst);
1542 self.send_buffered(envelope)
1543 }
1544
1545 fn send_buffered(&self, mut envelope: proto::Envelope) -> Result<()> {
1546 envelope.ack_id = Some(self.max_received.load(SeqCst));
1547 self.buffer.lock().push_back(envelope.clone());
1548 // ignore errors on send (happen while we're reconnecting)
1549 // assume that the global "disconnected" overlay is sufficient.
1550 self.outgoing_tx.lock().unbounded_send(envelope).ok();
1551 Ok(())
1552 }
1553
1554 fn send_unbuffered(&self, mut envelope: proto::Envelope) -> Result<()> {
1555 envelope.ack_id = Some(self.max_received.load(SeqCst));
1556 self.outgoing_tx.lock().unbounded_send(envelope).ok();
1557 Ok(())
1558 }
1559}
1560
1561impl ProtoClient for ChannelClient {
1562 fn request(
1563 &self,
1564 envelope: proto::Envelope,
1565 request_type: &'static str,
1566 ) -> BoxFuture<'static, Result<proto::Envelope>> {
1567 self.request_dynamic(envelope, request_type, true).boxed()
1568 }
1569
1570 fn send(&self, envelope: proto::Envelope, _message_type: &'static str) -> Result<()> {
1571 self.send_dynamic(envelope)
1572 }
1573
1574 fn send_response(&self, envelope: Envelope, _message_type: &'static str) -> anyhow::Result<()> {
1575 self.send_dynamic(envelope)
1576 }
1577
1578 fn message_handler_set(&self) -> &Mutex<ProtoMessageHandlerSet> {
1579 &self.message_handlers
1580 }
1581
1582 fn is_via_collab(&self) -> bool {
1583 false
1584 }
1585
1586 fn has_wsl_interop(&self) -> bool {
1587 self.has_wsl_interop
1588 }
1589}