1#[cfg(any(test, feature = "test-support"))]
2use crate::transport::mock::ConnectGuard;
3use crate::{
4 SshConnectionOptions,
5 protocol::MessageId,
6 proxy::ProxyLaunchError,
7 transport::{
8 docker::{DockerConnectionOptions, DockerExecConnection},
9 ssh::SshRemoteConnection,
10 wsl::{WslConnectionOptions, WslRemoteConnection},
11 },
12};
13use anyhow::{Context as _, Result, anyhow};
14use askpass::EncryptedPassword;
15use async_trait::async_trait;
16use collections::HashMap;
17use futures::{
18 Future, FutureExt as _, StreamExt as _,
19 channel::{
20 mpsc::{self, Sender, UnboundedReceiver, UnboundedSender},
21 oneshot,
22 },
23 future::{BoxFuture, Shared, WeakShared},
24 select, select_biased,
25};
26use gpui::{
27 App, AppContext as _, AsyncApp, BackgroundExecutor, BorrowAppContext, Context, Entity,
28 EventEmitter, FutureExt, Global, Task, WeakEntity,
29};
30use parking_lot::Mutex;
31
32use release_channel::ReleaseChannel;
33use rpc::{
34 AnyProtoClient, ErrorExt, ProtoClient, ProtoMessageHandlerSet, RpcError,
35 proto::{self, Envelope, EnvelopedMessage, PeerId, RequestMessage, build_typed_envelope},
36};
37use semver::Version;
38use std::{
39 collections::VecDeque,
40 fmt,
41 ops::ControlFlow,
42 path::PathBuf,
43 sync::{
44 Arc, Weak,
45 atomic::{AtomicU32, AtomicU64, Ordering::SeqCst},
46 },
47 time::{Duration, Instant},
48};
49use util::{
50 ResultExt,
51 paths::{PathStyle, RemotePathBuf},
52};
53
54#[derive(Copy, Clone, Debug, PartialEq, Eq)]
55pub enum RemoteOs {
56 Linux,
57 MacOs,
58 Windows,
59}
60
61impl RemoteOs {
62 pub fn as_str(&self) -> &'static str {
63 match self {
64 RemoteOs::Linux => "linux",
65 RemoteOs::MacOs => "macos",
66 RemoteOs::Windows => "windows",
67 }
68 }
69
70 pub fn is_windows(&self) -> bool {
71 matches!(self, RemoteOs::Windows)
72 }
73}
74
75impl std::fmt::Display for RemoteOs {
76 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
77 f.write_str(self.as_str())
78 }
79}
80
81#[derive(Copy, Clone, Debug, PartialEq, Eq)]
82pub enum RemoteArch {
83 X86_64,
84 Aarch64,
85}
86
87impl RemoteArch {
88 pub fn as_str(&self) -> &'static str {
89 match self {
90 RemoteArch::X86_64 => "x86_64",
91 RemoteArch::Aarch64 => "aarch64",
92 }
93 }
94}
95
96impl std::fmt::Display for RemoteArch {
97 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
98 f.write_str(self.as_str())
99 }
100}
101
102#[derive(Copy, Clone, Debug)]
103pub struct RemotePlatform {
104 pub os: RemoteOs,
105 pub arch: RemoteArch,
106}
107
108#[derive(Clone, Debug)]
109pub struct CommandTemplate {
110 pub program: String,
111 pub args: Vec<String>,
112 pub env: HashMap<String, String>,
113}
114
115/// Whether a command should be run with TTY allocation for interactive use.
116#[derive(Clone, Copy, Debug, PartialEq, Eq)]
117pub enum Interactive {
118 /// Allocate a pseudo-TTY for interactive terminal use.
119 Yes,
120 /// Do not allocate a TTY - for commands that communicate via piped stdio.
121 No,
122}
123
124pub trait RemoteClientDelegate: Send + Sync {
125 fn ask_password(
126 &self,
127 prompt: String,
128 tx: oneshot::Sender<EncryptedPassword>,
129 cx: &mut AsyncApp,
130 );
131 fn get_download_url(
132 &self,
133 platform: RemotePlatform,
134 release_channel: ReleaseChannel,
135 version: Option<Version>,
136 cx: &mut AsyncApp,
137 ) -> Task<Result<Option<String>>>;
138 fn download_server_binary_locally(
139 &self,
140 platform: RemotePlatform,
141 release_channel: ReleaseChannel,
142 version: Option<Version>,
143 cx: &mut AsyncApp,
144 ) -> Task<Result<PathBuf>>;
145 fn set_status(&self, status: Option<&str>, cx: &mut AsyncApp);
146}
147
148const MAX_MISSED_HEARTBEATS: usize = 5;
149const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
150const HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(5);
151const INITIAL_CONNECTION_TIMEOUT: Duration =
152 Duration::from_secs(if cfg!(debug_assertions) { 5 } else { 60 });
153
154pub const MAX_RECONNECT_ATTEMPTS: usize = 3;
155
156enum State {
157 Connecting,
158 Connected {
159 remote_connection: Arc<dyn RemoteConnection>,
160 delegate: Arc<dyn RemoteClientDelegate>,
161
162 multiplex_task: Task<Result<()>>,
163 heartbeat_task: Task<Result<()>>,
164 },
165 HeartbeatMissed {
166 missed_heartbeats: usize,
167
168 remote_connection: Arc<dyn RemoteConnection>,
169 delegate: Arc<dyn RemoteClientDelegate>,
170
171 multiplex_task: Task<Result<()>>,
172 heartbeat_task: Task<Result<()>>,
173 },
174 Reconnecting,
175 ReconnectFailed {
176 remote_connection: Arc<dyn RemoteConnection>,
177 delegate: Arc<dyn RemoteClientDelegate>,
178
179 error: anyhow::Error,
180 attempts: usize,
181 },
182 ReconnectExhausted,
183 ServerNotRunning,
184}
185
186impl fmt::Display for State {
187 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
188 match self {
189 Self::Connecting => write!(f, "connecting"),
190 Self::Connected { .. } => write!(f, "connected"),
191 Self::Reconnecting => write!(f, "reconnecting"),
192 Self::ReconnectFailed { .. } => write!(f, "reconnect failed"),
193 Self::ReconnectExhausted => write!(f, "reconnect exhausted"),
194 Self::HeartbeatMissed { .. } => write!(f, "heartbeat missed"),
195 Self::ServerNotRunning { .. } => write!(f, "server not running"),
196 }
197 }
198}
199
200impl State {
201 fn remote_connection(&self) -> Option<Arc<dyn RemoteConnection>> {
202 match self {
203 Self::Connected {
204 remote_connection, ..
205 } => Some(remote_connection.clone()),
206 Self::HeartbeatMissed {
207 remote_connection, ..
208 } => Some(remote_connection.clone()),
209 Self::ReconnectFailed {
210 remote_connection, ..
211 } => Some(remote_connection.clone()),
212 _ => None,
213 }
214 }
215
216 fn can_reconnect(&self) -> bool {
217 match self {
218 Self::Connected { .. }
219 | Self::HeartbeatMissed { .. }
220 | Self::ReconnectFailed { .. } => true,
221 State::Connecting
222 | State::Reconnecting
223 | State::ReconnectExhausted
224 | State::ServerNotRunning => false,
225 }
226 }
227
228 fn is_reconnect_failed(&self) -> bool {
229 matches!(self, Self::ReconnectFailed { .. })
230 }
231
232 fn is_reconnect_exhausted(&self) -> bool {
233 matches!(self, Self::ReconnectExhausted { .. })
234 }
235
236 fn is_server_not_running(&self) -> bool {
237 matches!(self, Self::ServerNotRunning)
238 }
239
240 fn is_reconnecting(&self) -> bool {
241 matches!(self, Self::Reconnecting { .. })
242 }
243
244 fn heartbeat_recovered(self) -> Self {
245 match self {
246 Self::HeartbeatMissed {
247 remote_connection,
248 delegate,
249 multiplex_task,
250 heartbeat_task,
251 ..
252 } => Self::Connected {
253 remote_connection,
254 delegate,
255 multiplex_task,
256 heartbeat_task,
257 },
258 _ => self,
259 }
260 }
261
262 fn heartbeat_missed(self) -> Self {
263 match self {
264 Self::Connected {
265 remote_connection,
266 delegate,
267 multiplex_task,
268 heartbeat_task,
269 } => Self::HeartbeatMissed {
270 missed_heartbeats: 1,
271 remote_connection,
272 delegate,
273 multiplex_task,
274 heartbeat_task,
275 },
276 Self::HeartbeatMissed {
277 missed_heartbeats,
278 remote_connection,
279 delegate,
280 multiplex_task,
281 heartbeat_task,
282 } => Self::HeartbeatMissed {
283 missed_heartbeats: missed_heartbeats + 1,
284 remote_connection,
285 delegate,
286 multiplex_task,
287 heartbeat_task,
288 },
289 _ => self,
290 }
291 }
292}
293
294/// The state of the ssh connection.
295#[derive(Clone, Copy, Debug, PartialEq, Eq)]
296pub enum ConnectionState {
297 Connecting,
298 Connected,
299 HeartbeatMissed,
300 Reconnecting,
301 Disconnected,
302}
303
304impl From<&State> for ConnectionState {
305 fn from(value: &State) -> Self {
306 match value {
307 State::Connecting => Self::Connecting,
308 State::Connected { .. } => Self::Connected,
309 State::Reconnecting | State::ReconnectFailed { .. } => Self::Reconnecting,
310 State::HeartbeatMissed { .. } => Self::HeartbeatMissed,
311 State::ReconnectExhausted => Self::Disconnected,
312 State::ServerNotRunning => Self::Disconnected,
313 }
314 }
315}
316
317pub struct RemoteClient {
318 client: Arc<ChannelClient>,
319 unique_identifier: String,
320 connection_options: RemoteConnectionOptions,
321 path_style: PathStyle,
322 state: Option<State>,
323}
324
325#[derive(Debug)]
326pub enum RemoteClientEvent {
327 Disconnected,
328}
329
330impl EventEmitter<RemoteClientEvent> for RemoteClient {}
331
332/// Identifies the socket on the remote server so that reconnects
333/// can re-join the same project.
334pub enum ConnectionIdentifier {
335 Setup(u64),
336 Workspace(i64),
337}
338
339static NEXT_ID: AtomicU64 = AtomicU64::new(1);
340
341impl ConnectionIdentifier {
342 pub fn setup() -> Self {
343 Self::Setup(NEXT_ID.fetch_add(1, SeqCst))
344 }
345
346 // This string gets used in a socket name, and so must be relatively short.
347 // The total length of:
348 // /home/{username}/.local/share/zed/server_state/{name}/stdout.sock
349 // Must be less than about 100 characters
350 // https://unix.stackexchange.com/questions/367008/why-is-socket-path-length-limited-to-a-hundred-chars
351 // So our strings should be at most 20 characters or so.
352 fn to_string(&self, cx: &App) -> String {
353 let identifier_prefix = match ReleaseChannel::global(cx) {
354 ReleaseChannel::Stable => "".to_string(),
355 release_channel => format!("{}-", release_channel.dev_name()),
356 };
357 match self {
358 Self::Setup(setup_id) => format!("{identifier_prefix}setup-{setup_id}"),
359 Self::Workspace(workspace_id) => {
360 format!("{identifier_prefix}workspace-{workspace_id}",)
361 }
362 }
363 }
364}
365
366pub async fn connect(
367 connection_options: RemoteConnectionOptions,
368 delegate: Arc<dyn RemoteClientDelegate>,
369 cx: &mut AsyncApp,
370) -> Result<Arc<dyn RemoteConnection>> {
371 cx.update(|cx| {
372 cx.update_default_global(|pool: &mut ConnectionPool, cx| {
373 pool.connect(connection_options.clone(), delegate.clone(), cx)
374 })
375 })
376 .await
377 .map_err(|e| e.cloned())
378}
379
380impl RemoteClient {
381 pub fn new(
382 unique_identifier: ConnectionIdentifier,
383 remote_connection: Arc<dyn RemoteConnection>,
384 cancellation: oneshot::Receiver<()>,
385 delegate: Arc<dyn RemoteClientDelegate>,
386 cx: &mut App,
387 ) -> Task<Result<Option<Entity<Self>>>> {
388 let unique_identifier = unique_identifier.to_string(cx);
389 cx.spawn(async move |cx| {
390 let success = Box::pin(async move {
391 let (outgoing_tx, outgoing_rx) = mpsc::unbounded::<Envelope>();
392 let (incoming_tx, incoming_rx) = mpsc::unbounded::<Envelope>();
393 let (connection_activity_tx, connection_activity_rx) = mpsc::channel::<()>(1);
394
395 let client = cx.update(|cx| {
396 ChannelClient::new(
397 incoming_rx,
398 outgoing_tx,
399 cx,
400 "client",
401 remote_connection.has_wsl_interop(),
402 )
403 });
404
405 let path_style = remote_connection.path_style();
406 let this = cx.new(|_| Self {
407 client: client.clone(),
408 unique_identifier: unique_identifier.clone(),
409 connection_options: remote_connection.connection_options(),
410 path_style,
411 state: Some(State::Connecting),
412 });
413
414 let io_task = remote_connection.start_proxy(
415 unique_identifier,
416 false,
417 incoming_tx,
418 outgoing_rx,
419 connection_activity_tx,
420 delegate.clone(),
421 cx,
422 );
423
424 let ready = client
425 .wait_for_remote_started()
426 .with_timeout(INITIAL_CONNECTION_TIMEOUT, cx.background_executor())
427 .await;
428 match ready {
429 Ok(Some(_)) => {}
430 Ok(None) => {
431 let mut error = "remote client exited before becoming ready".to_owned();
432 if let Some(status) = io_task.now_or_never() {
433 match status {
434 Ok(exit_code) => {
435 error.push_str(&format!(", exit_code={exit_code:?}"))
436 }
437 Err(e) => error.push_str(&format!(", error={e:?}")),
438 }
439 }
440 let error = anyhow::anyhow!("{error}");
441 log::error!("failed to establish connection: {}", error);
442 return Err(error);
443 }
444 Err(_) => {
445 let mut error =
446 "remote client did not become ready within the timeout".to_owned();
447 if let Some(status) = io_task.now_or_never() {
448 match status {
449 Ok(exit_code) => {
450 error.push_str(&format!(", exit_code={exit_code:?}"))
451 }
452 Err(e) => error.push_str(&format!(", error={e:?}")),
453 }
454 }
455 let error = anyhow::anyhow!("{error}");
456 log::error!("failed to establish connection: {}", error);
457 return Err(error);
458 }
459 }
460 let multiplex_task = Self::monitor(this.downgrade(), io_task, cx);
461 if let Err(error) = client.ping(HEARTBEAT_TIMEOUT).await {
462 log::error!("failed to establish connection: {}", error);
463 return Err(error);
464 }
465
466 let heartbeat_task = Self::heartbeat(this.downgrade(), connection_activity_rx, cx);
467
468 this.update(cx, |this, _| {
469 this.state = Some(State::Connected {
470 remote_connection,
471 delegate,
472 multiplex_task,
473 heartbeat_task,
474 });
475 });
476
477 Ok(Some(this))
478 });
479
480 select! {
481 _ = cancellation.fuse() => {
482 Ok(None)
483 }
484 result = success.fuse() => result
485 }
486 })
487 }
488
489 pub fn proto_client_from_channels(
490 incoming_rx: mpsc::UnboundedReceiver<Envelope>,
491 outgoing_tx: mpsc::UnboundedSender<Envelope>,
492 cx: &App,
493 name: &'static str,
494 has_wsl_interop: bool,
495 ) -> AnyProtoClient {
496 ChannelClient::new(incoming_rx, outgoing_tx, cx, name, has_wsl_interop).into()
497 }
498
499 pub fn shutdown_processes<T: RequestMessage>(
500 &mut self,
501 shutdown_request: Option<T>,
502 executor: BackgroundExecutor,
503 ) -> Option<impl Future<Output = ()> + use<T>> {
504 let state = self.state.take()?;
505 log::info!("shutting down remote processes");
506
507 let State::Connected {
508 multiplex_task,
509 heartbeat_task,
510 remote_connection,
511 delegate,
512 } = state
513 else {
514 return None;
515 };
516
517 let client = self.client.clone();
518
519 Some(async move {
520 if let Some(shutdown_request) = shutdown_request {
521 client.send(shutdown_request).log_err();
522 // We wait 50ms instead of waiting for a response, because
523 // waiting for a response would require us to wait on the main thread
524 // which we want to avoid in an `on_app_quit` callback.
525 executor.timer(Duration::from_millis(50)).await;
526 }
527
528 // Drop `multiplex_task` because it owns our remote_connection_proxy_process, which is a
529 // child of master_process.
530 drop(multiplex_task);
531 // Now drop the rest of state, which kills master process.
532 drop(heartbeat_task);
533 drop(remote_connection);
534 drop(delegate);
535 })
536 }
537
538 fn reconnect(&mut self, cx: &mut Context<Self>) -> Result<()> {
539 let can_reconnect = self
540 .state
541 .as_ref()
542 .map(|state| state.can_reconnect())
543 .unwrap_or(false);
544 if !can_reconnect {
545 let state = if let Some(state) = self.state.as_ref() {
546 state.to_string()
547 } else {
548 "no state set".to_string()
549 };
550 log::info!(
551 "aborting reconnect, because not in state that allows reconnecting: {state}"
552 );
553 anyhow::bail!(
554 "aborting reconnect, because not in state that allows reconnecting: {state}"
555 );
556 }
557
558 let state = self.state.take().unwrap();
559 let (attempts, remote_connection, delegate) = match state {
560 State::Connected {
561 remote_connection,
562 delegate,
563 multiplex_task,
564 heartbeat_task,
565 }
566 | State::HeartbeatMissed {
567 remote_connection,
568 delegate,
569 multiplex_task,
570 heartbeat_task,
571 ..
572 } => {
573 drop(multiplex_task);
574 drop(heartbeat_task);
575 (0, remote_connection, delegate)
576 }
577 State::ReconnectFailed {
578 attempts,
579 remote_connection,
580 delegate,
581 ..
582 } => (attempts, remote_connection, delegate),
583 State::Connecting
584 | State::Reconnecting
585 | State::ReconnectExhausted
586 | State::ServerNotRunning => unreachable!(),
587 };
588
589 let attempts = attempts + 1;
590 if attempts > MAX_RECONNECT_ATTEMPTS {
591 log::error!(
592 "Failed to reconnect to after {} attempts, giving up",
593 MAX_RECONNECT_ATTEMPTS
594 );
595 self.set_state(State::ReconnectExhausted, cx);
596 return Ok(());
597 }
598
599 self.set_state(State::Reconnecting, cx);
600
601 log::info!(
602 "Trying to reconnect to remote server... Attempt {}",
603 attempts
604 );
605
606 let unique_identifier = self.unique_identifier.clone();
607 let client = self.client.clone();
608 let reconnect_task = cx.spawn(async move |this, cx| {
609 macro_rules! failed {
610 ($error:expr, $attempts:expr, $remote_connection:expr, $delegate:expr) => {
611 delegate.set_status(Some(&format!("{error:#}", error = $error)), cx);
612 return State::ReconnectFailed {
613 error: anyhow!($error),
614 attempts: $attempts,
615 remote_connection: $remote_connection,
616 delegate: $delegate,
617 };
618 };
619 }
620
621 if let Err(error) = remote_connection
622 .kill()
623 .await
624 .context("Failed to kill remote_connection process")
625 {
626 failed!(error, attempts, remote_connection, delegate);
627 };
628
629 let connection_options = remote_connection.connection_options();
630
631 let (outgoing_tx, outgoing_rx) = mpsc::unbounded::<Envelope>();
632 let (incoming_tx, incoming_rx) = mpsc::unbounded::<Envelope>();
633 let (connection_activity_tx, connection_activity_rx) = mpsc::channel::<()>(1);
634
635 let (remote_connection, io_task) = match async {
636 let remote_connection = cx
637 .update_global(|pool: &mut ConnectionPool, cx| {
638 pool.connect(connection_options, delegate.clone(), cx)
639 })
640 .await
641 .map_err(|error| error.cloned())?;
642
643 let io_task = remote_connection.start_proxy(
644 unique_identifier,
645 true,
646 incoming_tx,
647 outgoing_rx,
648 connection_activity_tx,
649 delegate.clone(),
650 cx,
651 );
652 anyhow::Ok((remote_connection, io_task))
653 }
654 .await
655 {
656 Ok((remote_connection, io_task)) => (remote_connection, io_task),
657 Err(error) => {
658 failed!(error, attempts, remote_connection, delegate);
659 }
660 };
661
662 let multiplex_task = Self::monitor(this.clone(), io_task, cx);
663 client.reconnect(incoming_rx, outgoing_tx, cx);
664
665 if let Err(error) = client.resync(HEARTBEAT_TIMEOUT).await {
666 failed!(error, attempts, remote_connection, delegate);
667 };
668
669 State::Connected {
670 remote_connection,
671 delegate,
672 multiplex_task,
673 heartbeat_task: Self::heartbeat(this.clone(), connection_activity_rx, cx),
674 }
675 });
676
677 cx.spawn(async move |this, cx| {
678 let new_state = reconnect_task.await;
679 this.update(cx, |this, cx| {
680 this.try_set_state(cx, |old_state| {
681 if old_state.is_reconnecting() {
682 match &new_state {
683 State::Connecting
684 | State::Reconnecting
685 | State::HeartbeatMissed { .. }
686 | State::ServerNotRunning => {}
687 State::Connected { .. } => {
688 log::info!("Successfully reconnected");
689 }
690 State::ReconnectFailed {
691 error, attempts, ..
692 } => {
693 log::error!(
694 "Reconnect attempt {} failed: {:?}. Starting new attempt...",
695 attempts,
696 error
697 );
698 }
699 State::ReconnectExhausted => {
700 log::error!("Reconnect attempt failed and all attempts exhausted");
701 }
702 }
703 Some(new_state)
704 } else {
705 None
706 }
707 });
708
709 if this.state_is(State::is_reconnect_failed) {
710 this.reconnect(cx)
711 } else if this.state_is(State::is_reconnect_exhausted) {
712 Ok(())
713 } else {
714 log::debug!("State has transition from Reconnecting into new state while attempting reconnect.");
715 Ok(())
716 }
717 })
718 })
719 .detach_and_log_err(cx);
720
721 Ok(())
722 }
723
724 fn heartbeat(
725 this: WeakEntity<Self>,
726 mut connection_activity_rx: mpsc::Receiver<()>,
727 cx: &mut AsyncApp,
728 ) -> Task<Result<()>> {
729 let Ok(client) = this.read_with(cx, |this, _| this.client.clone()) else {
730 return Task::ready(Err(anyhow!("remote_connectionRemoteClient lost")));
731 };
732
733 cx.spawn(async move |cx| {
734 let mut missed_heartbeats = 0;
735
736 let keepalive_timer = cx.background_executor().timer(HEARTBEAT_INTERVAL).fuse();
737 futures::pin_mut!(keepalive_timer);
738
739 loop {
740 select_biased! {
741 result = connection_activity_rx.next().fuse() => {
742 if result.is_none() {
743 log::warn!("remote heartbeat: connection activity channel has been dropped. stopping.");
744 return Ok(());
745 }
746
747 if missed_heartbeats != 0 {
748 missed_heartbeats = 0;
749 let _ =this.update(cx, |this, cx| {
750 this.handle_heartbeat_result(missed_heartbeats, cx)
751 })?;
752 }
753 }
754 _ = keepalive_timer => {
755 log::debug!("Sending heartbeat to server...");
756
757 let result = select_biased! {
758 _ = connection_activity_rx.next().fuse() => {
759 Ok(())
760 }
761 ping_result = client.ping(HEARTBEAT_TIMEOUT).fuse() => {
762 ping_result
763 }
764 };
765
766 if result.is_err() {
767 missed_heartbeats += 1;
768 log::warn!(
769 "No heartbeat from server after {:?}. Missed heartbeat {} out of {}.",
770 HEARTBEAT_TIMEOUT,
771 missed_heartbeats,
772 MAX_MISSED_HEARTBEATS
773 );
774 } else if missed_heartbeats != 0 {
775 missed_heartbeats = 0;
776 } else {
777 continue;
778 }
779
780 let result = this.update(cx, |this, cx| {
781 this.handle_heartbeat_result(missed_heartbeats, cx)
782 })?;
783 if result.is_break() {
784 return Ok(());
785 }
786 }
787 }
788
789 keepalive_timer.set(cx.background_executor().timer(HEARTBEAT_INTERVAL).fuse());
790 }
791 })
792 }
793
794 fn handle_heartbeat_result(
795 &mut self,
796 missed_heartbeats: usize,
797 cx: &mut Context<Self>,
798 ) -> ControlFlow<()> {
799 let state = self.state.take().unwrap();
800 let next_state = if missed_heartbeats > 0 {
801 state.heartbeat_missed()
802 } else {
803 state.heartbeat_recovered()
804 };
805
806 self.set_state(next_state, cx);
807
808 if missed_heartbeats >= MAX_MISSED_HEARTBEATS {
809 log::error!(
810 "Missed last {} heartbeats. Reconnecting...",
811 missed_heartbeats
812 );
813
814 self.reconnect(cx)
815 .context("failed to start reconnect process after missing heartbeats")
816 .log_err();
817 ControlFlow::Break(())
818 } else {
819 ControlFlow::Continue(())
820 }
821 }
822
823 fn monitor(
824 this: WeakEntity<Self>,
825 io_task: Task<Result<i32>>,
826 cx: &AsyncApp,
827 ) -> Task<Result<()>> {
828 cx.spawn(async move |cx| {
829 let result = io_task.await;
830
831 match result {
832 Ok(exit_code) => {
833 if let Some(error) = ProxyLaunchError::from_exit_code(exit_code) {
834 match error {
835 ProxyLaunchError::ServerNotRunning => {
836 log::error!("failed to reconnect because server is not running");
837 this.update(cx, |this, cx| {
838 this.set_state(State::ServerNotRunning, cx);
839 })?;
840 }
841 }
842 } else if exit_code > 0 {
843 log::error!("proxy process terminated unexpectedly");
844 this.update(cx, |this, cx| {
845 this.reconnect(cx).ok();
846 })?;
847 }
848 }
849 Err(error) => {
850 log::warn!(
851 "remote io task died with error: {:?}. reconnecting...",
852 error
853 );
854 this.update(cx, |this, cx| {
855 this.reconnect(cx).ok();
856 })?;
857 }
858 }
859
860 Ok(())
861 })
862 }
863
864 fn state_is(&self, check: impl FnOnce(&State) -> bool) -> bool {
865 self.state.as_ref().is_some_and(check)
866 }
867
868 fn try_set_state(&mut self, cx: &mut Context<Self>, map: impl FnOnce(&State) -> Option<State>) {
869 let new_state = self.state.as_ref().and_then(map);
870 if let Some(new_state) = new_state {
871 self.state.replace(new_state);
872 cx.notify();
873 }
874 }
875
876 fn set_state(&mut self, state: State, cx: &mut Context<Self>) {
877 log::info!("setting state to '{}'", &state);
878
879 let is_reconnect_exhausted = state.is_reconnect_exhausted();
880 let is_server_not_running = state.is_server_not_running();
881 self.state.replace(state);
882
883 if is_reconnect_exhausted || is_server_not_running {
884 cx.emit(RemoteClientEvent::Disconnected);
885 }
886 cx.notify();
887 }
888
889 pub fn shell(&self) -> Option<String> {
890 Some(self.remote_connection()?.shell())
891 }
892
893 pub fn default_system_shell(&self) -> Option<String> {
894 Some(self.remote_connection()?.default_system_shell())
895 }
896
897 pub fn shares_network_interface(&self) -> bool {
898 self.remote_connection()
899 .map_or(false, |connection| connection.shares_network_interface())
900 }
901
902 pub fn has_wsl_interop(&self) -> bool {
903 self.remote_connection()
904 .map_or(false, |connection| connection.has_wsl_interop())
905 }
906
907 pub fn build_command(
908 &self,
909 program: Option<String>,
910 args: &[String],
911 env: &HashMap<String, String>,
912 working_dir: Option<String>,
913 port_forward: Option<(u16, String, u16)>,
914 ) -> Result<CommandTemplate> {
915 self.build_command_with_options(
916 program,
917 args,
918 env,
919 working_dir,
920 port_forward,
921 Interactive::Yes,
922 )
923 }
924
925 pub fn build_command_with_options(
926 &self,
927 program: Option<String>,
928 args: &[String],
929 env: &HashMap<String, String>,
930 working_dir: Option<String>,
931 port_forward: Option<(u16, String, u16)>,
932 interactive: Interactive,
933 ) -> Result<CommandTemplate> {
934 let Some(connection) = self.remote_connection() else {
935 return Err(anyhow!("no remote connection"));
936 };
937 connection.build_command(program, args, env, working_dir, port_forward, interactive)
938 }
939
940 pub fn build_forward_ports_command(
941 &self,
942 forwards: Vec<(u16, String, u16)>,
943 ) -> Result<CommandTemplate> {
944 let Some(connection) = self.remote_connection() else {
945 return Err(anyhow!("no remote connection"));
946 };
947 connection.build_forward_ports_command(forwards)
948 }
949
950 pub fn upload_directory(
951 &self,
952 src_path: PathBuf,
953 dest_path: RemotePathBuf,
954 cx: &App,
955 ) -> Task<Result<()>> {
956 let Some(connection) = self.remote_connection() else {
957 return Task::ready(Err(anyhow!("no remote connection")));
958 };
959 connection.upload_directory(src_path, dest_path, cx)
960 }
961
962 pub fn proto_client(&self) -> AnyProtoClient {
963 self.client.clone().into()
964 }
965
966 pub fn connection_options(&self) -> RemoteConnectionOptions {
967 self.connection_options.clone()
968 }
969
970 pub fn connection(&self) -> Option<Arc<dyn RemoteConnection>> {
971 if let State::Connected {
972 remote_connection, ..
973 } = self.state.as_ref()?
974 {
975 Some(remote_connection.clone())
976 } else {
977 None
978 }
979 }
980
981 pub fn connection_state(&self) -> ConnectionState {
982 self.state
983 .as_ref()
984 .map(ConnectionState::from)
985 .unwrap_or(ConnectionState::Disconnected)
986 }
987
988 pub fn is_disconnected(&self) -> bool {
989 self.connection_state() == ConnectionState::Disconnected
990 }
991
992 pub fn path_style(&self) -> PathStyle {
993 self.path_style
994 }
995
996 /// Forcibly disconnects from the remote server by killing the underlying connection.
997 /// This will trigger the reconnection logic if reconnection attempts remain.
998 /// Useful for testing reconnection behavior in real environments.
999 pub fn force_disconnect(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
1000 let Some(connection) = self.remote_connection() else {
1001 return Task::ready(Err(anyhow!("no active remote connection to disconnect")));
1002 };
1003
1004 log::info!("force_disconnect: killing remote connection");
1005
1006 cx.spawn(async move |_, _| {
1007 connection.kill().await?;
1008 Ok(())
1009 })
1010 }
1011
1012 /// Simulates a timeout by pausing heartbeat responses.
1013 /// This will cause heartbeat failures and eventually trigger reconnection
1014 /// after MAX_MISSED_HEARTBEATS are missed.
1015 /// Useful for testing timeout behavior in real environments.
1016 pub fn force_heartbeat_timeout(&mut self, attempts: usize, cx: &mut Context<Self>) {
1017 log::info!("force_heartbeat_timeout: triggering heartbeat failure state");
1018
1019 if let Some(State::Connected {
1020 remote_connection,
1021 delegate,
1022 multiplex_task,
1023 heartbeat_task,
1024 }) = self.state.take()
1025 {
1026 self.set_state(
1027 if attempts == 0 {
1028 State::HeartbeatMissed {
1029 missed_heartbeats: MAX_MISSED_HEARTBEATS,
1030 remote_connection,
1031 delegate,
1032 multiplex_task,
1033 heartbeat_task,
1034 }
1035 } else {
1036 State::ReconnectFailed {
1037 remote_connection,
1038 delegate,
1039 error: anyhow!("forced heartbeat timeout"),
1040 attempts,
1041 }
1042 },
1043 cx,
1044 );
1045
1046 self.reconnect(cx)
1047 .context("failed to start reconnect after forced timeout")
1048 .log_err();
1049 } else {
1050 log::warn!("force_heartbeat_timeout: not in Connected state, ignoring");
1051 }
1052 }
1053
1054 #[cfg(any(test, feature = "test-support"))]
1055 pub fn simulate_disconnect(&self, client_cx: &mut App) -> Task<()> {
1056 let opts = self.connection_options();
1057 client_cx.spawn(async move |cx| {
1058 let connection = cx.update_global(|c: &mut ConnectionPool, _| {
1059 if let Some(ConnectionPoolEntry::Connected(c)) = c.connections.get(&opts) {
1060 if let Some(connection) = c.upgrade() {
1061 connection
1062 } else {
1063 panic!("connection was dropped")
1064 }
1065 } else {
1066 panic!("missing test connection")
1067 }
1068 });
1069
1070 connection.simulate_disconnect(cx);
1071 })
1072 }
1073
1074 /// Creates a mock connection pair for testing.
1075 ///
1076 /// This is the recommended way to create mock remote connections for tests.
1077 /// It returns the `MockConnectionOptions` (which can be passed to create a
1078 /// `HeadlessProject`), an `AnyProtoClient` for the server side and a
1079 /// `ConnectGuard` for the client side which blocks the connection from
1080 /// being established until dropped.
1081 ///
1082 /// # Example
1083 /// ```ignore
1084 /// let (opts, server_session, connect_guard) = RemoteClient::fake_server(cx, server_cx);
1085 /// // Set up HeadlessProject with server_session...
1086 /// drop(connect_guard);
1087 /// let client = RemoteClient::fake_client(opts, cx).await;
1088 /// ```
1089 #[cfg(any(test, feature = "test-support"))]
1090 pub fn fake_server(
1091 client_cx: &mut gpui::TestAppContext,
1092 server_cx: &mut gpui::TestAppContext,
1093 ) -> (RemoteConnectionOptions, AnyProtoClient, ConnectGuard) {
1094 use crate::transport::mock::MockConnection;
1095 let (opts, server_client, connect_guard) = MockConnection::new(client_cx, server_cx);
1096 (opts.into(), server_client, connect_guard)
1097 }
1098
1099 /// Creates a `RemoteClient` connected to a mock server.
1100 ///
1101 /// Call `fake_server` first to get the connection options, set up the
1102 /// `HeadlessProject` with the server session, then call this method
1103 /// to create the client.
1104 #[cfg(any(test, feature = "test-support"))]
1105 pub async fn connect_mock(
1106 opts: RemoteConnectionOptions,
1107 client_cx: &mut gpui::TestAppContext,
1108 ) -> Entity<Self> {
1109 assert!(matches!(opts, RemoteConnectionOptions::Mock(..)));
1110 use crate::transport::mock::MockDelegate;
1111 let (_tx, rx) = oneshot::channel();
1112 let mut cx = client_cx.to_async();
1113 let connection = connect(opts, Arc::new(MockDelegate), &mut cx)
1114 .await
1115 .unwrap();
1116 client_cx
1117 .update(|cx| {
1118 Self::new(
1119 ConnectionIdentifier::setup(),
1120 connection,
1121 rx,
1122 Arc::new(MockDelegate),
1123 cx,
1124 )
1125 })
1126 .await
1127 .unwrap()
1128 .unwrap()
1129 }
1130
1131 fn remote_connection(&self) -> Option<Arc<dyn RemoteConnection>> {
1132 self.state
1133 .as_ref()
1134 .and_then(|state| state.remote_connection())
1135 }
1136}
1137
1138enum ConnectionPoolEntry {
1139 Connecting(WeakShared<Task<Result<Arc<dyn RemoteConnection>, Arc<anyhow::Error>>>>),
1140 Connected(Weak<dyn RemoteConnection>),
1141}
1142
1143#[derive(Default)]
1144struct ConnectionPool {
1145 connections: HashMap<RemoteConnectionOptions, ConnectionPoolEntry>,
1146}
1147
1148impl Global for ConnectionPool {}
1149
1150impl ConnectionPool {
1151 fn connect(
1152 &mut self,
1153 opts: RemoteConnectionOptions,
1154 delegate: Arc<dyn RemoteClientDelegate>,
1155 cx: &mut App,
1156 ) -> Shared<Task<Result<Arc<dyn RemoteConnection>, Arc<anyhow::Error>>>> {
1157 let connection = self.connections.get(&opts);
1158 match connection {
1159 Some(ConnectionPoolEntry::Connecting(task)) => {
1160 if let Some(task) = task.upgrade() {
1161 log::debug!("Connecting task is still alive");
1162 cx.spawn(async move |cx| {
1163 delegate.set_status(Some("Waiting for existing connection attempt"), cx)
1164 })
1165 .detach();
1166 return task;
1167 }
1168 log::debug!("Connecting task is dead, removing it and restarting a connection");
1169 self.connections.remove(&opts);
1170 }
1171 Some(ConnectionPoolEntry::Connected(remote)) => {
1172 if let Some(remote) = remote.upgrade()
1173 && !remote.has_been_killed()
1174 {
1175 log::debug!("Connection is still alive");
1176 return Task::ready(Ok(remote)).shared();
1177 }
1178 log::debug!("Connection is dead, removing it and restarting a connection");
1179 self.connections.remove(&opts);
1180 }
1181 None => {
1182 log::debug!("No existing connection found, starting a new one");
1183 }
1184 }
1185
1186 let task = cx
1187 .spawn({
1188 let opts = opts.clone();
1189 let delegate = delegate.clone();
1190 async move |cx| {
1191 let connection = match opts.clone() {
1192 RemoteConnectionOptions::Ssh(opts) => {
1193 SshRemoteConnection::new(opts, delegate, cx)
1194 .await
1195 .map(|connection| Arc::new(connection) as Arc<dyn RemoteConnection>)
1196 }
1197 RemoteConnectionOptions::Wsl(opts) => {
1198 WslRemoteConnection::new(opts, delegate, cx)
1199 .await
1200 .map(|connection| Arc::new(connection) as Arc<dyn RemoteConnection>)
1201 }
1202 RemoteConnectionOptions::Docker(opts) => {
1203 DockerExecConnection::new(opts, delegate, cx)
1204 .await
1205 .map(|connection| Arc::new(connection) as Arc<dyn RemoteConnection>)
1206 }
1207 #[cfg(any(test, feature = "test-support"))]
1208 RemoteConnectionOptions::Mock(opts) => match cx.update(|cx| {
1209 cx.default_global::<crate::transport::mock::MockConnectionRegistry>()
1210 .take(&opts)
1211 }) {
1212 Some(connection) => Ok(connection.await as Arc<dyn RemoteConnection>),
1213 None => Err(anyhow!(
1214 "Mock connection not found. Call MockConnection::new() first."
1215 )),
1216 },
1217 };
1218
1219 cx.update_global(|pool: &mut Self, _| {
1220 debug_assert!(matches!(
1221 pool.connections.get(&opts),
1222 Some(ConnectionPoolEntry::Connecting(_))
1223 ));
1224 match connection {
1225 Ok(connection) => {
1226 pool.connections.insert(
1227 opts.clone(),
1228 ConnectionPoolEntry::Connected(Arc::downgrade(&connection)),
1229 );
1230 Ok(connection)
1231 }
1232 Err(error) => {
1233 pool.connections.remove(&opts);
1234 Err(Arc::new(error))
1235 }
1236 }
1237 })
1238 }
1239 })
1240 .shared();
1241 if let Some(task) = task.downgrade() {
1242 self.connections
1243 .insert(opts.clone(), ConnectionPoolEntry::Connecting(task));
1244 }
1245 task
1246 }
1247}
1248
1249#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1250pub enum RemoteConnectionOptions {
1251 Ssh(SshConnectionOptions),
1252 Wsl(WslConnectionOptions),
1253 Docker(DockerConnectionOptions),
1254 #[cfg(any(test, feature = "test-support"))]
1255 Mock(crate::transport::mock::MockConnectionOptions),
1256}
1257
1258impl RemoteConnectionOptions {
1259 pub fn display_name(&self) -> String {
1260 match self {
1261 RemoteConnectionOptions::Ssh(opts) => opts.host.to_string(),
1262 RemoteConnectionOptions::Wsl(opts) => opts.distro_name.clone(),
1263 RemoteConnectionOptions::Docker(opts) => {
1264 if opts.use_podman {
1265 format!("[podman] {}", opts.name)
1266 } else {
1267 opts.name.clone()
1268 }
1269 }
1270 #[cfg(any(test, feature = "test-support"))]
1271 RemoteConnectionOptions::Mock(opts) => format!("mock-{}", opts.id),
1272 }
1273 }
1274}
1275
1276impl From<SshConnectionOptions> for RemoteConnectionOptions {
1277 fn from(opts: SshConnectionOptions) -> Self {
1278 RemoteConnectionOptions::Ssh(opts)
1279 }
1280}
1281
1282impl From<WslConnectionOptions> for RemoteConnectionOptions {
1283 fn from(opts: WslConnectionOptions) -> Self {
1284 RemoteConnectionOptions::Wsl(opts)
1285 }
1286}
1287
1288#[cfg(any(test, feature = "test-support"))]
1289impl From<crate::transport::mock::MockConnectionOptions> for RemoteConnectionOptions {
1290 fn from(opts: crate::transport::mock::MockConnectionOptions) -> Self {
1291 RemoteConnectionOptions::Mock(opts)
1292 }
1293}
1294
1295#[cfg(target_os = "windows")]
1296/// Open a wsl path (\\wsl.localhost\<distro>\path)
1297#[derive(Debug, Clone, PartialEq, Eq, gpui::Action)]
1298#[action(namespace = workspace, no_json, no_register)]
1299pub struct OpenWslPath {
1300 pub distro: WslConnectionOptions,
1301 pub paths: Vec<PathBuf>,
1302}
1303
1304#[async_trait(?Send)]
1305pub trait RemoteConnection: Send + Sync {
1306 fn start_proxy(
1307 &self,
1308 unique_identifier: String,
1309 reconnect: bool,
1310 incoming_tx: UnboundedSender<Envelope>,
1311 outgoing_rx: UnboundedReceiver<Envelope>,
1312 connection_activity_tx: Sender<()>,
1313 delegate: Arc<dyn RemoteClientDelegate>,
1314 cx: &mut AsyncApp,
1315 ) -> Task<Result<i32>>;
1316 fn upload_directory(
1317 &self,
1318 src_path: PathBuf,
1319 dest_path: RemotePathBuf,
1320 cx: &App,
1321 ) -> Task<Result<()>>;
1322 async fn kill(&self) -> Result<()>;
1323 fn has_been_killed(&self) -> bool;
1324 fn shares_network_interface(&self) -> bool {
1325 false
1326 }
1327 fn build_command(
1328 &self,
1329 program: Option<String>,
1330 args: &[String],
1331 env: &HashMap<String, String>,
1332 working_dir: Option<String>,
1333 port_forward: Option<(u16, String, u16)>,
1334 interactive: Interactive,
1335 ) -> Result<CommandTemplate>;
1336 fn build_forward_ports_command(
1337 &self,
1338 forwards: Vec<(u16, String, u16)>,
1339 ) -> Result<CommandTemplate>;
1340 fn connection_options(&self) -> RemoteConnectionOptions;
1341 fn path_style(&self) -> PathStyle;
1342 fn shell(&self) -> String;
1343 fn default_system_shell(&self) -> String;
1344 fn has_wsl_interop(&self) -> bool;
1345
1346 #[cfg(any(test, feature = "test-support"))]
1347 fn simulate_disconnect(&self, _: &AsyncApp) {}
1348}
1349
1350type ResponseChannels = Mutex<HashMap<MessageId, oneshot::Sender<(Envelope, oneshot::Sender<()>)>>>;
1351
1352struct Signal<T> {
1353 tx: Mutex<Option<oneshot::Sender<T>>>,
1354 rx: Shared<Task<Option<T>>>,
1355}
1356
1357impl<T: Send + Clone + 'static> Signal<T> {
1358 pub fn new(cx: &App) -> Self {
1359 let (tx, rx) = oneshot::channel();
1360
1361 let task = cx
1362 .background_executor()
1363 .spawn(async move { rx.await.ok() })
1364 .shared();
1365
1366 Self {
1367 tx: Mutex::new(Some(tx)),
1368 rx: task,
1369 }
1370 }
1371
1372 fn set(&self, value: T) {
1373 if let Some(tx) = self.tx.lock().take() {
1374 let _ = tx.send(value);
1375 }
1376 }
1377
1378 fn wait(&self) -> Shared<Task<Option<T>>> {
1379 self.rx.clone()
1380 }
1381}
1382
1383pub(crate) struct ChannelClient {
1384 next_message_id: AtomicU32,
1385 outgoing_tx: Mutex<mpsc::UnboundedSender<Envelope>>,
1386 buffer: Mutex<VecDeque<Envelope>>,
1387 response_channels: ResponseChannels,
1388 message_handlers: Mutex<ProtoMessageHandlerSet>,
1389 max_received: AtomicU32,
1390 name: &'static str,
1391 task: Mutex<Task<Result<()>>>,
1392 remote_started: Signal<()>,
1393 has_wsl_interop: bool,
1394 executor: BackgroundExecutor,
1395}
1396
1397impl ChannelClient {
1398 pub(crate) fn new(
1399 incoming_rx: mpsc::UnboundedReceiver<Envelope>,
1400 outgoing_tx: mpsc::UnboundedSender<Envelope>,
1401 cx: &App,
1402 name: &'static str,
1403 has_wsl_interop: bool,
1404 ) -> Arc<Self> {
1405 Arc::new_cyclic(|this| Self {
1406 outgoing_tx: Mutex::new(outgoing_tx),
1407 next_message_id: AtomicU32::new(0),
1408 max_received: AtomicU32::new(0),
1409 response_channels: ResponseChannels::default(),
1410 message_handlers: Default::default(),
1411 buffer: Mutex::new(VecDeque::new()),
1412 name,
1413 executor: cx.background_executor().clone(),
1414 task: Mutex::new(Self::start_handling_messages(
1415 this.clone(),
1416 incoming_rx,
1417 &cx.to_async(),
1418 )),
1419 remote_started: Signal::new(cx),
1420 has_wsl_interop,
1421 })
1422 }
1423
1424 fn wait_for_remote_started(&self) -> Shared<Task<Option<()>>> {
1425 self.remote_started.wait()
1426 }
1427
1428 fn start_handling_messages(
1429 this: Weak<Self>,
1430 mut incoming_rx: mpsc::UnboundedReceiver<Envelope>,
1431 cx: &AsyncApp,
1432 ) -> Task<Result<()>> {
1433 cx.spawn(async move |cx| {
1434 if let Some(this) = this.upgrade() {
1435 let envelope = proto::RemoteStarted {}.into_envelope(0, None, None);
1436 this.outgoing_tx.lock().unbounded_send(envelope).ok();
1437 };
1438
1439 let peer_id = PeerId { owner_id: 0, id: 0 };
1440 while let Some(incoming) = incoming_rx.next().await {
1441 let Some(this) = this.upgrade() else {
1442 return anyhow::Ok(());
1443 };
1444 if let Some(ack_id) = incoming.ack_id {
1445 let mut buffer = this.buffer.lock();
1446 while buffer.front().is_some_and(|msg| msg.id <= ack_id) {
1447 buffer.pop_front();
1448 }
1449 }
1450 if let Some(proto::envelope::Payload::FlushBufferedMessages(_)) = &incoming.payload
1451 {
1452 log::debug!(
1453 "{}:remote message received. name:FlushBufferedMessages",
1454 this.name
1455 );
1456 {
1457 let buffer = this.buffer.lock();
1458 for envelope in buffer.iter() {
1459 this.outgoing_tx
1460 .lock()
1461 .unbounded_send(envelope.clone())
1462 .ok();
1463 }
1464 }
1465 let mut envelope = proto::Ack {}.into_envelope(0, Some(incoming.id), None);
1466 envelope.id = this.next_message_id.fetch_add(1, SeqCst);
1467 this.outgoing_tx.lock().unbounded_send(envelope).ok();
1468 continue;
1469 }
1470
1471 if let Some(proto::envelope::Payload::RemoteStarted(_)) = &incoming.payload {
1472 this.remote_started.set(());
1473 let mut envelope = proto::Ack {}.into_envelope(0, Some(incoming.id), None);
1474 envelope.id = this.next_message_id.fetch_add(1, SeqCst);
1475 this.outgoing_tx.lock().unbounded_send(envelope).ok();
1476 continue;
1477 }
1478
1479 this.max_received.store(incoming.id, SeqCst);
1480
1481 if let Some(request_id) = incoming.responding_to {
1482 let request_id = MessageId(request_id);
1483 let sender = this.response_channels.lock().remove(&request_id);
1484 if let Some(sender) = sender {
1485 let (tx, rx) = oneshot::channel();
1486 if incoming.payload.is_some() {
1487 sender.send((incoming, tx)).ok();
1488 }
1489 rx.await.ok();
1490 }
1491 } else if let Some(envelope) =
1492 build_typed_envelope(peer_id, Instant::now(), incoming)
1493 {
1494 let type_name = envelope.payload_type_name();
1495 let message_id = envelope.message_id();
1496 if let Some(future) = ProtoMessageHandlerSet::handle_message(
1497 &this.message_handlers,
1498 envelope,
1499 this.clone().into(),
1500 cx.clone(),
1501 ) {
1502 log::debug!("{}:remote message received. name:{type_name}", this.name);
1503 cx.foreground_executor()
1504 .spawn(async move {
1505 match future.await {
1506 Ok(_) => {
1507 log::debug!(
1508 "{}:remote message handled. name:{type_name}",
1509 this.name
1510 );
1511 }
1512 Err(error) => {
1513 log::error!(
1514 "{}:error handling message. type:{}, error:{:#}",
1515 this.name,
1516 type_name,
1517 format!("{error:#}").lines().fold(
1518 String::new(),
1519 |mut message, line| {
1520 if !message.is_empty() {
1521 message.push(' ');
1522 }
1523 message.push_str(line);
1524 message
1525 }
1526 )
1527 );
1528 }
1529 }
1530 })
1531 .detach()
1532 } else {
1533 log::error!("{}:unhandled remote message name:{type_name}", this.name);
1534 if let Err(e) = AnyProtoClient::from(this.clone()).send_response(
1535 message_id,
1536 anyhow::anyhow!("no handler registered for {type_name}").to_proto(),
1537 ) {
1538 log::error!(
1539 "{}:error sending error response for {type_name}:{e:#}",
1540 this.name
1541 );
1542 }
1543 }
1544 }
1545 }
1546 anyhow::Ok(())
1547 })
1548 }
1549
1550 pub(crate) fn reconnect(
1551 self: &Arc<Self>,
1552 incoming_rx: UnboundedReceiver<Envelope>,
1553 outgoing_tx: UnboundedSender<Envelope>,
1554 cx: &AsyncApp,
1555 ) {
1556 *self.outgoing_tx.lock() = outgoing_tx;
1557 *self.task.lock() = Self::start_handling_messages(Arc::downgrade(self), incoming_rx, cx);
1558 }
1559
1560 fn request<T: RequestMessage>(
1561 &self,
1562 payload: T,
1563 ) -> impl 'static + Future<Output = Result<T::Response>> {
1564 self.request_internal(payload, true)
1565 }
1566
1567 fn request_internal<T: RequestMessage>(
1568 &self,
1569 payload: T,
1570 use_buffer: bool,
1571 ) -> impl 'static + Future<Output = Result<T::Response>> {
1572 log::debug!("remote request start. name:{}", T::NAME);
1573 let response =
1574 self.request_dynamic(payload.into_envelope(0, None, None), T::NAME, use_buffer);
1575 async move {
1576 let response = response.await?;
1577 log::debug!("remote request finish. name:{}", T::NAME);
1578 T::Response::from_envelope(response).context("received a response of the wrong type")
1579 }
1580 }
1581
1582 async fn resync(&self, timeout: Duration) -> Result<()> {
1583 smol::future::or(
1584 async {
1585 self.request_internal(proto::FlushBufferedMessages {}, false)
1586 .await?;
1587
1588 for envelope in self.buffer.lock().iter() {
1589 self.outgoing_tx
1590 .lock()
1591 .unbounded_send(envelope.clone())
1592 .ok();
1593 }
1594 Ok(())
1595 },
1596 async {
1597 self.executor.timer(timeout).await;
1598 anyhow::bail!("Timed out resyncing remote client")
1599 },
1600 )
1601 .await
1602 }
1603
1604 async fn ping(&self, timeout: Duration) -> Result<()> {
1605 smol::future::or(
1606 async {
1607 self.request(proto::Ping {}).await?;
1608 Ok(())
1609 },
1610 async {
1611 self.executor.timer(timeout).await;
1612 anyhow::bail!("Timed out pinging remote client")
1613 },
1614 )
1615 .await
1616 }
1617
1618 fn send<T: EnvelopedMessage>(&self, payload: T) -> Result<()> {
1619 log::debug!("remote send name:{}", T::NAME);
1620 self.send_dynamic(payload.into_envelope(0, None, None))
1621 }
1622
1623 fn request_dynamic(
1624 &self,
1625 mut envelope: proto::Envelope,
1626 type_name: &'static str,
1627 use_buffer: bool,
1628 ) -> impl 'static + Future<Output = Result<proto::Envelope>> {
1629 envelope.id = self.next_message_id.fetch_add(1, SeqCst);
1630 let (tx, rx) = oneshot::channel();
1631 let mut response_channels_lock = self.response_channels.lock();
1632 response_channels_lock.insert(MessageId(envelope.id), tx);
1633 drop(response_channels_lock);
1634
1635 let result = if use_buffer {
1636 self.send_buffered(envelope)
1637 } else {
1638 self.send_unbuffered(envelope)
1639 };
1640 async move {
1641 if let Err(error) = &result {
1642 log::error!("failed to send message: {error}");
1643 anyhow::bail!("failed to send message: {error}");
1644 }
1645
1646 let response = rx.await.context("connection lost")?.0;
1647 if let Some(proto::envelope::Payload::Error(error)) = &response.payload {
1648 return Err(RpcError::from_proto(error, type_name));
1649 }
1650 Ok(response)
1651 }
1652 }
1653
1654 pub fn send_dynamic(&self, mut envelope: proto::Envelope) -> Result<()> {
1655 envelope.id = self.next_message_id.fetch_add(1, SeqCst);
1656 self.send_buffered(envelope)
1657 }
1658
1659 fn send_buffered(&self, mut envelope: proto::Envelope) -> Result<()> {
1660 envelope.ack_id = Some(self.max_received.load(SeqCst));
1661 self.buffer.lock().push_back(envelope.clone());
1662 // ignore errors on send (happen while we're reconnecting)
1663 // assume that the global "disconnected" overlay is sufficient.
1664 self.outgoing_tx.lock().unbounded_send(envelope).ok();
1665 Ok(())
1666 }
1667
1668 fn send_unbuffered(&self, mut envelope: proto::Envelope) -> Result<()> {
1669 envelope.ack_id = Some(self.max_received.load(SeqCst));
1670 self.outgoing_tx.lock().unbounded_send(envelope).ok();
1671 Ok(())
1672 }
1673}
1674
1675impl ProtoClient for ChannelClient {
1676 fn request(
1677 &self,
1678 envelope: proto::Envelope,
1679 request_type: &'static str,
1680 ) -> BoxFuture<'static, Result<proto::Envelope>> {
1681 self.request_dynamic(envelope, request_type, true).boxed()
1682 }
1683
1684 fn send(&self, envelope: proto::Envelope, _message_type: &'static str) -> Result<()> {
1685 self.send_dynamic(envelope)
1686 }
1687
1688 fn send_response(&self, envelope: Envelope, _message_type: &'static str) -> anyhow::Result<()> {
1689 self.send_dynamic(envelope)
1690 }
1691
1692 fn message_handler_set(&self) -> &Mutex<ProtoMessageHandlerSet> {
1693 &self.message_handlers
1694 }
1695
1696 fn is_via_collab(&self) -> bool {
1697 false
1698 }
1699
1700 fn has_wsl_interop(&self) -> bool {
1701 self.has_wsl_interop
1702 }
1703}