1#[cfg(any(test, feature = "test-support"))]
2use crate::transport::mock::ConnectGuard;
3use crate::{
4 SshConnectionOptions,
5 protocol::MessageId,
6 proxy::ProxyLaunchError,
7 transport::{
8 docker::{DockerConnectionOptions, DockerExecConnection},
9 ssh::SshRemoteConnection,
10 wsl::{WslConnectionOptions, WslRemoteConnection},
11 },
12};
13use anyhow::{Context as _, Result, anyhow};
14use askpass::EncryptedPassword;
15use async_trait::async_trait;
16use collections::HashMap;
17use futures::{
18 Future, FutureExt as _, StreamExt as _,
19 channel::{
20 mpsc::{self, Sender, UnboundedReceiver, UnboundedSender},
21 oneshot,
22 },
23 future::{BoxFuture, Shared, WeakShared},
24 select, select_biased,
25};
26use gpui::{
27 App, AppContext as _, AsyncApp, BackgroundExecutor, BorrowAppContext, Context, Entity,
28 EventEmitter, FutureExt, Global, Task, WeakEntity,
29};
30use parking_lot::Mutex;
31
32use release_channel::ReleaseChannel;
33use rpc::{
34 AnyProtoClient, ErrorExt, ProtoClient, ProtoMessageHandlerSet, RpcError,
35 proto::{self, Envelope, EnvelopedMessage, PeerId, RequestMessage, build_typed_envelope},
36};
37use semver::Version;
38use std::{
39 collections::VecDeque,
40 fmt,
41 ops::ControlFlow,
42 path::PathBuf,
43 sync::{
44 Arc, Weak,
45 atomic::{AtomicU32, AtomicU64, Ordering::SeqCst},
46 },
47 time::{Duration, Instant},
48};
49use util::{
50 ResultExt,
51 paths::{PathStyle, RemotePathBuf},
52};
53
/// Operating systems recognized for a remote.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum RemoteOs {
    Linux,
    MacOs,
    Windows,
}

impl RemoteOs {
    /// Returns the lowercase identifier of this operating system
    /// (`"linux"`, `"macos"` or `"windows"`).
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Linux => "linux",
            Self::MacOs => "macos",
            Self::Windows => "windows",
        }
    }

    /// Returns `true` only for [`RemoteOs::Windows`].
    pub fn is_windows(&self) -> bool {
        *self == Self::Windows
    }
}

impl std::fmt::Display for RemoteOs {
    /// Writes the same text as [`RemoteOs::as_str`]; width and padding flags are not applied.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let name = self.as_str();
        f.write_str(name)
    }
}
80
/// CPU architectures recognized for a remote.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum RemoteArch {
    X86_64,
    Aarch64,
}

impl RemoteArch {
    /// Returns the lowercase identifier of this architecture
    /// (`"x86_64"` or `"aarch64"`).
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::X86_64 => "x86_64",
            Self::Aarch64 => "aarch64",
        }
    }
}

impl std::fmt::Display for RemoteArch {
    /// Writes the same text as [`RemoteArch::as_str`]; width and padding flags are not applied.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let name = self.as_str();
        f.write_str(name)
    }
}
101
/// An operating system / CPU architecture pair.
///
/// Passed to [`RemoteClientDelegate::get_download_url`] and
/// [`RemoteClientDelegate::download_server_binary_locally`].
#[derive(Copy, Clone, Debug)]
pub struct RemotePlatform {
    /// Operating system component of the platform.
    pub os: RemoteOs,
    /// CPU architecture component of the platform.
    pub arch: RemoteArch,
}
107
/// A command description: program, arguments and environment variables.
///
/// Returned by [`RemoteClient::build_command`] and related methods.
#[derive(Clone, Debug)]
pub struct CommandTemplate {
    /// Program to execute.
    pub program: String,
    /// Arguments for `program`.
    pub args: Vec<String>,
    /// Environment variables, keyed by variable name.
    pub env: HashMap<String, String>,
}
114
/// Whether a command should be run with TTY allocation for interactive use.
///
/// [`RemoteClient::build_command`] always passes [`Interactive::Yes`]; use
/// [`RemoteClient::build_command_with_options`] to choose explicitly.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Interactive {
    /// Allocate a pseudo-TTY for interactive terminal use.
    Yes,
    /// Do not allocate a TTY - for commands that communicate via piped stdio.
    No,
}
123
/// Callbacks supplied by the embedder of a [`RemoteClient`].
///
/// A delegate is passed to [`connect`] and [`RemoteClient::new`] and is kept for
/// the lifetime of the connection so that reconnects can reuse it.
pub trait RemoteClientDelegate: Send + Sync {
    /// Requests a password for `prompt`.
    ///
    /// NOTE(review): presumably the implementor answers by sending the password
    /// through `tx` — confirm against the transports that call this.
    fn ask_password(
        &self,
        prompt: String,
        tx: oneshot::Sender<EncryptedPassword>,
        cx: &mut AsyncApp,
    );
    /// Resolves a download URL for the given platform, release channel and
    /// optional version. The task may yield `Ok(None)`.
    ///
    /// NOTE(review): presumably the URL of the remote server binary and `None`
    /// means "no URL available" — confirm with the callers.
    fn get_download_url(
        &self,
        platform: RemotePlatform,
        release_channel: ReleaseChannel,
        version: Option<Version>,
        cx: &mut AsyncApp,
    ) -> Task<Result<Option<String>>>;
    /// Downloads the server binary for the given platform, release channel and
    /// optional version, yielding a local path on success.
    fn download_server_binary_locally(
        &self,
        platform: RemotePlatform,
        release_channel: ReleaseChannel,
        version: Option<Version>,
        cx: &mut AsyncApp,
    ) -> Task<Result<PathBuf>>;
    /// Reports a status message. In this file it is called with `Some(..)` for
    /// reconnect failures and while waiting on an existing connection attempt.
    fn set_status(&self, status: Option<&str>, cx: &mut AsyncApp);
}
147
/// Number of consecutive missed heartbeats at which a reconnect is started.
const MAX_MISSED_HEARTBEATS: usize = 5;
/// Duration of the keep-alive timer used by the heartbeat loop.
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
/// Timeout passed to the client's `ping` and `resync` calls.
const HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(5);
/// How long `RemoteClient::new` waits for the remote to report that it started:
/// 5 seconds when `debug_assertions` is enabled, 60 seconds otherwise.
const INITIAL_CONNECTION_TIMEOUT: Duration =
    Duration::from_secs(if cfg!(debug_assertions) { 5 } else { 60 });

/// Number of reconnect attempts made before the client gives up and enters
/// the "reconnect exhausted" state.
pub const MAX_RECONNECT_ATTEMPTS: usize = 3;
155
/// Internal connection state machine of a [`RemoteClient`].
///
/// Variants that carry `remote_connection` and `delegate` can be used as the
/// starting point of a reconnect (see `State::can_reconnect`).
enum State {
    /// Initial state while `RemoteClient::new` is still setting up.
    Connecting,
    /// The proxy is running and the remote answered the most recent ping/resync.
    Connected {
        remote_connection: Arc<dyn RemoteConnection>,
        delegate: Arc<dyn RemoteClientDelegate>,

        // Task created by `RemoteClient::monitor`; watches the proxy I/O task.
        multiplex_task: Task<Result<()>>,
        // Task created by `RemoteClient::heartbeat`.
        heartbeat_task: Task<Result<()>>,
    },
    /// Like `Connected`, but one or more heartbeats went unanswered.
    HeartbeatMissed {
        // Count of heartbeat misses recorded in this state.
        missed_heartbeats: usize,

        remote_connection: Arc<dyn RemoteConnection>,
        delegate: Arc<dyn RemoteClientDelegate>,

        multiplex_task: Task<Result<()>>,
        heartbeat_task: Task<Result<()>>,
    },
    /// A reconnect attempt is in flight.
    Reconnecting,
    /// The latest reconnect attempt failed; another attempt may follow.
    ReconnectFailed {
        remote_connection: Arc<dyn RemoteConnection>,
        delegate: Arc<dyn RemoteClientDelegate>,

        // Error produced by the failed attempt.
        error: anyhow::Error,
        // Number of attempts made so far.
        attempts: usize,
    },
    /// More than `MAX_RECONNECT_ATTEMPTS` attempts were needed; no further reconnects happen.
    ReconnectExhausted,
    /// The proxy exited with the code that maps to `ProxyLaunchError::ServerNotRunning`.
    ServerNotRunning,
}
185
186impl fmt::Display for State {
187 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
188 match self {
189 Self::Connecting => write!(f, "connecting"),
190 Self::Connected { .. } => write!(f, "connected"),
191 Self::Reconnecting => write!(f, "reconnecting"),
192 Self::ReconnectFailed { .. } => write!(f, "reconnect failed"),
193 Self::ReconnectExhausted => write!(f, "reconnect exhausted"),
194 Self::HeartbeatMissed { .. } => write!(f, "heartbeat missed"),
195 Self::ServerNotRunning { .. } => write!(f, "server not running"),
196 }
197 }
198}
199
200impl State {
201 fn remote_connection(&self) -> Option<Arc<dyn RemoteConnection>> {
202 match self {
203 Self::Connected {
204 remote_connection, ..
205 } => Some(remote_connection.clone()),
206 Self::HeartbeatMissed {
207 remote_connection, ..
208 } => Some(remote_connection.clone()),
209 Self::ReconnectFailed {
210 remote_connection, ..
211 } => Some(remote_connection.clone()),
212 _ => None,
213 }
214 }
215
216 fn can_reconnect(&self) -> bool {
217 match self {
218 Self::Connected { .. }
219 | Self::HeartbeatMissed { .. }
220 | Self::ReconnectFailed { .. } => true,
221 State::Connecting
222 | State::Reconnecting
223 | State::ReconnectExhausted
224 | State::ServerNotRunning => false,
225 }
226 }
227
228 fn is_reconnect_failed(&self) -> bool {
229 matches!(self, Self::ReconnectFailed { .. })
230 }
231
232 fn is_reconnect_exhausted(&self) -> bool {
233 matches!(self, Self::ReconnectExhausted { .. })
234 }
235
236 fn is_server_not_running(&self) -> bool {
237 matches!(self, Self::ServerNotRunning)
238 }
239
240 fn is_reconnecting(&self) -> bool {
241 matches!(self, Self::Reconnecting { .. })
242 }
243
244 fn heartbeat_recovered(self) -> Self {
245 match self {
246 Self::HeartbeatMissed {
247 remote_connection,
248 delegate,
249 multiplex_task,
250 heartbeat_task,
251 ..
252 } => Self::Connected {
253 remote_connection,
254 delegate,
255 multiplex_task,
256 heartbeat_task,
257 },
258 _ => self,
259 }
260 }
261
262 fn heartbeat_missed(self) -> Self {
263 match self {
264 Self::Connected {
265 remote_connection,
266 delegate,
267 multiplex_task,
268 heartbeat_task,
269 } => Self::HeartbeatMissed {
270 missed_heartbeats: 1,
271 remote_connection,
272 delegate,
273 multiplex_task,
274 heartbeat_task,
275 },
276 Self::HeartbeatMissed {
277 missed_heartbeats,
278 remote_connection,
279 delegate,
280 multiplex_task,
281 heartbeat_task,
282 } => Self::HeartbeatMissed {
283 missed_heartbeats: missed_heartbeats + 1,
284 remote_connection,
285 delegate,
286 multiplex_task,
287 heartbeat_task,
288 },
289 _ => self,
290 }
291 }
292}
293
/// The publicly visible state of a remote connection.
///
/// This is a coarser view of the client's internal state: a failed reconnect
/// attempt is still reported as `Reconnecting`, and both "reconnect attempts
/// exhausted" and "server not running" are reported as `Disconnected`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ConnectionState {
    Connecting,
    Connected,
    HeartbeatMissed,
    Reconnecting,
    Disconnected,
}
303
304impl From<&State> for ConnectionState {
305 fn from(value: &State) -> Self {
306 match value {
307 State::Connecting => Self::Connecting,
308 State::Connected { .. } => Self::Connected,
309 State::Reconnecting | State::ReconnectFailed { .. } => Self::Reconnecting,
310 State::HeartbeatMissed { .. } => Self::HeartbeatMissed,
311 State::ReconnectExhausted => Self::Disconnected,
312 State::ServerNotRunning => Self::Disconnected,
313 }
314 }
315}
316
/// Client side of a remote connection: owns the RPC channel client and the
/// connection state machine.
pub struct RemoteClient {
    // RPC client; kept across reconnects and re-attached to fresh channels.
    client: Arc<ChannelClient>,
    // String form of the `ConnectionIdentifier`, passed to `start_proxy`.
    unique_identifier: String,
    // Options captured from the connection when the client was created.
    connection_options: RemoteConnectionOptions,
    // Path style captured from the connection when the client was created.
    path_style: PathStyle,
    // `None` only while a transition has taken the state, or after shutdown.
    state: Option<State>,
}
324
/// Events emitted by a [`RemoteClient`] entity.
#[derive(Debug)]
pub enum RemoteClientEvent {
    /// Emitted when the client enters the "reconnect exhausted" or "server not
    /// running" state; `server_not_running` is `true` in the latter case.
    Disconnected { server_not_running: bool },
}

impl EventEmitter<RemoteClientEvent> for RemoteClient {}
331
/// Identifies the socket on the remote server so that reconnects
/// can re-join the same project.
pub enum ConnectionIdentifier {
    /// Identifier built from a process-wide counter (see [`ConnectionIdentifier::setup`]).
    Setup(u64),
    /// Identifier built from a caller-provided workspace id.
    Workspace(i64),
}
338
// Counter backing `ConnectionIdentifier::setup`; starts at 1 and is bumped on every call.
static NEXT_ID: AtomicU64 = AtomicU64::new(1);
340
341impl ConnectionIdentifier {
342 pub fn setup() -> Self {
343 Self::Setup(NEXT_ID.fetch_add(1, SeqCst))
344 }
345
346 // This string gets used in a socket name, and so must be relatively short.
347 // The total length of:
348 // /home/{username}/.local/share/zed/server_state/{name}/stdout.sock
349 // Must be less than about 100 characters
350 // https://unix.stackexchange.com/questions/367008/why-is-socket-path-length-limited-to-a-hundred-chars
351 // So our strings should be at most 20 characters or so.
352 fn to_string(&self, cx: &App) -> String {
353 let identifier_prefix = match ReleaseChannel::global(cx) {
354 ReleaseChannel::Stable => "".to_string(),
355 release_channel => format!("{}-", release_channel.dev_name()),
356 };
357 match self {
358 Self::Setup(setup_id) => format!("{identifier_prefix}setup-{setup_id}"),
359 Self::Workspace(workspace_id) => {
360 format!("{identifier_prefix}workspace-{workspace_id}",)
361 }
362 }
363 }
364}
365
366pub async fn connect(
367 connection_options: RemoteConnectionOptions,
368 delegate: Arc<dyn RemoteClientDelegate>,
369 cx: &mut AsyncApp,
370) -> Result<Arc<dyn RemoteConnection>> {
371 cx.update(|cx| {
372 cx.update_default_global(|pool: &mut ConnectionPool, cx| {
373 pool.connect(connection_options.clone(), delegate.clone(), cx)
374 })
375 })
376 .await
377 .map_err(|e| e.cloned())
378}
379
380/// Returns `true` if the global [`ConnectionPool`] already has a live
381/// connection for the given options. Callers can use this to decide
382/// whether to show interactive UI (e.g., a password modal) before
383/// connecting.
384pub fn has_active_connection(opts: &RemoteConnectionOptions, cx: &App) -> bool {
385 cx.try_global::<ConnectionPool>().is_some_and(|pool| {
386 matches!(
387 pool.connections.get(opts),
388 Some(ConnectionPoolEntry::Connected(remote))
389 if remote.upgrade().is_some_and(|r| !r.has_been_killed())
390 )
391 })
392}
393
394impl RemoteClient {
    /// Spawns a task that builds a `RemoteClient` on top of `remote_connection`.
    ///
    /// The task starts the proxy, waits up to `INITIAL_CONNECTION_TIMEOUT` for the
    /// remote to report that it started, pings it once, starts the heartbeat and
    /// only then moves the client into `State::Connected`.
    ///
    /// The task resolves to:
    /// - `Ok(Some(entity))` once the client is connected,
    /// - `Ok(None)` if `cancellation` resolves first (the sender fired or was dropped),
    /// - `Err(..)` if the remote did not become ready or the initial ping failed.
    pub fn new(
        unique_identifier: ConnectionIdentifier,
        remote_connection: Arc<dyn RemoteConnection>,
        cancellation: oneshot::Receiver<()>,
        delegate: Arc<dyn RemoteClientDelegate>,
        cx: &mut App,
    ) -> Task<Result<Option<Entity<Self>>>> {
        let unique_identifier = unique_identifier.to_string(cx);
        cx.spawn(async move |cx| {
            let success = Box::pin(async move {
                // Envelope channels between the RPC client and the proxy, plus a
                // capacity-1 channel for connection-activity notifications.
                let (outgoing_tx, outgoing_rx) = mpsc::unbounded::<Envelope>();
                let (incoming_tx, incoming_rx) = mpsc::unbounded::<Envelope>();
                let (connection_activity_tx, connection_activity_rx) = mpsc::channel::<()>(1);

                let client = cx.update(|cx| {
                    ChannelClient::new(
                        incoming_rx,
                        outgoing_tx,
                        cx,
                        "client",
                        remote_connection.has_wsl_interop(),
                    )
                });

                let path_style = remote_connection.path_style();
                let this = cx.new(|_| Self {
                    client: client.clone(),
                    unique_identifier: unique_identifier.clone(),
                    connection_options: remote_connection.connection_options(),
                    path_style,
                    state: Some(State::Connecting),
                });

                // `false`: this is the initial connection, not a reconnect
                // (the reconnect path passes `true` here).
                let io_task = remote_connection.start_proxy(
                    unique_identifier,
                    false,
                    incoming_tx,
                    outgoing_rx,
                    connection_activity_tx,
                    delegate.clone(),
                    cx,
                );

                let ready = client
                    .wait_for_remote_started()
                    .with_timeout(INITIAL_CONNECTION_TIMEOUT, cx.background_executor())
                    .await;
                match ready {
                    Ok(Some(_)) => {}
                    // The wait finished without a "started" signal.
                    Ok(None) => {
                        let mut error = "remote client exited before becoming ready".to_owned();
                        // Include the proxy's exit status if it has already finished.
                        if let Some(status) = io_task.now_or_never() {
                            match status {
                                Ok(exit_code) => {
                                    error.push_str(&format!(", exit_code={exit_code:?}"))
                                }
                                Err(e) => error.push_str(&format!(", error={e:?}")),
                            }
                        }
                        let error = anyhow::anyhow!("{error}");
                        log::error!("failed to establish connection: {}", error);
                        return Err(error);
                    }
                    // The timeout elapsed before the wait finished.
                    Err(_) => {
                        let mut error = String::new();
                        if let Some(status) = io_task.now_or_never() {
                            error.push_str("Client exited with ");
                            match status {
                                Ok(exit_code) => {
                                    error.push_str(&format!("exit_code {exit_code:?}"))
                                }
                                Err(e) => error.push_str(&format!("error {e:?}")),
                            }
                        } else {
                            error.push_str("client did not become ready within the timeout");
                        }
                        let error = anyhow::anyhow!("{error}");
                        log::error!("failed to establish connection: {error}");
                        return Err(error);
                    }
                }
                let multiplex_task = Self::monitor(this.downgrade(), io_task, cx);
                // Verify the round trip once before declaring the client connected.
                if let Err(error) = client.ping(HEARTBEAT_TIMEOUT).await {
                    log::error!("failed to establish connection: {}", error);
                    return Err(error);
                }

                let heartbeat_task = Self::heartbeat(this.downgrade(), connection_activity_rx, cx);

                this.update(cx, |this, _| {
                    this.state = Some(State::Connected {
                        remote_connection,
                        delegate,
                        multiplex_task,
                        heartbeat_task,
                    });
                });

                Ok(Some(this))
            });

            // Whichever finishes first wins: the cancellation signal or the setup.
            select! {
                _ = cancellation.fuse() => {
                    Ok(None)
                }
                result = success.fuse() => result
            }
        })
    }
504
505 pub fn proto_client_from_channels(
506 incoming_rx: mpsc::UnboundedReceiver<Envelope>,
507 outgoing_tx: mpsc::UnboundedSender<Envelope>,
508 cx: &App,
509 name: &'static str,
510 has_wsl_interop: bool,
511 ) -> AnyProtoClient {
512 ChannelClient::new(incoming_rx, outgoing_tx, cx, name, has_wsl_interop).into()
513 }
514
515 pub fn shutdown_processes<T: RequestMessage>(
516 &mut self,
517 shutdown_request: Option<T>,
518 executor: BackgroundExecutor,
519 ) -> Option<impl Future<Output = ()> + use<T>> {
520 let state = self.state.take()?;
521 log::info!("shutting down remote processes");
522
523 let State::Connected {
524 multiplex_task,
525 heartbeat_task,
526 remote_connection,
527 delegate,
528 } = state
529 else {
530 return None;
531 };
532
533 let client = self.client.clone();
534
535 Some(async move {
536 if let Some(shutdown_request) = shutdown_request {
537 client.send(shutdown_request).log_err();
538 // We wait 50ms instead of waiting for a response, because
539 // waiting for a response would require us to wait on the main thread
540 // which we want to avoid in an `on_app_quit` callback.
541 executor.timer(Duration::from_millis(50)).await;
542 }
543
544 // Drop `multiplex_task` because it owns our remote_connection_proxy_process, which is a
545 // child of master_process.
546 drop(multiplex_task);
547 // Now drop the rest of state, which kills master process.
548 drop(heartbeat_task);
549 drop(remote_connection);
550 drop(delegate);
551 })
552 }
553
    /// Starts a reconnect attempt from the current state.
    ///
    /// Returns an error (and leaves the state untouched) when the current state
    /// does not allow reconnecting. Otherwise the state is taken, the attempt
    /// counter is incremented, and either:
    /// - the counter exceeds `MAX_RECONNECT_ATTEMPTS`: the state becomes
    ///   `ReconnectExhausted` and `Ok(())` is returned, or
    /// - the state becomes `Reconnecting` and a detached task performs the
    ///   attempt; if it fails, that task calls `reconnect` again.
    fn reconnect(&mut self, cx: &mut Context<Self>) -> Result<()> {
        let can_reconnect = self
            .state
            .as_ref()
            .map(|state| state.can_reconnect())
            .unwrap_or(false);
        if !can_reconnect {
            let state = if let Some(state) = self.state.as_ref() {
                state.to_string()
            } else {
                "no state set".to_string()
            };
            log::info!(
                "aborting reconnect, because not in state that allows reconnecting: {state}"
            );
            anyhow::bail!(
                "aborting reconnect, because not in state that allows reconnecting: {state}"
            );
        }

        // `can_reconnect` was true, so a state is present.
        let state = self.state.take().unwrap();
        let (attempts, remote_connection, delegate) = match state {
            State::Connected {
                remote_connection,
                delegate,
                multiplex_task,
                heartbeat_task,
            }
            | State::HeartbeatMissed {
                remote_connection,
                delegate,
                multiplex_task,
                heartbeat_task,
                ..
            } => {
                // Stop monitoring and heartbeating the old connection; this is
                // the first attempt of a new reconnect cycle.
                drop(multiplex_task);
                drop(heartbeat_task);
                (0, remote_connection, delegate)
            }
            State::ReconnectFailed {
                attempts,
                remote_connection,
                delegate,
                ..
            } => (attempts, remote_connection, delegate),
            // Excluded by the `can_reconnect` check above.
            State::Connecting
            | State::Reconnecting
            | State::ReconnectExhausted
            | State::ServerNotRunning => unreachable!(),
        };

        let attempts = attempts + 1;
        if attempts > MAX_RECONNECT_ATTEMPTS {
            log::error!(
                "Failed to reconnect to after {} attempts, giving up",
                MAX_RECONNECT_ATTEMPTS
            );
            self.set_state(State::ReconnectExhausted, cx);
            return Ok(());
        }

        self.set_state(State::Reconnecting, cx);

        log::info!(
            "Trying to reconnect to remote server... Attempt {}",
            attempts
        );

        let unique_identifier = self.unique_identifier.clone();
        let client = self.client.clone();
        // Performs one attempt and resolves to the state the client should move to.
        let reconnect_task = cx.spawn(async move |this, cx| {
            // Reports the error through the delegate and returns `ReconnectFailed`
            // from the enclosing task.
            macro_rules! failed {
                ($error:expr, $attempts:expr, $remote_connection:expr, $delegate:expr) => {
                    delegate.set_status(Some(&format!("{error:#}", error = $error)), cx);
                    return State::ReconnectFailed {
                        error: anyhow!($error),
                        attempts: $attempts,
                        remote_connection: $remote_connection,
                        delegate: $delegate,
                    };
                };
            }

            // Kill the old connection before establishing a new one.
            if let Err(error) = remote_connection
                .kill()
                .await
                .context("Failed to kill remote_connection process")
            {
                failed!(error, attempts, remote_connection, delegate);
            };

            let connection_options = remote_connection.connection_options();

            let (outgoing_tx, outgoing_rx) = mpsc::unbounded::<Envelope>();
            let (incoming_tx, incoming_rx) = mpsc::unbounded::<Envelope>();
            let (connection_activity_tx, connection_activity_rx) = mpsc::channel::<()>(1);

            // Get a connection from the pool and start the proxy on it. On
            // failure the old `remote_connection` is kept in the failed state.
            let (remote_connection, io_task) = match async {
                let remote_connection = cx
                    .update_global(|pool: &mut ConnectionPool, cx| {
                        pool.connect(connection_options, delegate.clone(), cx)
                    })
                    .await
                    .map_err(|error| error.cloned())?;

                // `true`: this is a reconnect (the initial connection passes `false`).
                let io_task = remote_connection.start_proxy(
                    unique_identifier,
                    true,
                    incoming_tx,
                    outgoing_rx,
                    connection_activity_tx,
                    delegate.clone(),
                    cx,
                );
                anyhow::Ok((remote_connection, io_task))
            }
            .await
            {
                Ok((remote_connection, io_task)) => (remote_connection, io_task),
                Err(error) => {
                    failed!(error, attempts, remote_connection, delegate);
                }
            };

            let multiplex_task = Self::monitor(this.clone(), io_task, cx);
            // Attach the existing RPC client to the new channels.
            client.reconnect(incoming_rx, outgoing_tx, cx);

            if let Err(error) = client.resync(HEARTBEAT_TIMEOUT).await {
                failed!(error, attempts, remote_connection, delegate);
            };

            State::Connected {
                remote_connection,
                delegate,
                multiplex_task,
                heartbeat_task: Self::heartbeat(this.clone(), connection_activity_rx, cx),
            }
        });

        cx.spawn(async move |this, cx| {
            let new_state = reconnect_task.await;
            this.update(cx, |this, cx| {
                // Only apply the result if the client is still `Reconnecting`;
                // otherwise the state changed meanwhile and the result is dropped.
                this.try_set_state(cx, |old_state| {
                    if old_state.is_reconnecting() {
                        match &new_state {
                            State::Connecting
                            | State::Reconnecting
                            | State::HeartbeatMissed { .. }
                            | State::ServerNotRunning => {}
                            State::Connected { .. } => {
                                log::info!("Successfully reconnected");
                            }
                            State::ReconnectFailed {
                                error, attempts, ..
                            } => {
                                log::error!(
                                    "Reconnect attempt {} failed: {:?}. Starting new attempt...",
                                    attempts,
                                    error
                                );
                            }
                            State::ReconnectExhausted => {
                                log::error!("Reconnect attempt failed and all attempts exhausted");
                            }
                        }
                        Some(new_state)
                    } else {
                        None
                    }
                });

                // A failed attempt immediately triggers the next one.
                if this.state_is(State::is_reconnect_failed) {
                    this.reconnect(cx)
                } else if this.state_is(State::is_reconnect_exhausted) {
                    Ok(())
                } else {
                    log::debug!("State has transition from Reconnecting into new state while attempting reconnect.");
                    Ok(())
                }
            })
        })
        .detach_and_log_err(cx);

        Ok(())
    }
739
740 fn heartbeat(
741 this: WeakEntity<Self>,
742 mut connection_activity_rx: mpsc::Receiver<()>,
743 cx: &mut AsyncApp,
744 ) -> Task<Result<()>> {
745 let Ok(client) = this.read_with(cx, |this, _| this.client.clone()) else {
746 return Task::ready(Err(anyhow!("remote_connectionRemoteClient lost")));
747 };
748
749 cx.spawn(async move |cx| {
750 let mut missed_heartbeats = 0;
751
752 let keepalive_timer = cx.background_executor().timer(HEARTBEAT_INTERVAL).fuse();
753 futures::pin_mut!(keepalive_timer);
754
755 loop {
756 select_biased! {
757 result = connection_activity_rx.next().fuse() => {
758 if result.is_none() {
759 log::warn!("remote heartbeat: connection activity channel has been dropped. stopping.");
760 return Ok(());
761 }
762
763 if missed_heartbeats != 0 {
764 missed_heartbeats = 0;
765 let _ =this.update(cx, |this, cx| {
766 this.handle_heartbeat_result(missed_heartbeats, cx)
767 })?;
768 }
769 }
770 _ = keepalive_timer => {
771 log::debug!("Sending heartbeat to server...");
772
773 let result = select_biased! {
774 _ = connection_activity_rx.next().fuse() => {
775 Ok(())
776 }
777 ping_result = client.ping(HEARTBEAT_TIMEOUT).fuse() => {
778 ping_result
779 }
780 };
781
782 if result.is_err() {
783 missed_heartbeats += 1;
784 log::warn!(
785 "No heartbeat from server after {:?}. Missed heartbeat {} out of {}.",
786 HEARTBEAT_TIMEOUT,
787 missed_heartbeats,
788 MAX_MISSED_HEARTBEATS
789 );
790 } else if missed_heartbeats != 0 {
791 missed_heartbeats = 0;
792 } else {
793 continue;
794 }
795
796 let result = this.update(cx, |this, cx| {
797 this.handle_heartbeat_result(missed_heartbeats, cx)
798 })?;
799 if result.is_break() {
800 return Ok(());
801 }
802 }
803 }
804
805 keepalive_timer.set(cx.background_executor().timer(HEARTBEAT_INTERVAL).fuse());
806 }
807 })
808 }
809
810 fn handle_heartbeat_result(
811 &mut self,
812 missed_heartbeats: usize,
813 cx: &mut Context<Self>,
814 ) -> ControlFlow<()> {
815 let state = self.state.take().unwrap();
816 let next_state = if missed_heartbeats > 0 {
817 state.heartbeat_missed()
818 } else {
819 state.heartbeat_recovered()
820 };
821
822 self.set_state(next_state, cx);
823
824 if missed_heartbeats >= MAX_MISSED_HEARTBEATS {
825 log::error!(
826 "Missed last {} heartbeats. Reconnecting...",
827 missed_heartbeats
828 );
829
830 self.reconnect(cx)
831 .context("failed to start reconnect process after missing heartbeats")
832 .log_err();
833 ControlFlow::Break(())
834 } else {
835 ControlFlow::Continue(())
836 }
837 }
838
839 fn monitor(
840 this: WeakEntity<Self>,
841 io_task: Task<Result<i32>>,
842 cx: &AsyncApp,
843 ) -> Task<Result<()>> {
844 cx.spawn(async move |cx| {
845 let result = io_task.await;
846
847 match result {
848 Ok(exit_code) => {
849 if let Some(error) = ProxyLaunchError::from_exit_code(exit_code) {
850 match error {
851 ProxyLaunchError::ServerNotRunning => {
852 log::error!("failed to reconnect because server is not running");
853 this.update(cx, |this, cx| {
854 this.set_state(State::ServerNotRunning, cx);
855 })?;
856 }
857 }
858 } else {
859 log::error!("proxy process terminated unexpectedly: {exit_code}");
860 this.update(cx, |this, cx| {
861 this.reconnect(cx).ok();
862 })?;
863 }
864 }
865 Err(error) => {
866 log::warn!(
867 "remote io task died with error: {:?}. reconnecting...",
868 error
869 );
870 this.update(cx, |this, cx| {
871 this.reconnect(cx).ok();
872 })?;
873 }
874 }
875
876 Ok(())
877 })
878 }
879
880 fn state_is(&self, check: impl FnOnce(&State) -> bool) -> bool {
881 self.state.as_ref().is_some_and(check)
882 }
883
884 fn try_set_state(&mut self, cx: &mut Context<Self>, map: impl FnOnce(&State) -> Option<State>) {
885 let new_state = self.state.as_ref().and_then(map);
886 if let Some(new_state) = new_state {
887 self.state.replace(new_state);
888 cx.notify();
889 }
890 }
891
892 fn set_state(&mut self, state: State, cx: &mut Context<Self>) {
893 log::info!("setting state to '{}'", &state);
894
895 let is_reconnect_exhausted = state.is_reconnect_exhausted();
896 let is_server_not_running = state.is_server_not_running();
897 self.state.replace(state);
898
899 if is_reconnect_exhausted || is_server_not_running {
900 cx.emit(RemoteClientEvent::Disconnected {
901 server_not_running: is_server_not_running,
902 });
903 }
904 cx.notify();
905 }
906
907 pub fn shell(&self) -> Option<String> {
908 Some(self.remote_connection()?.shell())
909 }
910
911 pub fn default_system_shell(&self) -> Option<String> {
912 Some(self.remote_connection()?.default_system_shell())
913 }
914
915 pub fn shares_network_interface(&self) -> bool {
916 self.remote_connection()
917 .map_or(false, |connection| connection.shares_network_interface())
918 }
919
920 pub fn has_wsl_interop(&self) -> bool {
921 self.remote_connection()
922 .map_or(false, |connection| connection.has_wsl_interop())
923 }
924
925 pub fn build_command(
926 &self,
927 program: Option<String>,
928 args: &[String],
929 env: &HashMap<String, String>,
930 working_dir: Option<String>,
931 port_forward: Option<(u16, String, u16)>,
932 ) -> Result<CommandTemplate> {
933 self.build_command_with_options(
934 program,
935 args,
936 env,
937 working_dir,
938 port_forward,
939 Interactive::Yes,
940 )
941 }
942
943 pub fn build_command_with_options(
944 &self,
945 program: Option<String>,
946 args: &[String],
947 env: &HashMap<String, String>,
948 working_dir: Option<String>,
949 port_forward: Option<(u16, String, u16)>,
950 interactive: Interactive,
951 ) -> Result<CommandTemplate> {
952 let Some(connection) = self.remote_connection() else {
953 return Err(anyhow!("no remote connection"));
954 };
955 connection.build_command(program, args, env, working_dir, port_forward, interactive)
956 }
957
958 pub fn build_forward_ports_command(
959 &self,
960 forwards: Vec<(u16, String, u16)>,
961 ) -> Result<CommandTemplate> {
962 let Some(connection) = self.remote_connection() else {
963 return Err(anyhow!("no remote connection"));
964 };
965 connection.build_forward_ports_command(forwards)
966 }
967
968 pub fn upload_directory(
969 &self,
970 src_path: PathBuf,
971 dest_path: RemotePathBuf,
972 cx: &App,
973 ) -> Task<Result<()>> {
974 let Some(connection) = self.remote_connection() else {
975 return Task::ready(Err(anyhow!("no remote connection")));
976 };
977 connection.upload_directory(src_path, dest_path, cx)
978 }
979
980 pub fn proto_client(&self) -> AnyProtoClient {
981 self.client.clone().into()
982 }
983
    /// Returns a copy of the options captured from the connection when this
    /// client was created.
    pub fn connection_options(&self) -> RemoteConnectionOptions {
        self.connection_options.clone()
    }
987
988 pub fn connection(&self) -> Option<Arc<dyn RemoteConnection>> {
989 if let State::Connected {
990 remote_connection, ..
991 } = self.state.as_ref()?
992 {
993 Some(remote_connection.clone())
994 } else {
995 None
996 }
997 }
998
999 pub fn connection_state(&self) -> ConnectionState {
1000 self.state
1001 .as_ref()
1002 .map(ConnectionState::from)
1003 .unwrap_or(ConnectionState::Disconnected)
1004 }
1005
1006 pub fn is_disconnected(&self) -> bool {
1007 self.connection_state() == ConnectionState::Disconnected
1008 }
1009
    /// Returns the path style captured from the connection when this client
    /// was created.
    pub fn path_style(&self) -> PathStyle {
        self.path_style
    }
1013
1014 /// Forcibly disconnects from the remote server by killing the underlying connection.
1015 /// This will trigger the reconnection logic if reconnection attempts remain.
1016 /// Useful for testing reconnection behavior in real environments.
1017 pub fn force_disconnect(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
1018 let Some(connection) = self.remote_connection() else {
1019 return Task::ready(Err(anyhow!("no active remote connection to disconnect")));
1020 };
1021
1022 log::info!("force_disconnect: killing remote connection");
1023
1024 cx.spawn(async move |_, _| {
1025 connection.kill().await?;
1026 Ok(())
1027 })
1028 }
1029
1030 /// Simulates a timeout by pausing heartbeat responses.
1031 /// This will cause heartbeat failures and eventually trigger reconnection
1032 /// after MAX_MISSED_HEARTBEATS are missed.
1033 /// Useful for testing timeout behavior in real environments.
1034 pub fn force_heartbeat_timeout(&mut self, attempts: usize, cx: &mut Context<Self>) {
1035 log::info!("force_heartbeat_timeout: triggering heartbeat failure state");
1036
1037 if let Some(State::Connected {
1038 remote_connection,
1039 delegate,
1040 multiplex_task,
1041 heartbeat_task,
1042 }) = self.state.take()
1043 {
1044 self.set_state(
1045 if attempts == 0 {
1046 State::HeartbeatMissed {
1047 missed_heartbeats: MAX_MISSED_HEARTBEATS,
1048 remote_connection,
1049 delegate,
1050 multiplex_task,
1051 heartbeat_task,
1052 }
1053 } else {
1054 State::ReconnectFailed {
1055 remote_connection,
1056 delegate,
1057 error: anyhow!("forced heartbeat timeout"),
1058 attempts,
1059 }
1060 },
1061 cx,
1062 );
1063
1064 self.reconnect(cx)
1065 .context("failed to start reconnect after forced timeout")
1066 .log_err();
1067 } else {
1068 log::warn!("force_heartbeat_timeout: not in Connected state, ignoring");
1069 }
1070 }
1071
    /// Test helper: moves the client straight into the "server not running"
    /// state, which also emits `RemoteClientEvent::Disconnected`.
    #[cfg(any(test, feature = "test-support"))]
    pub fn force_server_not_running(&mut self, cx: &mut Context<Self>) {
        self.set_state(State::ServerNotRunning, cx);
    }
1076
1077 #[cfg(any(test, feature = "test-support"))]
1078 pub fn simulate_disconnect(&self, client_cx: &mut App) -> Task<()> {
1079 let opts = self.connection_options();
1080 client_cx.spawn(async move |cx| {
1081 let connection = cx.update_global(|c: &mut ConnectionPool, _| {
1082 if let Some(ConnectionPoolEntry::Connected(c)) = c.connections.get(&opts) {
1083 if let Some(connection) = c.upgrade() {
1084 connection
1085 } else {
1086 panic!("connection was dropped")
1087 }
1088 } else {
1089 panic!("missing test connection")
1090 }
1091 });
1092
1093 connection.simulate_disconnect(cx);
1094 })
1095 }
1096
    /// Creates a mock connection pair for testing.
    ///
    /// This is the recommended way to create mock remote connections for tests.
    /// It returns the `MockConnectionOptions` (which can be passed to create a
    /// `HeadlessProject`), an `AnyProtoClient` for the server side and a
    /// `ConnectGuard` for the client side which blocks the connection from
    /// being established until dropped.
    ///
    /// # Example
    /// ```ignore
    /// let (opts, server_session, connect_guard) = RemoteClient::fake_server(cx, server_cx);
    /// // Set up HeadlessProject with server_session...
    /// drop(connect_guard);
    /// let client = RemoteClient::connect_mock(opts, cx).await;
    /// ```
    #[cfg(any(test, feature = "test-support"))]
    pub fn fake_server(
        client_cx: &mut gpui::TestAppContext,
        server_cx: &mut gpui::TestAppContext,
    ) -> (RemoteConnectionOptions, AnyProtoClient, ConnectGuard) {
        use crate::transport::mock::MockConnection;
        let (opts, server_client, connect_guard) = MockConnection::new(client_cx, server_cx);
        // Wrap the mock options in the general options type.
        (opts.into(), server_client, connect_guard)
    }
1121
1122 /// Registers a new mock server for existing connection options.
1123 ///
1124 /// Use this to simulate reconnection: after forcing a disconnect, register
1125 /// a new server so the next `connect()` call succeeds.
1126 #[cfg(any(test, feature = "test-support"))]
1127 pub fn fake_server_with_opts(
1128 opts: &RemoteConnectionOptions,
1129 client_cx: &mut gpui::TestAppContext,
1130 server_cx: &mut gpui::TestAppContext,
1131 ) -> (AnyProtoClient, ConnectGuard) {
1132 use crate::transport::mock::MockConnection;
1133 let mock_opts = match opts {
1134 RemoteConnectionOptions::Mock(mock_opts) => mock_opts.clone(),
1135 _ => panic!("fake_server_with_opts requires Mock connection options"),
1136 };
1137 MockConnection::new_with_opts(mock_opts, client_cx, server_cx)
1138 }
1139
    /// Creates a `RemoteClient` connected to a mock server.
    ///
    /// Call `fake_server` first to get the connection options, set up the
    /// `HeadlessProject` with the server session, then call this method
    /// to create the client.
    ///
    /// # Panics
    ///
    /// Panics if `opts` is not the `Mock` variant, if connecting fails, or if
    /// client creation fails or is cancelled.
    #[cfg(any(test, feature = "test-support"))]
    pub async fn connect_mock(
        opts: RemoteConnectionOptions,
        client_cx: &mut gpui::TestAppContext,
    ) -> Entity<Self> {
        assert!(matches!(opts, RemoteConnectionOptions::Mock(..)));
        use crate::transport::mock::MockDelegate;
        // `_tx` must stay alive until `Self::new` finishes: `new` treats any
        // resolution of the cancellation receiver, including a dropped sender,
        // as a cancellation.
        let (_tx, rx) = oneshot::channel();
        let mut cx = client_cx.to_async();
        let connection = connect(opts, Arc::new(MockDelegate), &mut cx)
            .await
            .unwrap();
        client_cx
            .update(|cx| {
                Self::new(
                    ConnectionIdentifier::setup(),
                    connection,
                    rx,
                    Arc::new(MockDelegate),
                    cx,
                )
            })
            .await
            // First unwrap: the `Result`; second unwrap: the `Option` (not cancelled).
            .unwrap()
            .unwrap()
    }
1171
1172 pub fn remote_connection(&self) -> Option<Arc<dyn RemoteConnection>> {
1173 self.state
1174 .as_ref()
1175 .and_then(|state| state.remote_connection())
1176 }
1177}
1178
/// State of one pooled connection, stored in `ConnectionPool` under the
/// `RemoteConnectionOptions` that produced it.
enum ConnectionPoolEntry {
    /// A connection attempt has been started; only a weak handle to the shared
    /// connect task is kept.
    Connecting(WeakShared<Task<Result<Arc<dyn RemoteConnection>, Arc<anyhow::Error>>>>),
    /// A connection was established; it is referenced weakly.
    Connected(Weak<dyn RemoteConnection>),
}
1183
/// Registry of in-flight and established remote connections.
#[derive(Default)]
struct ConnectionPool {
    /// One entry per distinct set of connection options.
    connections: HashMap<RemoteConnectionOptions, ConnectionPoolEntry>,
}
1188
// Marker impl: the pool is reached through gpui's global state (the connect task
// updates it via `cx.update_global`).
impl Global for ConnectionPool {}
1190
impl ConnectionPool {
    /// Returns a shared task resolving to the connection for `opts`, reusing
    /// pooled state where possible.
    ///
    /// * If a connect attempt for `opts` is still alive, the delegate's status is
    ///   set to a "waiting" message and that attempt's shared task is returned.
    /// * If a connection is recorded, can still be upgraded, and has not been
    ///   killed, an already-resolved task holding it is returned.
    /// * Otherwise any stale entry is removed and a new connect task is spawned
    ///   for the matching transport.
    ///
    /// When the spawned task finishes it updates the pool itself: on success it
    /// records a weak reference to the connection, on failure it removes the
    /// entry. Errors are wrapped in `Arc`.
    fn connect(
        &mut self,
        opts: RemoteConnectionOptions,
        delegate: Arc<dyn RemoteClientDelegate>,
        cx: &mut App,
    ) -> Shared<Task<Result<Arc<dyn RemoteConnection>, Arc<anyhow::Error>>>> {
        let connection = self.connections.get(&opts);
        match connection {
            Some(ConnectionPoolEntry::Connecting(task)) => {
                // The weak handle only upgrades while the attempt is still alive.
                if let Some(task) = task.upgrade() {
                    log::debug!("Connecting task is still alive");
                    cx.spawn(async move |cx| {
                        delegate.set_status(Some("Waiting for existing connection attempt"), cx)
                    })
                    .detach();
                    return task;
                }
                log::debug!("Connecting task is dead, removing it and restarting a connection");
                self.connections.remove(&opts);
            }
            Some(ConnectionPoolEntry::Connected(remote)) => {
                // Reuse only a connection that is both still referenced and not killed.
                if let Some(remote) = remote.upgrade()
                    && !remote.has_been_killed()
                {
                    log::debug!("Connection is still alive");
                    return Task::ready(Ok(remote)).shared();
                }
                log::debug!("Connection is dead, removing it and restarting a connection");
                self.connections.remove(&opts);
            }
            None => {
                log::debug!("No existing connection found, starting a new one");
            }
        }

        let task = cx
            .spawn({
                let opts = opts.clone();
                let delegate = delegate.clone();
                async move |cx| {
                    // Dispatch on the transport kind; every arm yields an
                    // `Arc<dyn RemoteConnection>` on success.
                    let connection = match opts.clone() {
                        RemoteConnectionOptions::Ssh(opts) => {
                            SshRemoteConnection::new(opts, delegate, cx)
                                .await
                                .map(|connection| Arc::new(connection) as Arc<dyn RemoteConnection>)
                        }
                        RemoteConnectionOptions::Wsl(opts) => {
                            WslRemoteConnection::new(opts, delegate, cx)
                                .await
                                .map(|connection| Arc::new(connection) as Arc<dyn RemoteConnection>)
                        }
                        RemoteConnectionOptions::Docker(opts) => {
                            DockerExecConnection::new(opts, delegate, cx)
                                .await
                                .map(|connection| Arc::new(connection) as Arc<dyn RemoteConnection>)
                        }
                        // Mock connections are taken out of the mock registry global
                        // rather than created here.
                        #[cfg(any(test, feature = "test-support"))]
                        RemoteConnectionOptions::Mock(opts) => match cx.update(|cx| {
                            cx.default_global::<crate::transport::mock::MockConnectionRegistry>()
                                .take(&opts)
                        }) {
                            Some(connection) => Ok(connection.await as Arc<dyn RemoteConnection>),
                            None => Err(anyhow!(
                                "Mock connection not found. Call MockConnection::new() first."
                            )),
                        },
                    };

                    // Replace the `Connecting` entry with the outcome of the attempt.
                    cx.update_global(|pool: &mut Self, _| {
                        debug_assert!(matches!(
                            pool.connections.get(&opts),
                            Some(ConnectionPoolEntry::Connecting(_))
                        ));
                        match connection {
                            Ok(connection) => {
                                pool.connections.insert(
                                    opts.clone(),
                                    ConnectionPoolEntry::Connected(Arc::downgrade(&connection)),
                                );
                                Ok(connection)
                            }
                            Err(error) => {
                                pool.connections.remove(&opts);
                                Err(Arc::new(error))
                            }
                        }
                    })
                }
            })
            .shared();
        // The pool stores only a weak handle to the attempt; the strong `Shared`
        // task is handed back to the caller.
        if let Some(task) = task.downgrade() {
            self.connections
                .insert(opts.clone(), ConnectionPoolEntry::Connecting(task));
        }
        task
    }
}
1289
/// Options describing which remote to connect to, one variant per transport.
///
/// The type is hashable and comparable; `ConnectionPool` uses it as its map key.
#[derive(Debug, Clone, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub enum RemoteConnectionOptions {
    /// Connect over SSH.
    Ssh(SshConnectionOptions),
    /// Connect to a WSL distro.
    Wsl(WslConnectionOptions),
    /// Connect through Docker (or Podman, per the options' `use_podman` flag).
    Docker(DockerConnectionOptions),
    /// In-process mock transport, available only in test builds.
    #[cfg(any(test, feature = "test-support"))]
    Mock(crate::transport::mock::MockConnectionOptions),
}
1298
1299impl RemoteConnectionOptions {
1300 pub fn display_name(&self) -> String {
1301 match self {
1302 RemoteConnectionOptions::Ssh(opts) => opts
1303 .nickname
1304 .clone()
1305 .unwrap_or_else(|| opts.host.to_string()),
1306 RemoteConnectionOptions::Wsl(opts) => opts.distro_name.clone(),
1307 RemoteConnectionOptions::Docker(opts) => {
1308 if opts.use_podman {
1309 format!("[podman] {}", opts.name)
1310 } else {
1311 opts.name.clone()
1312 }
1313 }
1314 #[cfg(any(test, feature = "test-support"))]
1315 RemoteConnectionOptions::Mock(opts) => format!("mock-{}", opts.id),
1316 }
1317 }
1318}
1319
#[cfg(test)]
mod tests {
    use super::*;

    /// A nickname, when present, wins over the host.
    #[test]
    fn test_ssh_display_name_prefers_nickname() {
        let ssh = SshConnectionOptions {
            host: "1.2.3.4".into(),
            nickname: Some("My Cool Project".to_string()),
            ..Default::default()
        };

        let display_name = RemoteConnectionOptions::Ssh(ssh).display_name();
        assert_eq!(display_name, "My Cool Project");
    }

    /// Without a nickname the host is used as the label.
    #[test]
    fn test_ssh_display_name_falls_back_to_host() {
        let ssh = SshConnectionOptions {
            host: "1.2.3.4".into(),
            ..Default::default()
        };

        let display_name = RemoteConnectionOptions::Ssh(ssh).display_name();
        assert_eq!(display_name, "1.2.3.4");
    }
}
1345
1346impl From<SshConnectionOptions> for RemoteConnectionOptions {
1347 fn from(opts: SshConnectionOptions) -> Self {
1348 RemoteConnectionOptions::Ssh(opts)
1349 }
1350}
1351
1352impl From<WslConnectionOptions> for RemoteConnectionOptions {
1353 fn from(opts: WslConnectionOptions) -> Self {
1354 RemoteConnectionOptions::Wsl(opts)
1355 }
1356}
1357
/// Wraps mock connection options in the `Mock` variant (test builds only).
#[cfg(any(test, feature = "test-support"))]
impl From<crate::transport::mock::MockConnectionOptions> for RemoteConnectionOptions {
    fn from(mock: crate::transport::mock::MockConnectionOptions) -> Self {
        Self::Mock(mock)
    }
}
1364
#[cfg(target_os = "windows")]
/// Action to open one or more WSL paths (`\\wsl.localhost\<distro>\path`).
///
/// Only compiled on Windows.
#[derive(Debug, Clone, PartialEq, Eq, gpui::Action)]
#[action(namespace = workspace, no_json, no_register)]
pub struct OpenWslPath {
    /// Connection options for the WSL distro the paths belong to.
    pub distro: WslConnectionOptions,
    /// The paths to open.
    pub paths: Vec<PathBuf>,
}
1373
/// Transport-level interface shared by every kind of remote connection.
///
/// `ConnectionPool` stores the SSH, WSL, Docker and (in tests) mock connections
/// behind `Arc<dyn RemoteConnection>`.
// NOTE(review): the per-method notes below are derived from signatures and from the
// call sites visible in this file; confirm details against the implementations.
#[async_trait(?Send)]
pub trait RemoteConnection: Send + Sync {
    /// Starts the proxy for this connection, wiring it to the given envelope
    /// channels. The task resolves to an `i32` — presumably the proxy's exit
    /// status; verify against the implementations.
    fn start_proxy(
        &self,
        unique_identifier: String,
        reconnect: bool,
        incoming_tx: UnboundedSender<Envelope>,
        outgoing_rx: UnboundedReceiver<Envelope>,
        connection_activity_tx: Sender<()>,
        delegate: Arc<dyn RemoteClientDelegate>,
        cx: &mut AsyncApp,
    ) -> Task<Result<i32>>;
    /// Uploads `src_path` to `dest_path` on the remote.
    fn upload_directory(
        &self,
        src_path: PathBuf,
        dest_path: RemotePathBuf,
        cx: &App,
    ) -> Task<Result<()>>;
    /// Kills the connection.
    async fn kill(&self) -> Result<()>;
    /// Whether the connection has been killed. The pool does not reuse a
    /// connection for which this returns `true`.
    fn has_been_killed(&self) -> bool;
    /// Defaults to `false`; implementations may override.
    fn shares_network_interface(&self) -> bool {
        false
    }
    /// Builds a command template from the given program, arguments, environment,
    /// working directory, optional port forward and interactivity mode.
    fn build_command(
        &self,
        program: Option<String>,
        args: &[String],
        env: &HashMap<String, String>,
        working_dir: Option<String>,
        port_forward: Option<(u16, String, u16)>,
        interactive: Interactive,
    ) -> Result<CommandTemplate>;
    /// Builds a command template for the given port forwards. Each tuple is
    /// `(u16, String, u16)` — presumably (local port, remote host, remote port);
    /// TODO confirm.
    fn build_forward_ports_command(
        &self,
        forwards: Vec<(u16, String, u16)>,
    ) -> Result<CommandTemplate>;
    /// The options associated with this connection.
    fn connection_options(&self) -> RemoteConnectionOptions;
    /// The path style associated with this connection.
    fn path_style(&self) -> PathStyle;
    /// The shell associated with this connection, as a string.
    fn shell(&self) -> String;
    /// The default system shell, as a string.
    fn default_system_shell(&self) -> String;
    /// Whether this connection has WSL interop.
    fn has_wsl_interop(&self) -> bool;

    /// Test-only hook for simulating a dropped connection; a no-op by default.
    #[cfg(any(test, feature = "test-support"))]
    fn simulate_disconnect(&self, _: &AsyncApp) {}
}
1419
/// Maps the id of an in-flight request to the oneshot sender that delivers its
/// response. The delivered tuple carries the response envelope plus a second
/// oneshot sender; the message loop waits on the matching receiver before it
/// handles the next incoming message.
type ResponseChannels = Mutex<HashMap<MessageId, oneshot::Sender<(Envelope, oneshot::Sender<()>)>>>;
1421
/// A value that can be set at most once and awaited by any number of waiters.
struct Signal<T> {
    /// Sending half; taken (leaving `None`) by the first call to `set`.
    tx: Mutex<Option<oneshot::Sender<T>>>,
    /// Shared task that resolves to `Some(value)` once the value is sent, or to
    /// `None` if the sender is dropped without sending.
    rx: Shared<Task<Option<T>>>,
}
1426
1427impl<T: Send + Clone + 'static> Signal<T> {
1428 pub fn new(cx: &App) -> Self {
1429 let (tx, rx) = oneshot::channel();
1430
1431 let task = cx
1432 .background_executor()
1433 .spawn(async move { rx.await.ok() })
1434 .shared();
1435
1436 Self {
1437 tx: Mutex::new(Some(tx)),
1438 rx: task,
1439 }
1440 }
1441
1442 fn set(&self, value: T) {
1443 if let Some(tx) = self.tx.lock().take() {
1444 let _ = tx.send(value);
1445 }
1446 }
1447
1448 fn wait(&self) -> Shared<Task<Option<T>>> {
1449 self.rx.clone()
1450 }
1451}
1452
/// Proto client that exchanges `Envelope`s with a remote peer over a pair of
/// unbounded channels, buffering sent messages until they are acknowledged.
pub(crate) struct ChannelClient {
    /// Counter used to assign ids to outgoing envelopes.
    next_message_id: AtomicU32,
    /// Sender for outgoing envelopes; swapped out by `reconnect`.
    outgoing_tx: Mutex<mpsc::UnboundedSender<Envelope>>,
    /// Envelopes sent through `send_buffered`. Entries are dropped once the peer
    /// acks their id and are re-sent when a flush is requested.
    buffer: Mutex<VecDeque<Envelope>>,
    /// Response senders for in-flight requests, keyed by request id.
    response_channels: ResponseChannels,
    /// Handlers for incoming messages that are not responses.
    message_handlers: Mutex<ProtoMessageHandlerSet>,
    /// Id of the most recently processed incoming message; sent to the peer as
    /// `ack_id` on outgoing envelopes.
    max_received: AtomicU32,
    /// Label prefixed to this client's log messages.
    name: &'static str,
    /// The message-handling loop; replaced by `reconnect`.
    task: Mutex<Task<Result<()>>>,
    /// Set when a `RemoteStarted` message arrives from the peer.
    remote_started: Signal<()>,
    /// Value reported by `ProtoClient::has_wsl_interop`.
    has_wsl_interop: bool,
    /// Background executor, used for the timeout timers in `resync` and `ping`.
    executor: BackgroundExecutor,
}
1466
1467impl ChannelClient {
1468 pub(crate) fn new(
1469 incoming_rx: mpsc::UnboundedReceiver<Envelope>,
1470 outgoing_tx: mpsc::UnboundedSender<Envelope>,
1471 cx: &App,
1472 name: &'static str,
1473 has_wsl_interop: bool,
1474 ) -> Arc<Self> {
1475 Arc::new_cyclic(|this| Self {
1476 outgoing_tx: Mutex::new(outgoing_tx),
1477 next_message_id: AtomicU32::new(0),
1478 max_received: AtomicU32::new(0),
1479 response_channels: ResponseChannels::default(),
1480 message_handlers: Default::default(),
1481 buffer: Mutex::new(VecDeque::new()),
1482 name,
1483 executor: cx.background_executor().clone(),
1484 task: Mutex::new(Self::start_handling_messages(
1485 this.clone(),
1486 incoming_rx,
1487 &cx.to_async(),
1488 )),
1489 remote_started: Signal::new(cx),
1490 has_wsl_interop,
1491 })
1492 }
1493
1494 fn wait_for_remote_started(&self) -> Shared<Task<Option<()>>> {
1495 self.remote_started.wait()
1496 }
1497
/// Spawns the loop that processes envelopes arriving on `incoming_rx`.
///
/// The loop holds only a weak reference to the client. It ends, returning
/// `Ok(())`, when the incoming channel closes or the client has been dropped.
/// For each incoming envelope it:
///
/// 1. drops buffered outgoing envelopes that the peer has acked,
/// 2. handles the `FlushBufferedMessages` and `RemoteStarted` control messages,
/// 3. records the envelope id as the latest one received,
/// 4. routes responses to the waiting request, and dispatches everything else
///    to the registered message handlers.
fn start_handling_messages(
    this: Weak<Self>,
    mut incoming_rx: mpsc::UnboundedReceiver<Envelope>,
    cx: &AsyncApp,
) -> Task<Result<()>> {
    cx.spawn(async move |cx| {
        // Announce ourselves to the peer each time a loop starts.
        if let Some(this) = this.upgrade() {
            let envelope = proto::RemoteStarted {}.into_envelope(0, None, None);
            this.outgoing_tx.lock().unbounded_send(envelope).ok();
        };

        let peer_id = PeerId { owner_id: 0, id: 0 };
        while let Some(incoming) = incoming_rx.next().await {
            // Stop once the client itself is gone.
            let Some(this) = this.upgrade() else {
                return anyhow::Ok(());
            };
            // The peer acked everything up to `ack_id`; those envelopes no longer
            // need to be retained.
            if let Some(ack_id) = incoming.ack_id {
                let mut buffer = this.buffer.lock();
                while buffer.front().is_some_and(|msg| msg.id <= ack_id) {
                    buffer.pop_front();
                }
            }
            // Control message: re-send every buffered envelope, then ack the request.
            if let Some(proto::envelope::Payload::FlushBufferedMessages(_)) = &incoming.payload
            {
                log::debug!(
                    "{}:remote message received. name:FlushBufferedMessages",
                    this.name
                );
                {
                    let buffer = this.buffer.lock();
                    for envelope in buffer.iter() {
                        this.outgoing_tx
                            .lock()
                            .unbounded_send(envelope.clone())
                            .ok();
                    }
                }
                let mut envelope = proto::Ack {}.into_envelope(0, Some(incoming.id), None);
                envelope.id = this.next_message_id.fetch_add(1, SeqCst);
                this.outgoing_tx.lock().unbounded_send(envelope).ok();
                continue;
            }

            // Control message: the peer is up. Wake waiters and ack.
            if let Some(proto::envelope::Payload::RemoteStarted(_)) = &incoming.payload {
                this.remote_started.set(());
                let mut envelope = proto::Ack {}.into_envelope(0, Some(incoming.id), None);
                envelope.id = this.next_message_id.fetch_add(1, SeqCst);
                this.outgoing_tx.lock().unbounded_send(envelope).ok();
                continue;
            }

            // Control messages above `continue` before this point, so they are not
            // recorded here.
            this.max_received.store(incoming.id, SeqCst);

            if let Some(request_id) = incoming.responding_to {
                // A response: hand it to the request that is waiting for it, if any.
                let request_id = MessageId(request_id);
                let sender = this.response_channels.lock().remove(&request_id);
                if let Some(sender) = sender {
                    let (tx, rx) = oneshot::channel();
                    // A response without a payload is not forwarded; dropping
                    // `sender` makes the requester's receive fail instead.
                    if incoming.payload.is_some() {
                        sender.send((incoming, tx)).ok();
                    }
                    // Wait until the requester signals or drops `tx` before
                    // handling the next message.
                    rx.await.ok();
                }
            } else if let Some(envelope) =
                build_typed_envelope(peer_id, Instant::now(), incoming)
            {
                let type_name = envelope.payload_type_name();
                let message_id = envelope.message_id();
                if let Some(future) = ProtoMessageHandlerSet::handle_message(
                    &this.message_handlers,
                    envelope,
                    this.clone().into(),
                    cx.clone(),
                ) {
                    log::debug!("{}:remote message received. name:{type_name}", this.name);
                    // The handler runs detached, so the loop does not wait for it.
                    cx.foreground_executor()
                        .spawn(async move {
                            match future.await {
                                Ok(_) => {
                                    log::debug!(
                                        "{}:remote message handled. name:{type_name}",
                                        this.name
                                    );
                                }
                                Err(error) => {
                                    // Collapse the multi-line error chain onto one
                                    // log line, separating lines with spaces.
                                    log::error!(
                                        "{}:error handling message. type:{}, error:{:#}",
                                        this.name,
                                        type_name,
                                        format!("{error:#}").lines().fold(
                                            String::new(),
                                            |mut message, line| {
                                                if !message.is_empty() {
                                                    message.push(' ');
                                                }
                                                message.push_str(line);
                                                message
                                            }
                                        )
                                    );
                                }
                            }
                        })
                        .detach()
                } else {
                    // No handler is registered: reply with an error response.
                    log::error!("{}:unhandled remote message name:{type_name}", this.name);
                    if let Err(e) = AnyProtoClient::from(this.clone()).send_response(
                        message_id,
                        anyhow::anyhow!("no handler registered for {type_name}").to_proto(),
                    ) {
                        log::error!(
                            "{}:error sending error response for {type_name}:{e:#}",
                            this.name
                        );
                    }
                }
            }
        }
        anyhow::Ok(())
    })
}
1619
1620 pub(crate) fn reconnect(
1621 self: &Arc<Self>,
1622 incoming_rx: UnboundedReceiver<Envelope>,
1623 outgoing_tx: UnboundedSender<Envelope>,
1624 cx: &AsyncApp,
1625 ) {
1626 *self.outgoing_tx.lock() = outgoing_tx;
1627 *self.task.lock() = Self::start_handling_messages(Arc::downgrade(self), incoming_rx, cx);
1628 }
1629
1630 fn request<T: RequestMessage>(
1631 &self,
1632 payload: T,
1633 ) -> impl 'static + Future<Output = Result<T::Response>> {
1634 self.request_internal(payload, true)
1635 }
1636
1637 fn request_internal<T: RequestMessage>(
1638 &self,
1639 payload: T,
1640 use_buffer: bool,
1641 ) -> impl 'static + Future<Output = Result<T::Response>> {
1642 log::debug!("remote request start. name:{}", T::NAME);
1643 let response =
1644 self.request_dynamic(payload.into_envelope(0, None, None), T::NAME, use_buffer);
1645 async move {
1646 let response = response.await?;
1647 log::debug!("remote request finish. name:{}", T::NAME);
1648 T::Response::from_envelope(response).context("received a response of the wrong type")
1649 }
1650 }
1651
1652 async fn resync(&self, timeout: Duration) -> Result<()> {
1653 smol::future::or(
1654 async {
1655 self.request_internal(proto::FlushBufferedMessages {}, false)
1656 .await?;
1657
1658 for envelope in self.buffer.lock().iter() {
1659 self.outgoing_tx
1660 .lock()
1661 .unbounded_send(envelope.clone())
1662 .ok();
1663 }
1664 Ok(())
1665 },
1666 async {
1667 self.executor.timer(timeout).await;
1668 anyhow::bail!("Timed out resyncing remote client")
1669 },
1670 )
1671 .await
1672 }
1673
1674 async fn ping(&self, timeout: Duration) -> Result<()> {
1675 smol::future::or(
1676 async {
1677 self.request(proto::Ping {}).await?;
1678 Ok(())
1679 },
1680 async {
1681 self.executor.timer(timeout).await;
1682 anyhow::bail!("Timed out pinging remote client")
1683 },
1684 )
1685 .await
1686 }
1687
1688 fn send<T: EnvelopedMessage>(&self, payload: T) -> Result<()> {
1689 log::debug!("remote send name:{}", T::NAME);
1690 self.send_dynamic(payload.into_envelope(0, None, None))
1691 }
1692
1693 fn request_dynamic(
1694 &self,
1695 mut envelope: proto::Envelope,
1696 type_name: &'static str,
1697 use_buffer: bool,
1698 ) -> impl 'static + Future<Output = Result<proto::Envelope>> {
1699 envelope.id = self.next_message_id.fetch_add(1, SeqCst);
1700 let (tx, rx) = oneshot::channel();
1701 let mut response_channels_lock = self.response_channels.lock();
1702 response_channels_lock.insert(MessageId(envelope.id), tx);
1703 drop(response_channels_lock);
1704
1705 let result = if use_buffer {
1706 self.send_buffered(envelope)
1707 } else {
1708 self.send_unbuffered(envelope)
1709 };
1710 async move {
1711 if let Err(error) = &result {
1712 log::error!("failed to send message: {error}");
1713 anyhow::bail!("failed to send message: {error}");
1714 }
1715
1716 let response = rx.await.context("connection lost")?.0;
1717 if let Some(proto::envelope::Payload::Error(error)) = &response.payload {
1718 return Err(RpcError::from_proto(error, type_name));
1719 }
1720 Ok(response)
1721 }
1722 }
1723
1724 pub fn send_dynamic(&self, mut envelope: proto::Envelope) -> Result<()> {
1725 envelope.id = self.next_message_id.fetch_add(1, SeqCst);
1726 self.send_buffered(envelope)
1727 }
1728
1729 fn send_buffered(&self, mut envelope: proto::Envelope) -> Result<()> {
1730 envelope.ack_id = Some(self.max_received.load(SeqCst));
1731 self.buffer.lock().push_back(envelope.clone());
1732 // ignore errors on send (happen while we're reconnecting)
1733 // assume that the global "disconnected" overlay is sufficient.
1734 self.outgoing_tx.lock().unbounded_send(envelope).ok();
1735 Ok(())
1736 }
1737
1738 fn send_unbuffered(&self, mut envelope: proto::Envelope) -> Result<()> {
1739 envelope.ack_id = Some(self.max_received.load(SeqCst));
1740 self.outgoing_tx.lock().unbounded_send(envelope).ok();
1741 Ok(())
1742 }
1743}
1744
1745impl ProtoClient for ChannelClient {
1746 fn request(
1747 &self,
1748 envelope: proto::Envelope,
1749 request_type: &'static str,
1750 ) -> BoxFuture<'static, Result<proto::Envelope>> {
1751 self.request_dynamic(envelope, request_type, true).boxed()
1752 }
1753
1754 fn send(&self, envelope: proto::Envelope, _message_type: &'static str) -> Result<()> {
1755 self.send_dynamic(envelope)
1756 }
1757
1758 fn send_response(&self, envelope: Envelope, _message_type: &'static str) -> anyhow::Result<()> {
1759 self.send_dynamic(envelope)
1760 }
1761
1762 fn message_handler_set(&self) -> &Mutex<ProtoMessageHandlerSet> {
1763 &self.message_handlers
1764 }
1765
1766 fn is_via_collab(&self) -> bool {
1767 false
1768 }
1769
1770 fn has_wsl_interop(&self) -> bool {
1771 self.has_wsl_interop
1772 }
1773}