1#[cfg(any(test, feature = "test-support"))]
2use crate::transport::mock::ConnectGuard;
3use crate::{
4 SshConnectionOptions,
5 protocol::MessageId,
6 proxy::ProxyLaunchError,
7 transport::{
8 docker::{DockerConnectionOptions, DockerExecConnection},
9 ssh::SshRemoteConnection,
10 wsl::{WslConnectionOptions, WslRemoteConnection},
11 },
12};
13use anyhow::{Context as _, Result, anyhow};
14use askpass::EncryptedPassword;
15use async_trait::async_trait;
16use collections::HashMap;
17use futures::{
18 Future, FutureExt as _, StreamExt as _,
19 channel::{
20 mpsc::{self, Sender, UnboundedReceiver, UnboundedSender},
21 oneshot,
22 },
23 future::{BoxFuture, Shared, WeakShared},
24 select, select_biased,
25};
26use gpui::{
27 App, AppContext as _, AsyncApp, BackgroundExecutor, BorrowAppContext, Context, Entity,
28 EventEmitter, FutureExt, Global, Task, WeakEntity,
29};
30use parking_lot::Mutex;
31
32use release_channel::ReleaseChannel;
33use rpc::{
34 AnyProtoClient, ErrorExt, ProtoClient, ProtoMessageHandlerSet, RpcError,
35 proto::{self, Envelope, EnvelopedMessage, PeerId, RequestMessage, build_typed_envelope},
36};
37use semver::Version;
38use std::{
39 collections::VecDeque,
40 fmt,
41 ops::ControlFlow,
42 path::PathBuf,
43 sync::{
44 Arc, Weak,
45 atomic::{AtomicU32, AtomicU64, Ordering::SeqCst},
46 },
47 time::{Duration, Instant},
48};
49use util::{
50 ResultExt,
51 paths::{PathStyle, RemotePathBuf},
52};
53
54#[derive(Copy, Clone, Debug, PartialEq, Eq)]
55pub enum RemoteOs {
56 Linux,
57 MacOs,
58 Windows,
59}
60
61impl RemoteOs {
62 pub fn as_str(&self) -> &'static str {
63 match self {
64 RemoteOs::Linux => "linux",
65 RemoteOs::MacOs => "macos",
66 RemoteOs::Windows => "windows",
67 }
68 }
69
70 pub fn is_windows(&self) -> bool {
71 matches!(self, RemoteOs::Windows)
72 }
73}
74
75impl std::fmt::Display for RemoteOs {
76 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
77 f.write_str(self.as_str())
78 }
79}
80
81#[derive(Copy, Clone, Debug, PartialEq, Eq)]
82pub enum RemoteArch {
83 X86_64,
84 Aarch64,
85}
86
87impl RemoteArch {
88 pub fn as_str(&self) -> &'static str {
89 match self {
90 RemoteArch::X86_64 => "x86_64",
91 RemoteArch::Aarch64 => "aarch64",
92 }
93 }
94}
95
96impl std::fmt::Display for RemoteArch {
97 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
98 f.write_str(self.as_str())
99 }
100}
101
102#[derive(Copy, Clone, Debug)]
103pub struct RemotePlatform {
104 pub os: RemoteOs,
105 pub arch: RemoteArch,
106}
107
108#[derive(Clone, Debug)]
109pub struct CommandTemplate {
110 pub program: String,
111 pub args: Vec<String>,
112 pub env: HashMap<String, String>,
113}
114
115/// Whether a command should be run with TTY allocation for interactive use.
116#[derive(Clone, Copy, Debug, PartialEq, Eq)]
117pub enum Interactive {
118 /// Allocate a pseudo-TTY for interactive terminal use.
119 Yes,
120 /// Do not allocate a TTY - for commands that communicate via piped stdio.
121 No,
122}
123
124pub trait RemoteClientDelegate: Send + Sync {
125 fn ask_password(
126 &self,
127 prompt: String,
128 tx: oneshot::Sender<EncryptedPassword>,
129 cx: &mut AsyncApp,
130 );
131 fn get_download_url(
132 &self,
133 platform: RemotePlatform,
134 release_channel: ReleaseChannel,
135 version: Option<Version>,
136 cx: &mut AsyncApp,
137 ) -> Task<Result<Option<String>>>;
138 fn download_server_binary_locally(
139 &self,
140 platform: RemotePlatform,
141 release_channel: ReleaseChannel,
142 version: Option<Version>,
143 cx: &mut AsyncApp,
144 ) -> Task<Result<PathBuf>>;
145 fn set_status(&self, status: Option<&str>, cx: &mut AsyncApp);
146}
147
148const MAX_MISSED_HEARTBEATS: usize = 5;
149const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
150const HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(5);
151const INITIAL_CONNECTION_TIMEOUT: Duration =
152 Duration::from_secs(if cfg!(debug_assertions) { 5 } else { 60 });
153
154pub const MAX_RECONNECT_ATTEMPTS: usize = 3;
155
156enum State {
157 Connecting,
158 Connected {
159 remote_connection: Arc<dyn RemoteConnection>,
160 delegate: Arc<dyn RemoteClientDelegate>,
161
162 multiplex_task: Task<Result<()>>,
163 heartbeat_task: Task<Result<()>>,
164 },
165 HeartbeatMissed {
166 missed_heartbeats: usize,
167
168 remote_connection: Arc<dyn RemoteConnection>,
169 delegate: Arc<dyn RemoteClientDelegate>,
170
171 multiplex_task: Task<Result<()>>,
172 heartbeat_task: Task<Result<()>>,
173 },
174 Reconnecting,
175 ReconnectFailed {
176 remote_connection: Arc<dyn RemoteConnection>,
177 delegate: Arc<dyn RemoteClientDelegate>,
178
179 error: anyhow::Error,
180 attempts: usize,
181 },
182 ReconnectExhausted,
183 ServerNotRunning,
184}
185
186impl fmt::Display for State {
187 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
188 match self {
189 Self::Connecting => write!(f, "connecting"),
190 Self::Connected { .. } => write!(f, "connected"),
191 Self::Reconnecting => write!(f, "reconnecting"),
192 Self::ReconnectFailed { .. } => write!(f, "reconnect failed"),
193 Self::ReconnectExhausted => write!(f, "reconnect exhausted"),
194 Self::HeartbeatMissed { .. } => write!(f, "heartbeat missed"),
195 Self::ServerNotRunning { .. } => write!(f, "server not running"),
196 }
197 }
198}
199
200impl State {
201 fn remote_connection(&self) -> Option<Arc<dyn RemoteConnection>> {
202 match self {
203 Self::Connected {
204 remote_connection, ..
205 } => Some(remote_connection.clone()),
206 Self::HeartbeatMissed {
207 remote_connection, ..
208 } => Some(remote_connection.clone()),
209 Self::ReconnectFailed {
210 remote_connection, ..
211 } => Some(remote_connection.clone()),
212 _ => None,
213 }
214 }
215
216 fn can_reconnect(&self) -> bool {
217 match self {
218 Self::Connected { .. }
219 | Self::HeartbeatMissed { .. }
220 | Self::ReconnectFailed { .. } => true,
221 State::Connecting
222 | State::Reconnecting
223 | State::ReconnectExhausted
224 | State::ServerNotRunning => false,
225 }
226 }
227
228 fn is_reconnect_failed(&self) -> bool {
229 matches!(self, Self::ReconnectFailed { .. })
230 }
231
232 fn is_reconnect_exhausted(&self) -> bool {
233 matches!(self, Self::ReconnectExhausted { .. })
234 }
235
236 fn is_server_not_running(&self) -> bool {
237 matches!(self, Self::ServerNotRunning)
238 }
239
240 fn is_reconnecting(&self) -> bool {
241 matches!(self, Self::Reconnecting { .. })
242 }
243
244 fn heartbeat_recovered(self) -> Self {
245 match self {
246 Self::HeartbeatMissed {
247 remote_connection,
248 delegate,
249 multiplex_task,
250 heartbeat_task,
251 ..
252 } => Self::Connected {
253 remote_connection,
254 delegate,
255 multiplex_task,
256 heartbeat_task,
257 },
258 _ => self,
259 }
260 }
261
262 fn heartbeat_missed(self) -> Self {
263 match self {
264 Self::Connected {
265 remote_connection,
266 delegate,
267 multiplex_task,
268 heartbeat_task,
269 } => Self::HeartbeatMissed {
270 missed_heartbeats: 1,
271 remote_connection,
272 delegate,
273 multiplex_task,
274 heartbeat_task,
275 },
276 Self::HeartbeatMissed {
277 missed_heartbeats,
278 remote_connection,
279 delegate,
280 multiplex_task,
281 heartbeat_task,
282 } => Self::HeartbeatMissed {
283 missed_heartbeats: missed_heartbeats + 1,
284 remote_connection,
285 delegate,
286 multiplex_task,
287 heartbeat_task,
288 },
289 _ => self,
290 }
291 }
292}
293
294/// The state of the ssh connection.
295#[derive(Clone, Copy, Debug, PartialEq, Eq)]
296pub enum ConnectionState {
297 Connecting,
298 Connected,
299 HeartbeatMissed,
300 Reconnecting,
301 Disconnected,
302}
303
304impl From<&State> for ConnectionState {
305 fn from(value: &State) -> Self {
306 match value {
307 State::Connecting => Self::Connecting,
308 State::Connected { .. } => Self::Connected,
309 State::Reconnecting | State::ReconnectFailed { .. } => Self::Reconnecting,
310 State::HeartbeatMissed { .. } => Self::HeartbeatMissed,
311 State::ReconnectExhausted => Self::Disconnected,
312 State::ServerNotRunning => Self::Disconnected,
313 }
314 }
315}
316
317pub struct RemoteClient {
318 client: Arc<ChannelClient>,
319 unique_identifier: String,
320 connection_options: RemoteConnectionOptions,
321 path_style: PathStyle,
322 state: Option<State>,
323}
324
325#[derive(Debug)]
326pub enum RemoteClientEvent {
327 Disconnected,
328}
329
330impl EventEmitter<RemoteClientEvent> for RemoteClient {}
331
332/// Identifies the socket on the remote server so that reconnects
333/// can re-join the same project.
334pub enum ConnectionIdentifier {
335 Setup(u64),
336 Workspace(i64),
337}
338
339static NEXT_ID: AtomicU64 = AtomicU64::new(1);
340
341impl ConnectionIdentifier {
342 pub fn setup() -> Self {
343 Self::Setup(NEXT_ID.fetch_add(1, SeqCst))
344 }
345
346 // This string gets used in a socket name, and so must be relatively short.
347 // The total length of:
348 // /home/{username}/.local/share/zed/server_state/{name}/stdout.sock
349 // Must be less than about 100 characters
350 // https://unix.stackexchange.com/questions/367008/why-is-socket-path-length-limited-to-a-hundred-chars
351 // So our strings should be at most 20 characters or so.
352 fn to_string(&self, cx: &App) -> String {
353 let identifier_prefix = match ReleaseChannel::global(cx) {
354 ReleaseChannel::Stable => "".to_string(),
355 release_channel => format!("{}-", release_channel.dev_name()),
356 };
357 match self {
358 Self::Setup(setup_id) => format!("{identifier_prefix}setup-{setup_id}"),
359 Self::Workspace(workspace_id) => {
360 format!("{identifier_prefix}workspace-{workspace_id}",)
361 }
362 }
363 }
364}
365
366pub async fn connect(
367 connection_options: RemoteConnectionOptions,
368 delegate: Arc<dyn RemoteClientDelegate>,
369 cx: &mut AsyncApp,
370) -> Result<Arc<dyn RemoteConnection>> {
371 cx.update(|cx| {
372 cx.update_default_global(|pool: &mut ConnectionPool, cx| {
373 pool.connect(connection_options.clone(), delegate.clone(), cx)
374 })
375 })
376 .await
377 .map_err(|e| e.cloned())
378}
379
380impl RemoteClient {
381 pub fn new(
382 unique_identifier: ConnectionIdentifier,
383 remote_connection: Arc<dyn RemoteConnection>,
384 cancellation: oneshot::Receiver<()>,
385 delegate: Arc<dyn RemoteClientDelegate>,
386 cx: &mut App,
387 ) -> Task<Result<Option<Entity<Self>>>> {
388 let unique_identifier = unique_identifier.to_string(cx);
389 cx.spawn(async move |cx| {
390 let success = Box::pin(async move {
391 let (outgoing_tx, outgoing_rx) = mpsc::unbounded::<Envelope>();
392 let (incoming_tx, incoming_rx) = mpsc::unbounded::<Envelope>();
393 let (connection_activity_tx, connection_activity_rx) = mpsc::channel::<()>(1);
394
395 let client = cx.update(|cx| {
396 ChannelClient::new(
397 incoming_rx,
398 outgoing_tx,
399 cx,
400 "client",
401 remote_connection.has_wsl_interop(),
402 )
403 });
404
405 let path_style = remote_connection.path_style();
406 let this = cx.new(|_| Self {
407 client: client.clone(),
408 unique_identifier: unique_identifier.clone(),
409 connection_options: remote_connection.connection_options(),
410 path_style,
411 state: Some(State::Connecting),
412 });
413
414 let io_task = remote_connection.start_proxy(
415 unique_identifier,
416 false,
417 incoming_tx,
418 outgoing_rx,
419 connection_activity_tx,
420 delegate.clone(),
421 cx,
422 );
423
424 let ready = client
425 .wait_for_remote_started()
426 .with_timeout(INITIAL_CONNECTION_TIMEOUT, cx.background_executor())
427 .await;
428 match ready {
429 Ok(Some(_)) => {}
430 Ok(None) => {
431 let mut error = "remote client exited before becoming ready".to_owned();
432 if let Some(status) = io_task.now_or_never() {
433 match status {
434 Ok(exit_code) => {
435 error.push_str(&format!(", exit_code={exit_code:?}"))
436 }
437 Err(e) => error.push_str(&format!(", error={e:?}")),
438 }
439 }
440 let error = anyhow::anyhow!("{error}");
441 log::error!("failed to establish connection: {}", error);
442 return Err(error);
443 }
444 Err(_) => {
445 let mut error =
446 "remote client did not become ready within the timeout".to_owned();
447 if let Some(status) = io_task.now_or_never() {
448 match status {
449 Ok(exit_code) => {
450 error.push_str(&format!(", exit_code={exit_code:?}"))
451 }
452 Err(e) => error.push_str(&format!(", error={e:?}")),
453 }
454 }
455 let error = anyhow::anyhow!("{error}");
456 log::error!("failed to establish connection: {}", error);
457 return Err(error);
458 }
459 }
460 let multiplex_task = Self::monitor(this.downgrade(), io_task, cx);
461 if let Err(error) = client.ping(HEARTBEAT_TIMEOUT).await {
462 log::error!("failed to establish connection: {}", error);
463 return Err(error);
464 }
465
466 let heartbeat_task = Self::heartbeat(this.downgrade(), connection_activity_rx, cx);
467
468 this.update(cx, |this, _| {
469 this.state = Some(State::Connected {
470 remote_connection,
471 delegate,
472 multiplex_task,
473 heartbeat_task,
474 });
475 });
476
477 Ok(Some(this))
478 });
479
480 select! {
481 _ = cancellation.fuse() => {
482 Ok(None)
483 }
484 result = success.fuse() => result
485 }
486 })
487 }
488
489 pub fn proto_client_from_channels(
490 incoming_rx: mpsc::UnboundedReceiver<Envelope>,
491 outgoing_tx: mpsc::UnboundedSender<Envelope>,
492 cx: &App,
493 name: &'static str,
494 has_wsl_interop: bool,
495 ) -> AnyProtoClient {
496 ChannelClient::new(incoming_rx, outgoing_tx, cx, name, has_wsl_interop).into()
497 }
498
499 pub fn shutdown_processes<T: RequestMessage>(
500 &mut self,
501 shutdown_request: Option<T>,
502 executor: BackgroundExecutor,
503 ) -> Option<impl Future<Output = ()> + use<T>> {
504 let state = self.state.take()?;
505 log::info!("shutting down remote processes");
506
507 let State::Connected {
508 multiplex_task,
509 heartbeat_task,
510 remote_connection,
511 delegate,
512 } = state
513 else {
514 return None;
515 };
516
517 let client = self.client.clone();
518
519 Some(async move {
520 if let Some(shutdown_request) = shutdown_request {
521 client.send(shutdown_request).log_err();
522 // We wait 50ms instead of waiting for a response, because
523 // waiting for a response would require us to wait on the main thread
524 // which we want to avoid in an `on_app_quit` callback.
525 executor.timer(Duration::from_millis(50)).await;
526 }
527
528 // Drop `multiplex_task` because it owns our remote_connection_proxy_process, which is a
529 // child of master_process.
530 drop(multiplex_task);
531 // Now drop the rest of state, which kills master process.
532 drop(heartbeat_task);
533 drop(remote_connection);
534 drop(delegate);
535 })
536 }
537
538 fn reconnect(&mut self, cx: &mut Context<Self>) -> Result<()> {
539 let can_reconnect = self
540 .state
541 .as_ref()
542 .map(|state| state.can_reconnect())
543 .unwrap_or(false);
544 if !can_reconnect {
545 let state = if let Some(state) = self.state.as_ref() {
546 state.to_string()
547 } else {
548 "no state set".to_string()
549 };
550 log::info!(
551 "aborting reconnect, because not in state that allows reconnecting: {state}"
552 );
553 anyhow::bail!(
554 "aborting reconnect, because not in state that allows reconnecting: {state}"
555 );
556 }
557
558 let state = self.state.take().unwrap();
559 let (attempts, remote_connection, delegate) = match state {
560 State::Connected {
561 remote_connection,
562 delegate,
563 multiplex_task,
564 heartbeat_task,
565 }
566 | State::HeartbeatMissed {
567 remote_connection,
568 delegate,
569 multiplex_task,
570 heartbeat_task,
571 ..
572 } => {
573 drop(multiplex_task);
574 drop(heartbeat_task);
575 (0, remote_connection, delegate)
576 }
577 State::ReconnectFailed {
578 attempts,
579 remote_connection,
580 delegate,
581 ..
582 } => (attempts, remote_connection, delegate),
583 State::Connecting
584 | State::Reconnecting
585 | State::ReconnectExhausted
586 | State::ServerNotRunning => unreachable!(),
587 };
588
589 let attempts = attempts + 1;
590 if attempts > MAX_RECONNECT_ATTEMPTS {
591 log::error!(
592 "Failed to reconnect to after {} attempts, giving up",
593 MAX_RECONNECT_ATTEMPTS
594 );
595 self.set_state(State::ReconnectExhausted, cx);
596 return Ok(());
597 }
598
599 self.set_state(State::Reconnecting, cx);
600
601 log::info!(
602 "Trying to reconnect to remote server... Attempt {}",
603 attempts
604 );
605
606 let unique_identifier = self.unique_identifier.clone();
607 let client = self.client.clone();
608 let reconnect_task = cx.spawn(async move |this, cx| {
609 macro_rules! failed {
610 ($error:expr, $attempts:expr, $remote_connection:expr, $delegate:expr) => {
611 delegate.set_status(Some(&format!("{error:#}", error = $error)), cx);
612 return State::ReconnectFailed {
613 error: anyhow!($error),
614 attempts: $attempts,
615 remote_connection: $remote_connection,
616 delegate: $delegate,
617 };
618 };
619 }
620
621 if let Err(error) = remote_connection
622 .kill()
623 .await
624 .context("Failed to kill remote_connection process")
625 {
626 failed!(error, attempts, remote_connection, delegate);
627 };
628
629 let connection_options = remote_connection.connection_options();
630
631 let (outgoing_tx, outgoing_rx) = mpsc::unbounded::<Envelope>();
632 let (incoming_tx, incoming_rx) = mpsc::unbounded::<Envelope>();
633 let (connection_activity_tx, connection_activity_rx) = mpsc::channel::<()>(1);
634
635 let (remote_connection, io_task) = match async {
636 let remote_connection = cx
637 .update_global(|pool: &mut ConnectionPool, cx| {
638 pool.connect(connection_options, delegate.clone(), cx)
639 })
640 .await
641 .map_err(|error| error.cloned())?;
642
643 let io_task = remote_connection.start_proxy(
644 unique_identifier,
645 true,
646 incoming_tx,
647 outgoing_rx,
648 connection_activity_tx,
649 delegate.clone(),
650 cx,
651 );
652 anyhow::Ok((remote_connection, io_task))
653 }
654 .await
655 {
656 Ok((remote_connection, io_task)) => (remote_connection, io_task),
657 Err(error) => {
658 failed!(error, attempts, remote_connection, delegate);
659 }
660 };
661
662 let multiplex_task = Self::monitor(this.clone(), io_task, cx);
663 client.reconnect(incoming_rx, outgoing_tx, cx);
664
665 if let Err(error) = client.resync(HEARTBEAT_TIMEOUT).await {
666 failed!(error, attempts, remote_connection, delegate);
667 };
668
669 State::Connected {
670 remote_connection,
671 delegate,
672 multiplex_task,
673 heartbeat_task: Self::heartbeat(this.clone(), connection_activity_rx, cx),
674 }
675 });
676
677 cx.spawn(async move |this, cx| {
678 let new_state = reconnect_task.await;
679 this.update(cx, |this, cx| {
680 this.try_set_state(cx, |old_state| {
681 if old_state.is_reconnecting() {
682 match &new_state {
683 State::Connecting
684 | State::Reconnecting
685 | State::HeartbeatMissed { .. }
686 | State::ServerNotRunning => {}
687 State::Connected { .. } => {
688 log::info!("Successfully reconnected");
689 }
690 State::ReconnectFailed {
691 error, attempts, ..
692 } => {
693 log::error!(
694 "Reconnect attempt {} failed: {:?}. Starting new attempt...",
695 attempts,
696 error
697 );
698 }
699 State::ReconnectExhausted => {
700 log::error!("Reconnect attempt failed and all attempts exhausted");
701 }
702 }
703 Some(new_state)
704 } else {
705 None
706 }
707 });
708
709 if this.state_is(State::is_reconnect_failed) {
710 this.reconnect(cx)
711 } else if this.state_is(State::is_reconnect_exhausted) {
712 Ok(())
713 } else {
714 log::debug!("State has transition from Reconnecting into new state while attempting reconnect.");
715 Ok(())
716 }
717 })
718 })
719 .detach_and_log_err(cx);
720
721 Ok(())
722 }
723
724 fn heartbeat(
725 this: WeakEntity<Self>,
726 mut connection_activity_rx: mpsc::Receiver<()>,
727 cx: &mut AsyncApp,
728 ) -> Task<Result<()>> {
729 let Ok(client) = this.read_with(cx, |this, _| this.client.clone()) else {
730 return Task::ready(Err(anyhow!("remote_connectionRemoteClient lost")));
731 };
732
733 cx.spawn(async move |cx| {
734 let mut missed_heartbeats = 0;
735
736 let keepalive_timer = cx.background_executor().timer(HEARTBEAT_INTERVAL).fuse();
737 futures::pin_mut!(keepalive_timer);
738
739 loop {
740 select_biased! {
741 result = connection_activity_rx.next().fuse() => {
742 if result.is_none() {
743 log::warn!("remote heartbeat: connection activity channel has been dropped. stopping.");
744 return Ok(());
745 }
746
747 if missed_heartbeats != 0 {
748 missed_heartbeats = 0;
749 let _ =this.update(cx, |this, cx| {
750 this.handle_heartbeat_result(missed_heartbeats, cx)
751 })?;
752 }
753 }
754 _ = keepalive_timer => {
755 log::debug!("Sending heartbeat to server...");
756
757 let result = select_biased! {
758 _ = connection_activity_rx.next().fuse() => {
759 Ok(())
760 }
761 ping_result = client.ping(HEARTBEAT_TIMEOUT).fuse() => {
762 ping_result
763 }
764 };
765
766 if result.is_err() {
767 missed_heartbeats += 1;
768 log::warn!(
769 "No heartbeat from server after {:?}. Missed heartbeat {} out of {}.",
770 HEARTBEAT_TIMEOUT,
771 missed_heartbeats,
772 MAX_MISSED_HEARTBEATS
773 );
774 } else if missed_heartbeats != 0 {
775 missed_heartbeats = 0;
776 } else {
777 continue;
778 }
779
780 let result = this.update(cx, |this, cx| {
781 this.handle_heartbeat_result(missed_heartbeats, cx)
782 })?;
783 if result.is_break() {
784 return Ok(());
785 }
786 }
787 }
788
789 keepalive_timer.set(cx.background_executor().timer(HEARTBEAT_INTERVAL).fuse());
790 }
791 })
792 }
793
794 fn handle_heartbeat_result(
795 &mut self,
796 missed_heartbeats: usize,
797 cx: &mut Context<Self>,
798 ) -> ControlFlow<()> {
799 let state = self.state.take().unwrap();
800 let next_state = if missed_heartbeats > 0 {
801 state.heartbeat_missed()
802 } else {
803 state.heartbeat_recovered()
804 };
805
806 self.set_state(next_state, cx);
807
808 if missed_heartbeats >= MAX_MISSED_HEARTBEATS {
809 log::error!(
810 "Missed last {} heartbeats. Reconnecting...",
811 missed_heartbeats
812 );
813
814 self.reconnect(cx)
815 .context("failed to start reconnect process after missing heartbeats")
816 .log_err();
817 ControlFlow::Break(())
818 } else {
819 ControlFlow::Continue(())
820 }
821 }
822
823 fn monitor(
824 this: WeakEntity<Self>,
825 io_task: Task<Result<i32>>,
826 cx: &AsyncApp,
827 ) -> Task<Result<()>> {
828 cx.spawn(async move |cx| {
829 let result = io_task.await;
830
831 match result {
832 Ok(exit_code) => {
833 if let Some(error) = ProxyLaunchError::from_exit_code(exit_code) {
834 match error {
835 ProxyLaunchError::ServerNotRunning => {
836 log::error!("failed to reconnect because server is not running");
837 this.update(cx, |this, cx| {
838 this.set_state(State::ServerNotRunning, cx);
839 })?;
840 }
841 }
842 } else if exit_code > 0 {
843 log::error!("proxy process terminated unexpectedly");
844 this.update(cx, |this, cx| {
845 this.reconnect(cx).ok();
846 })?;
847 }
848 }
849 Err(error) => {
850 log::warn!(
851 "remote io task died with error: {:?}. reconnecting...",
852 error
853 );
854 this.update(cx, |this, cx| {
855 this.reconnect(cx).ok();
856 })?;
857 }
858 }
859
860 Ok(())
861 })
862 }
863
864 fn state_is(&self, check: impl FnOnce(&State) -> bool) -> bool {
865 self.state.as_ref().is_some_and(check)
866 }
867
868 fn try_set_state(&mut self, cx: &mut Context<Self>, map: impl FnOnce(&State) -> Option<State>) {
869 let new_state = self.state.as_ref().and_then(map);
870 if let Some(new_state) = new_state {
871 self.state.replace(new_state);
872 cx.notify();
873 }
874 }
875
876 fn set_state(&mut self, state: State, cx: &mut Context<Self>) {
877 log::info!("setting state to '{}'", &state);
878
879 let is_reconnect_exhausted = state.is_reconnect_exhausted();
880 let is_server_not_running = state.is_server_not_running();
881 self.state.replace(state);
882
883 if is_reconnect_exhausted || is_server_not_running {
884 cx.emit(RemoteClientEvent::Disconnected);
885 }
886 cx.notify();
887 }
888
889 pub fn shell(&self) -> Option<String> {
890 Some(self.remote_connection()?.shell())
891 }
892
893 pub fn default_system_shell(&self) -> Option<String> {
894 Some(self.remote_connection()?.default_system_shell())
895 }
896
897 pub fn shares_network_interface(&self) -> bool {
898 self.remote_connection()
899 .map_or(false, |connection| connection.shares_network_interface())
900 }
901
902 pub fn build_command(
903 &self,
904 program: Option<String>,
905 args: &[String],
906 env: &HashMap<String, String>,
907 working_dir: Option<String>,
908 port_forward: Option<(u16, String, u16)>,
909 ) -> Result<CommandTemplate> {
910 self.build_command_with_options(
911 program,
912 args,
913 env,
914 working_dir,
915 port_forward,
916 Interactive::Yes,
917 )
918 }
919
920 pub fn build_command_with_options(
921 &self,
922 program: Option<String>,
923 args: &[String],
924 env: &HashMap<String, String>,
925 working_dir: Option<String>,
926 port_forward: Option<(u16, String, u16)>,
927 interactive: Interactive,
928 ) -> Result<CommandTemplate> {
929 let Some(connection) = self.remote_connection() else {
930 return Err(anyhow!("no remote connection"));
931 };
932 connection.build_command(program, args, env, working_dir, port_forward, interactive)
933 }
934
935 pub fn build_forward_ports_command(
936 &self,
937 forwards: Vec<(u16, String, u16)>,
938 ) -> Result<CommandTemplate> {
939 let Some(connection) = self.remote_connection() else {
940 return Err(anyhow!("no remote connection"));
941 };
942 connection.build_forward_ports_command(forwards)
943 }
944
945 pub fn upload_directory(
946 &self,
947 src_path: PathBuf,
948 dest_path: RemotePathBuf,
949 cx: &App,
950 ) -> Task<Result<()>> {
951 let Some(connection) = self.remote_connection() else {
952 return Task::ready(Err(anyhow!("no remote connection")));
953 };
954 connection.upload_directory(src_path, dest_path, cx)
955 }
956
957 pub fn proto_client(&self) -> AnyProtoClient {
958 self.client.clone().into()
959 }
960
961 pub fn connection_options(&self) -> RemoteConnectionOptions {
962 self.connection_options.clone()
963 }
964
965 pub fn connection(&self) -> Option<Arc<dyn RemoteConnection>> {
966 if let State::Connected {
967 remote_connection, ..
968 } = self.state.as_ref()?
969 {
970 Some(remote_connection.clone())
971 } else {
972 None
973 }
974 }
975
976 pub fn connection_state(&self) -> ConnectionState {
977 self.state
978 .as_ref()
979 .map(ConnectionState::from)
980 .unwrap_or(ConnectionState::Disconnected)
981 }
982
983 pub fn is_disconnected(&self) -> bool {
984 self.connection_state() == ConnectionState::Disconnected
985 }
986
987 pub fn path_style(&self) -> PathStyle {
988 self.path_style
989 }
990
991 /// Forcibly disconnects from the remote server by killing the underlying connection.
992 /// This will trigger the reconnection logic if reconnection attempts remain.
993 /// Useful for testing reconnection behavior in real environments.
994 pub fn force_disconnect(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
995 let Some(connection) = self.remote_connection() else {
996 return Task::ready(Err(anyhow!("no active remote connection to disconnect")));
997 };
998
999 log::info!("force_disconnect: killing remote connection");
1000
1001 cx.spawn(async move |_, _| {
1002 connection.kill().await?;
1003 Ok(())
1004 })
1005 }
1006
1007 /// Simulates a timeout by pausing heartbeat responses.
1008 /// This will cause heartbeat failures and eventually trigger reconnection
1009 /// after MAX_MISSED_HEARTBEATS are missed.
1010 /// Useful for testing timeout behavior in real environments.
1011 pub fn force_heartbeat_timeout(&mut self, attempts: usize, cx: &mut Context<Self>) {
1012 log::info!("force_heartbeat_timeout: triggering heartbeat failure state");
1013
1014 if let Some(State::Connected {
1015 remote_connection,
1016 delegate,
1017 multiplex_task,
1018 heartbeat_task,
1019 }) = self.state.take()
1020 {
1021 self.set_state(
1022 if attempts == 0 {
1023 State::HeartbeatMissed {
1024 missed_heartbeats: MAX_MISSED_HEARTBEATS,
1025 remote_connection,
1026 delegate,
1027 multiplex_task,
1028 heartbeat_task,
1029 }
1030 } else {
1031 State::ReconnectFailed {
1032 remote_connection,
1033 delegate,
1034 error: anyhow!("forced heartbeat timeout"),
1035 attempts,
1036 }
1037 },
1038 cx,
1039 );
1040
1041 self.reconnect(cx)
1042 .context("failed to start reconnect after forced timeout")
1043 .log_err();
1044 } else {
1045 log::warn!("force_heartbeat_timeout: not in Connected state, ignoring");
1046 }
1047 }
1048
1049 #[cfg(any(test, feature = "test-support"))]
1050 pub fn simulate_disconnect(&self, client_cx: &mut App) -> Task<()> {
1051 let opts = self.connection_options();
1052 client_cx.spawn(async move |cx| {
1053 let connection = cx.update_global(|c: &mut ConnectionPool, _| {
1054 if let Some(ConnectionPoolEntry::Connected(c)) = c.connections.get(&opts) {
1055 if let Some(connection) = c.upgrade() {
1056 connection
1057 } else {
1058 panic!("connection was dropped")
1059 }
1060 } else {
1061 panic!("missing test connection")
1062 }
1063 });
1064
1065 connection.simulate_disconnect(cx);
1066 })
1067 }
1068
1069 /// Creates a mock connection pair for testing.
1070 ///
1071 /// This is the recommended way to create mock remote connections for tests.
1072 /// It returns the `MockConnectionOptions` (which can be passed to create a
1073 /// `HeadlessProject`), an `AnyProtoClient` for the server side and a
1074 /// `ConnectGuard` for the client side which blocks the connection from
1075 /// being established until dropped.
1076 ///
1077 /// # Example
1078 /// ```ignore
1079 /// let (opts, server_session, connect_guard) = RemoteClient::fake_server(cx, server_cx);
1080 /// // Set up HeadlessProject with server_session...
1081 /// drop(connect_guard);
1082 /// let client = RemoteClient::fake_client(opts, cx).await;
1083 /// ```
1084 #[cfg(any(test, feature = "test-support"))]
1085 pub fn fake_server(
1086 client_cx: &mut gpui::TestAppContext,
1087 server_cx: &mut gpui::TestAppContext,
1088 ) -> (RemoteConnectionOptions, AnyProtoClient, ConnectGuard) {
1089 use crate::transport::mock::MockConnection;
1090 let (opts, server_client, connect_guard) = MockConnection::new(client_cx, server_cx);
1091 (opts.into(), server_client, connect_guard)
1092 }
1093
1094 /// Creates a `RemoteClient` connected to a mock server.
1095 ///
1096 /// Call `fake_server` first to get the connection options, set up the
1097 /// `HeadlessProject` with the server session, then call this method
1098 /// to create the client.
1099 #[cfg(any(test, feature = "test-support"))]
1100 pub async fn connect_mock(
1101 opts: RemoteConnectionOptions,
1102 client_cx: &mut gpui::TestAppContext,
1103 ) -> Entity<Self> {
1104 assert!(matches!(opts, RemoteConnectionOptions::Mock(..)));
1105 use crate::transport::mock::MockDelegate;
1106 let (_tx, rx) = oneshot::channel();
1107 let mut cx = client_cx.to_async();
1108 let connection = connect(opts, Arc::new(MockDelegate), &mut cx)
1109 .await
1110 .unwrap();
1111 client_cx
1112 .update(|cx| {
1113 Self::new(
1114 ConnectionIdentifier::setup(),
1115 connection,
1116 rx,
1117 Arc::new(MockDelegate),
1118 cx,
1119 )
1120 })
1121 .await
1122 .unwrap()
1123 .unwrap()
1124 }
1125
1126 fn remote_connection(&self) -> Option<Arc<dyn RemoteConnection>> {
1127 self.state
1128 .as_ref()
1129 .and_then(|state| state.remote_connection())
1130 }
1131}
1132
1133enum ConnectionPoolEntry {
1134 Connecting(WeakShared<Task<Result<Arc<dyn RemoteConnection>, Arc<anyhow::Error>>>>),
1135 Connected(Weak<dyn RemoteConnection>),
1136}
1137
1138#[derive(Default)]
1139struct ConnectionPool {
1140 connections: HashMap<RemoteConnectionOptions, ConnectionPoolEntry>,
1141}
1142
1143impl Global for ConnectionPool {}
1144
1145impl ConnectionPool {
1146 fn connect(
1147 &mut self,
1148 opts: RemoteConnectionOptions,
1149 delegate: Arc<dyn RemoteClientDelegate>,
1150 cx: &mut App,
1151 ) -> Shared<Task<Result<Arc<dyn RemoteConnection>, Arc<anyhow::Error>>>> {
1152 let connection = self.connections.get(&opts);
1153 match connection {
1154 Some(ConnectionPoolEntry::Connecting(task)) => {
1155 if let Some(task) = task.upgrade() {
1156 log::debug!("Connecting task is still alive");
1157 cx.spawn(async move |cx| {
1158 delegate.set_status(Some("Waiting for existing connection attempt"), cx)
1159 })
1160 .detach();
1161 return task;
1162 }
1163 log::debug!("Connecting task is dead, removing it and restarting a connection");
1164 self.connections.remove(&opts);
1165 }
1166 Some(ConnectionPoolEntry::Connected(remote)) => {
1167 if let Some(remote) = remote.upgrade()
1168 && !remote.has_been_killed()
1169 {
1170 log::debug!("Connection is still alive");
1171 return Task::ready(Ok(remote)).shared();
1172 }
1173 log::debug!("Connection is dead, removing it and restarting a connection");
1174 self.connections.remove(&opts);
1175 }
1176 None => {
1177 log::debug!("No existing connection found, starting a new one");
1178 }
1179 }
1180
1181 let task = cx
1182 .spawn({
1183 let opts = opts.clone();
1184 let delegate = delegate.clone();
1185 async move |cx| {
1186 let connection = match opts.clone() {
1187 RemoteConnectionOptions::Ssh(opts) => {
1188 SshRemoteConnection::new(opts, delegate, cx)
1189 .await
1190 .map(|connection| Arc::new(connection) as Arc<dyn RemoteConnection>)
1191 }
1192 RemoteConnectionOptions::Wsl(opts) => {
1193 WslRemoteConnection::new(opts, delegate, cx)
1194 .await
1195 .map(|connection| Arc::new(connection) as Arc<dyn RemoteConnection>)
1196 }
1197 RemoteConnectionOptions::Docker(opts) => {
1198 DockerExecConnection::new(opts, delegate, cx)
1199 .await
1200 .map(|connection| Arc::new(connection) as Arc<dyn RemoteConnection>)
1201 }
1202 #[cfg(any(test, feature = "test-support"))]
1203 RemoteConnectionOptions::Mock(opts) => match cx.update(|cx| {
1204 cx.default_global::<crate::transport::mock::MockConnectionRegistry>()
1205 .take(&opts)
1206 }) {
1207 Some(connection) => Ok(connection.await as Arc<dyn RemoteConnection>),
1208 None => Err(anyhow!(
1209 "Mock connection not found. Call MockConnection::new() first."
1210 )),
1211 },
1212 };
1213
1214 cx.update_global(|pool: &mut Self, _| {
1215 debug_assert!(matches!(
1216 pool.connections.get(&opts),
1217 Some(ConnectionPoolEntry::Connecting(_))
1218 ));
1219 match connection {
1220 Ok(connection) => {
1221 pool.connections.insert(
1222 opts.clone(),
1223 ConnectionPoolEntry::Connected(Arc::downgrade(&connection)),
1224 );
1225 Ok(connection)
1226 }
1227 Err(error) => {
1228 pool.connections.remove(&opts);
1229 Err(Arc::new(error))
1230 }
1231 }
1232 })
1233 }
1234 })
1235 .shared();
1236 if let Some(task) = task.downgrade() {
1237 self.connections
1238 .insert(opts.clone(), ConnectionPoolEntry::Connecting(task));
1239 }
1240 task
1241 }
1242}
1243
1244#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1245pub enum RemoteConnectionOptions {
1246 Ssh(SshConnectionOptions),
1247 Wsl(WslConnectionOptions),
1248 Docker(DockerConnectionOptions),
1249 #[cfg(any(test, feature = "test-support"))]
1250 Mock(crate::transport::mock::MockConnectionOptions),
1251}
1252
1253impl RemoteConnectionOptions {
1254 pub fn display_name(&self) -> String {
1255 match self {
1256 RemoteConnectionOptions::Ssh(opts) => opts.host.to_string(),
1257 RemoteConnectionOptions::Wsl(opts) => opts.distro_name.clone(),
1258 RemoteConnectionOptions::Docker(opts) => opts.name.clone(),
1259 #[cfg(any(test, feature = "test-support"))]
1260 RemoteConnectionOptions::Mock(opts) => format!("mock-{}", opts.id),
1261 }
1262 }
1263}
1264
1265impl From<SshConnectionOptions> for RemoteConnectionOptions {
1266 fn from(opts: SshConnectionOptions) -> Self {
1267 RemoteConnectionOptions::Ssh(opts)
1268 }
1269}
1270
1271impl From<WslConnectionOptions> for RemoteConnectionOptions {
1272 fn from(opts: WslConnectionOptions) -> Self {
1273 RemoteConnectionOptions::Wsl(opts)
1274 }
1275}
1276
1277#[cfg(any(test, feature = "test-support"))]
1278impl From<crate::transport::mock::MockConnectionOptions> for RemoteConnectionOptions {
1279 fn from(opts: crate::transport::mock::MockConnectionOptions) -> Self {
1280 RemoteConnectionOptions::Mock(opts)
1281 }
1282}
1283
1284#[cfg(target_os = "windows")]
1285/// Open a wsl path (\\wsl.localhost\<distro>\path)
1286#[derive(Debug, Clone, PartialEq, Eq, gpui::Action)]
1287#[action(namespace = workspace, no_json, no_register)]
1288pub struct OpenWslPath {
1289 pub distro: WslConnectionOptions,
1290 pub paths: Vec<PathBuf>,
1291}
1292
1293#[async_trait(?Send)]
1294pub trait RemoteConnection: Send + Sync {
1295 fn start_proxy(
1296 &self,
1297 unique_identifier: String,
1298 reconnect: bool,
1299 incoming_tx: UnboundedSender<Envelope>,
1300 outgoing_rx: UnboundedReceiver<Envelope>,
1301 connection_activity_tx: Sender<()>,
1302 delegate: Arc<dyn RemoteClientDelegate>,
1303 cx: &mut AsyncApp,
1304 ) -> Task<Result<i32>>;
1305 fn upload_directory(
1306 &self,
1307 src_path: PathBuf,
1308 dest_path: RemotePathBuf,
1309 cx: &App,
1310 ) -> Task<Result<()>>;
1311 async fn kill(&self) -> Result<()>;
1312 fn has_been_killed(&self) -> bool;
1313 fn shares_network_interface(&self) -> bool {
1314 false
1315 }
1316 fn build_command(
1317 &self,
1318 program: Option<String>,
1319 args: &[String],
1320 env: &HashMap<String, String>,
1321 working_dir: Option<String>,
1322 port_forward: Option<(u16, String, u16)>,
1323 interactive: Interactive,
1324 ) -> Result<CommandTemplate>;
1325 fn build_forward_ports_command(
1326 &self,
1327 forwards: Vec<(u16, String, u16)>,
1328 ) -> Result<CommandTemplate>;
1329 fn connection_options(&self) -> RemoteConnectionOptions;
1330 fn path_style(&self) -> PathStyle;
1331 fn shell(&self) -> String;
1332 fn default_system_shell(&self) -> String;
1333 fn has_wsl_interop(&self) -> bool;
1334
1335 #[cfg(any(test, feature = "test-support"))]
1336 fn simulate_disconnect(&self, _: &AsyncApp) {}
1337}
1338
1339type ResponseChannels = Mutex<HashMap<MessageId, oneshot::Sender<(Envelope, oneshot::Sender<()>)>>>;
1340
1341struct Signal<T> {
1342 tx: Mutex<Option<oneshot::Sender<T>>>,
1343 rx: Shared<Task<Option<T>>>,
1344}
1345
1346impl<T: Send + Clone + 'static> Signal<T> {
1347 pub fn new(cx: &App) -> Self {
1348 let (tx, rx) = oneshot::channel();
1349
1350 let task = cx
1351 .background_executor()
1352 .spawn(async move { rx.await.ok() })
1353 .shared();
1354
1355 Self {
1356 tx: Mutex::new(Some(tx)),
1357 rx: task,
1358 }
1359 }
1360
1361 fn set(&self, value: T) {
1362 if let Some(tx) = self.tx.lock().take() {
1363 let _ = tx.send(value);
1364 }
1365 }
1366
1367 fn wait(&self) -> Shared<Task<Option<T>>> {
1368 self.rx.clone()
1369 }
1370}
1371
1372pub(crate) struct ChannelClient {
1373 next_message_id: AtomicU32,
1374 outgoing_tx: Mutex<mpsc::UnboundedSender<Envelope>>,
1375 buffer: Mutex<VecDeque<Envelope>>,
1376 response_channels: ResponseChannels,
1377 message_handlers: Mutex<ProtoMessageHandlerSet>,
1378 max_received: AtomicU32,
1379 name: &'static str,
1380 task: Mutex<Task<Result<()>>>,
1381 remote_started: Signal<()>,
1382 has_wsl_interop: bool,
1383 executor: BackgroundExecutor,
1384}
1385
1386impl ChannelClient {
1387 pub(crate) fn new(
1388 incoming_rx: mpsc::UnboundedReceiver<Envelope>,
1389 outgoing_tx: mpsc::UnboundedSender<Envelope>,
1390 cx: &App,
1391 name: &'static str,
1392 has_wsl_interop: bool,
1393 ) -> Arc<Self> {
1394 Arc::new_cyclic(|this| Self {
1395 outgoing_tx: Mutex::new(outgoing_tx),
1396 next_message_id: AtomicU32::new(0),
1397 max_received: AtomicU32::new(0),
1398 response_channels: ResponseChannels::default(),
1399 message_handlers: Default::default(),
1400 buffer: Mutex::new(VecDeque::new()),
1401 name,
1402 executor: cx.background_executor().clone(),
1403 task: Mutex::new(Self::start_handling_messages(
1404 this.clone(),
1405 incoming_rx,
1406 &cx.to_async(),
1407 )),
1408 remote_started: Signal::new(cx),
1409 has_wsl_interop,
1410 })
1411 }
1412
1413 fn wait_for_remote_started(&self) -> Shared<Task<Option<()>>> {
1414 self.remote_started.wait()
1415 }
1416
1417 fn start_handling_messages(
1418 this: Weak<Self>,
1419 mut incoming_rx: mpsc::UnboundedReceiver<Envelope>,
1420 cx: &AsyncApp,
1421 ) -> Task<Result<()>> {
1422 cx.spawn(async move |cx| {
1423 if let Some(this) = this.upgrade() {
1424 let envelope = proto::RemoteStarted {}.into_envelope(0, None, None);
1425 this.outgoing_tx.lock().unbounded_send(envelope).ok();
1426 };
1427
1428 let peer_id = PeerId { owner_id: 0, id: 0 };
1429 while let Some(incoming) = incoming_rx.next().await {
1430 let Some(this) = this.upgrade() else {
1431 return anyhow::Ok(());
1432 };
1433 if let Some(ack_id) = incoming.ack_id {
1434 let mut buffer = this.buffer.lock();
1435 while buffer.front().is_some_and(|msg| msg.id <= ack_id) {
1436 buffer.pop_front();
1437 }
1438 }
1439 if let Some(proto::envelope::Payload::FlushBufferedMessages(_)) = &incoming.payload
1440 {
1441 log::debug!(
1442 "{}:remote message received. name:FlushBufferedMessages",
1443 this.name
1444 );
1445 {
1446 let buffer = this.buffer.lock();
1447 for envelope in buffer.iter() {
1448 this.outgoing_tx
1449 .lock()
1450 .unbounded_send(envelope.clone())
1451 .ok();
1452 }
1453 }
1454 let mut envelope = proto::Ack {}.into_envelope(0, Some(incoming.id), None);
1455 envelope.id = this.next_message_id.fetch_add(1, SeqCst);
1456 this.outgoing_tx.lock().unbounded_send(envelope).ok();
1457 continue;
1458 }
1459
1460 if let Some(proto::envelope::Payload::RemoteStarted(_)) = &incoming.payload {
1461 this.remote_started.set(());
1462 let mut envelope = proto::Ack {}.into_envelope(0, Some(incoming.id), None);
1463 envelope.id = this.next_message_id.fetch_add(1, SeqCst);
1464 this.outgoing_tx.lock().unbounded_send(envelope).ok();
1465 continue;
1466 }
1467
1468 this.max_received.store(incoming.id, SeqCst);
1469
1470 if let Some(request_id) = incoming.responding_to {
1471 let request_id = MessageId(request_id);
1472 let sender = this.response_channels.lock().remove(&request_id);
1473 if let Some(sender) = sender {
1474 let (tx, rx) = oneshot::channel();
1475 if incoming.payload.is_some() {
1476 sender.send((incoming, tx)).ok();
1477 }
1478 rx.await.ok();
1479 }
1480 } else if let Some(envelope) =
1481 build_typed_envelope(peer_id, Instant::now(), incoming)
1482 {
1483 let type_name = envelope.payload_type_name();
1484 let message_id = envelope.message_id();
1485 if let Some(future) = ProtoMessageHandlerSet::handle_message(
1486 &this.message_handlers,
1487 envelope,
1488 this.clone().into(),
1489 cx.clone(),
1490 ) {
1491 log::debug!("{}:remote message received. name:{type_name}", this.name);
1492 cx.foreground_executor()
1493 .spawn(async move {
1494 match future.await {
1495 Ok(_) => {
1496 log::debug!(
1497 "{}:remote message handled. name:{type_name}",
1498 this.name
1499 );
1500 }
1501 Err(error) => {
1502 log::error!(
1503 "{}:error handling message. type:{}, error:{:#}",
1504 this.name,
1505 type_name,
1506 format!("{error:#}").lines().fold(
1507 String::new(),
1508 |mut message, line| {
1509 if !message.is_empty() {
1510 message.push(' ');
1511 }
1512 message.push_str(line);
1513 message
1514 }
1515 )
1516 );
1517 }
1518 }
1519 })
1520 .detach()
1521 } else {
1522 log::error!("{}:unhandled remote message name:{type_name}", this.name);
1523 if let Err(e) = AnyProtoClient::from(this.clone()).send_response(
1524 message_id,
1525 anyhow::anyhow!("no handler registered for {type_name}").to_proto(),
1526 ) {
1527 log::error!(
1528 "{}:error sending error response for {type_name}:{e:#}",
1529 this.name
1530 );
1531 }
1532 }
1533 }
1534 }
1535 anyhow::Ok(())
1536 })
1537 }
1538
1539 pub(crate) fn reconnect(
1540 self: &Arc<Self>,
1541 incoming_rx: UnboundedReceiver<Envelope>,
1542 outgoing_tx: UnboundedSender<Envelope>,
1543 cx: &AsyncApp,
1544 ) {
1545 *self.outgoing_tx.lock() = outgoing_tx;
1546 *self.task.lock() = Self::start_handling_messages(Arc::downgrade(self), incoming_rx, cx);
1547 }
1548
1549 fn request<T: RequestMessage>(
1550 &self,
1551 payload: T,
1552 ) -> impl 'static + Future<Output = Result<T::Response>> {
1553 self.request_internal(payload, true)
1554 }
1555
1556 fn request_internal<T: RequestMessage>(
1557 &self,
1558 payload: T,
1559 use_buffer: bool,
1560 ) -> impl 'static + Future<Output = Result<T::Response>> {
1561 log::debug!("remote request start. name:{}", T::NAME);
1562 let response =
1563 self.request_dynamic(payload.into_envelope(0, None, None), T::NAME, use_buffer);
1564 async move {
1565 let response = response.await?;
1566 log::debug!("remote request finish. name:{}", T::NAME);
1567 T::Response::from_envelope(response).context("received a response of the wrong type")
1568 }
1569 }
1570
1571 async fn resync(&self, timeout: Duration) -> Result<()> {
1572 smol::future::or(
1573 async {
1574 self.request_internal(proto::FlushBufferedMessages {}, false)
1575 .await?;
1576
1577 for envelope in self.buffer.lock().iter() {
1578 self.outgoing_tx
1579 .lock()
1580 .unbounded_send(envelope.clone())
1581 .ok();
1582 }
1583 Ok(())
1584 },
1585 async {
1586 self.executor.timer(timeout).await;
1587 anyhow::bail!("Timed out resyncing remote client")
1588 },
1589 )
1590 .await
1591 }
1592
1593 async fn ping(&self, timeout: Duration) -> Result<()> {
1594 smol::future::or(
1595 async {
1596 self.request(proto::Ping {}).await?;
1597 Ok(())
1598 },
1599 async {
1600 self.executor.timer(timeout).await;
1601 anyhow::bail!("Timed out pinging remote client")
1602 },
1603 )
1604 .await
1605 }
1606
1607 fn send<T: EnvelopedMessage>(&self, payload: T) -> Result<()> {
1608 log::debug!("remote send name:{}", T::NAME);
1609 self.send_dynamic(payload.into_envelope(0, None, None))
1610 }
1611
1612 fn request_dynamic(
1613 &self,
1614 mut envelope: proto::Envelope,
1615 type_name: &'static str,
1616 use_buffer: bool,
1617 ) -> impl 'static + Future<Output = Result<proto::Envelope>> {
1618 envelope.id = self.next_message_id.fetch_add(1, SeqCst);
1619 let (tx, rx) = oneshot::channel();
1620 let mut response_channels_lock = self.response_channels.lock();
1621 response_channels_lock.insert(MessageId(envelope.id), tx);
1622 drop(response_channels_lock);
1623
1624 let result = if use_buffer {
1625 self.send_buffered(envelope)
1626 } else {
1627 self.send_unbuffered(envelope)
1628 };
1629 async move {
1630 if let Err(error) = &result {
1631 log::error!("failed to send message: {error}");
1632 anyhow::bail!("failed to send message: {error}");
1633 }
1634
1635 let response = rx.await.context("connection lost")?.0;
1636 if let Some(proto::envelope::Payload::Error(error)) = &response.payload {
1637 return Err(RpcError::from_proto(error, type_name));
1638 }
1639 Ok(response)
1640 }
1641 }
1642
1643 pub fn send_dynamic(&self, mut envelope: proto::Envelope) -> Result<()> {
1644 envelope.id = self.next_message_id.fetch_add(1, SeqCst);
1645 self.send_buffered(envelope)
1646 }
1647
1648 fn send_buffered(&self, mut envelope: proto::Envelope) -> Result<()> {
1649 envelope.ack_id = Some(self.max_received.load(SeqCst));
1650 self.buffer.lock().push_back(envelope.clone());
1651 // ignore errors on send (happen while we're reconnecting)
1652 // assume that the global "disconnected" overlay is sufficient.
1653 self.outgoing_tx.lock().unbounded_send(envelope).ok();
1654 Ok(())
1655 }
1656
1657 fn send_unbuffered(&self, mut envelope: proto::Envelope) -> Result<()> {
1658 envelope.ack_id = Some(self.max_received.load(SeqCst));
1659 self.outgoing_tx.lock().unbounded_send(envelope).ok();
1660 Ok(())
1661 }
1662}
1663
1664impl ProtoClient for ChannelClient {
1665 fn request(
1666 &self,
1667 envelope: proto::Envelope,
1668 request_type: &'static str,
1669 ) -> BoxFuture<'static, Result<proto::Envelope>> {
1670 self.request_dynamic(envelope, request_type, true).boxed()
1671 }
1672
1673 fn send(&self, envelope: proto::Envelope, _message_type: &'static str) -> Result<()> {
1674 self.send_dynamic(envelope)
1675 }
1676
1677 fn send_response(&self, envelope: Envelope, _message_type: &'static str) -> anyhow::Result<()> {
1678 self.send_dynamic(envelope)
1679 }
1680
1681 fn message_handler_set(&self) -> &Mutex<ProtoMessageHandlerSet> {
1682 &self.message_handlers
1683 }
1684
1685 fn is_via_collab(&self) -> bool {
1686 false
1687 }
1688
1689 fn has_wsl_interop(&self) -> bool {
1690 self.has_wsl_interop
1691 }
1692}