1#[cfg(any(test, feature = "test-support"))]
2use crate::transport::mock::ConnectGuard;
3use crate::{
4 SshConnectionOptions,
5 protocol::MessageId,
6 proxy::ProxyLaunchError,
7 transport::{
8 docker::{DockerConnectionOptions, DockerExecConnection},
9 ssh::SshRemoteConnection,
10 wsl::{WslConnectionOptions, WslRemoteConnection},
11 },
12};
13use anyhow::{Context as _, Result, anyhow};
14use askpass::EncryptedPassword;
15use async_trait::async_trait;
16use collections::HashMap;
17use futures::{
18 Future, FutureExt as _, StreamExt as _,
19 channel::{
20 mpsc::{self, Sender, UnboundedReceiver, UnboundedSender},
21 oneshot,
22 },
23 future::{BoxFuture, Shared, WeakShared},
24 select, select_biased,
25};
26use gpui::{
27 App, AppContext as _, AsyncApp, BackgroundExecutor, BorrowAppContext, Context, Entity,
28 EventEmitter, FutureExt, Global, Task, WeakEntity,
29};
30use parking_lot::Mutex;
31
32use release_channel::ReleaseChannel;
33use rpc::{
34 AnyProtoClient, ErrorExt, ProtoClient, ProtoMessageHandlerSet, RpcError,
35 proto::{self, Envelope, EnvelopedMessage, PeerId, RequestMessage, build_typed_envelope},
36};
37use semver::Version;
38use std::{
39 collections::VecDeque,
40 fmt,
41 ops::ControlFlow,
42 path::PathBuf,
43 sync::{
44 Arc, Weak,
45 atomic::{AtomicU32, AtomicU64, Ordering::SeqCst},
46 },
47 time::{Duration, Instant},
48};
49use util::{
50 ResultExt,
51 paths::{PathStyle, RemotePathBuf},
52};
53
/// Operating system of the remote host.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum RemoteOs {
    Linux,
    MacOs,
    Windows,
}

impl RemoteOs {
    /// Returns the lowercase name of this operating system
    /// (`"linux"`, `"macos"` or `"windows"`).
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Linux => "linux",
            Self::MacOs => "macos",
            Self::Windows => "windows",
        }
    }

    /// Returns `true` only for [`RemoteOs::Windows`].
    pub fn is_windows(&self) -> bool {
        *self == Self::Windows
    }
}

impl fmt::Display for RemoteOs {
    /// Writes the same text that [`RemoteOs::as_str`] returns.
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        formatter.write_str(self.as_str())
    }
}
80
/// CPU architecture of the remote host.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum RemoteArch {
    X86_64,
    Aarch64,
}

impl RemoteArch {
    /// Returns the lowercase name of this architecture (`"x86_64"` or `"aarch64"`).
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::X86_64 => "x86_64",
            Self::Aarch64 => "aarch64",
        }
    }
}

impl fmt::Display for RemoteArch {
    /// Writes the same text that [`RemoteArch::as_str`] returns.
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        formatter.write_str(self.as_str())
    }
}
101
/// Operating system and CPU architecture of a remote host.
///
/// Passed to [`RemoteClientDelegate`] when asking for a server binary or its download URL.
#[derive(Copy, Clone, Debug)]
pub struct RemotePlatform {
    /// Operating system of the remote host.
    pub os: RemoteOs,
    /// CPU architecture of the remote host.
    pub arch: RemoteArch,
}
107
/// Description of a command: the program, its arguments and environment variables.
///
/// Returned by the `build_command*` methods of [`RemoteClient`].
#[derive(Clone, Debug)]
pub struct CommandTemplate {
    /// Program to run.
    pub program: String,
    /// Arguments for `program`.
    pub args: Vec<String>,
    /// Environment variables, as name/value pairs.
    pub env: HashMap<String, String>,
}
114
/// Selects whether a command is run with a TTY allocated for interactive use.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum Interactive {
    /// A pseudo-TTY is allocated, for interactive terminal use.
    Yes,
    /// No TTY is allocated; meant for commands that talk over piped stdio.
    No,
}
123
/// Callbacks that the remote client uses to interact with its host application
/// while connecting and reconnecting.
pub trait RemoteClientDelegate: Send + Sync {
    /// Requests a password for the given `prompt`.
    ///
    /// NOTE(review): presumably the implementation answers by sending the encrypted
    /// password through `tx` — confirm against the implementors.
    fn ask_password(
        &self,
        prompt: String,
        tx: oneshot::Sender<EncryptedPassword>,
        cx: &mut AsyncApp,
    );
    /// Returns a task resolving to an optional download URL for the given `platform`,
    /// `release_channel` and optional `version`.
    fn get_download_url(
        &self,
        platform: RemotePlatform,
        release_channel: ReleaseChannel,
        version: Option<Version>,
        cx: &mut AsyncApp,
    ) -> Task<Result<Option<String>>>;
    /// Returns a task resolving to a local path for the given `platform`,
    /// `release_channel` and optional `version`.
    ///
    /// NOTE(review): by its name this downloads the server binary to the local machine
    /// and yields its path — confirm against the implementors.
    fn download_server_binary_locally(
        &self,
        platform: RemotePlatform,
        release_channel: ReleaseChannel,
        version: Option<Version>,
        cx: &mut AsyncApp,
    ) -> Task<Result<PathBuf>>;
    /// Reports a status message. This file passes `Some(text)` for progress and
    /// error messages.
    fn set_status(&self, status: Option<&str>, cx: &mut AsyncApp);
}
147
/// Number of consecutive missed heartbeats after which a reconnect is started.
const MAX_MISSED_HEARTBEATS: usize = 5;
/// Delay between two heartbeat rounds.
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
/// Timeout handed to the client's `ping` and `resync` calls.
const HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(5);
/// How long to wait for the remote to report that it has started:
/// 5 seconds when debug assertions are enabled, 60 seconds otherwise.
const INITIAL_CONNECTION_TIMEOUT: Duration =
    Duration::from_secs(if cfg!(debug_assertions) { 5 } else { 60 });

/// Maximum number of reconnect attempts before the client gives up.
pub const MAX_RECONNECT_ATTEMPTS: usize = 3;
155
/// Internal connection state machine of [`RemoteClient`].
///
/// Variants that hold a `remote_connection` keep it alive; the `Connected` and
/// `HeartbeatMissed` variants additionally own the background tasks of the connection.
enum State {
    /// Initial state while the first connection is being established.
    Connecting,
    /// The connection is established and heartbeats are succeeding.
    Connected {
        remote_connection: Arc<dyn RemoteConnection>,
        delegate: Arc<dyn RemoteClientDelegate>,

        // Task created by `RemoteClient::monitor`; it awaits the proxy I/O task.
        multiplex_task: Task<Result<()>>,
        // Task created by `RemoteClient::heartbeat`.
        heartbeat_task: Task<Result<()>>,
    },
    /// At least one heartbeat was missed, but the connection has not been given up yet.
    HeartbeatMissed {
        // Number of consecutive heartbeats missed so far.
        missed_heartbeats: usize,

        remote_connection: Arc<dyn RemoteConnection>,
        delegate: Arc<dyn RemoteClientDelegate>,

        multiplex_task: Task<Result<()>>,
        heartbeat_task: Task<Result<()>>,
    },
    /// A reconnect attempt is in progress.
    Reconnecting,
    /// The latest reconnect attempt failed; another attempt may follow.
    ReconnectFailed {
        remote_connection: Arc<dyn RemoteConnection>,
        delegate: Arc<dyn RemoteClientDelegate>,

        // Error of the failed attempt.
        error: anyhow::Error,
        // Number of attempts made so far.
        attempts: usize,
    },
    /// More than `MAX_RECONNECT_ATTEMPTS` attempts were needed; no further reconnects happen.
    ReconnectExhausted,
    /// The proxy reported that the remote server is not running.
    ServerNotRunning,
}
185
186impl fmt::Display for State {
187 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
188 match self {
189 Self::Connecting => write!(f, "connecting"),
190 Self::Connected { .. } => write!(f, "connected"),
191 Self::Reconnecting => write!(f, "reconnecting"),
192 Self::ReconnectFailed { .. } => write!(f, "reconnect failed"),
193 Self::ReconnectExhausted => write!(f, "reconnect exhausted"),
194 Self::HeartbeatMissed { .. } => write!(f, "heartbeat missed"),
195 Self::ServerNotRunning { .. } => write!(f, "server not running"),
196 }
197 }
198}
199
200impl State {
201 fn remote_connection(&self) -> Option<Arc<dyn RemoteConnection>> {
202 match self {
203 Self::Connected {
204 remote_connection, ..
205 } => Some(remote_connection.clone()),
206 Self::HeartbeatMissed {
207 remote_connection, ..
208 } => Some(remote_connection.clone()),
209 Self::ReconnectFailed {
210 remote_connection, ..
211 } => Some(remote_connection.clone()),
212 _ => None,
213 }
214 }
215
216 fn can_reconnect(&self) -> bool {
217 match self {
218 Self::Connected { .. }
219 | Self::HeartbeatMissed { .. }
220 | Self::ReconnectFailed { .. } => true,
221 State::Connecting
222 | State::Reconnecting
223 | State::ReconnectExhausted
224 | State::ServerNotRunning => false,
225 }
226 }
227
228 fn is_reconnect_failed(&self) -> bool {
229 matches!(self, Self::ReconnectFailed { .. })
230 }
231
232 fn is_reconnect_exhausted(&self) -> bool {
233 matches!(self, Self::ReconnectExhausted { .. })
234 }
235
236 fn is_server_not_running(&self) -> bool {
237 matches!(self, Self::ServerNotRunning)
238 }
239
240 fn is_reconnecting(&self) -> bool {
241 matches!(self, Self::Reconnecting { .. })
242 }
243
244 fn heartbeat_recovered(self) -> Self {
245 match self {
246 Self::HeartbeatMissed {
247 remote_connection,
248 delegate,
249 multiplex_task,
250 heartbeat_task,
251 ..
252 } => Self::Connected {
253 remote_connection,
254 delegate,
255 multiplex_task,
256 heartbeat_task,
257 },
258 _ => self,
259 }
260 }
261
262 fn heartbeat_missed(self) -> Self {
263 match self {
264 Self::Connected {
265 remote_connection,
266 delegate,
267 multiplex_task,
268 heartbeat_task,
269 } => Self::HeartbeatMissed {
270 missed_heartbeats: 1,
271 remote_connection,
272 delegate,
273 multiplex_task,
274 heartbeat_task,
275 },
276 Self::HeartbeatMissed {
277 missed_heartbeats,
278 remote_connection,
279 delegate,
280 multiplex_task,
281 heartbeat_task,
282 } => Self::HeartbeatMissed {
283 missed_heartbeats: missed_heartbeats + 1,
284 remote_connection,
285 delegate,
286 multiplex_task,
287 heartbeat_task,
288 },
289 _ => self,
290 }
291 }
292}
293
/// Externally visible state of the connection to the remote server.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ConnectionState {
    Connecting,
    Connected,
    HeartbeatMissed,
    Reconnecting,
    Disconnected,
}
303
304impl From<&State> for ConnectionState {
305 fn from(value: &State) -> Self {
306 match value {
307 State::Connecting => Self::Connecting,
308 State::Connected { .. } => Self::Connected,
309 State::Reconnecting | State::ReconnectFailed { .. } => Self::Reconnecting,
310 State::HeartbeatMissed { .. } => Self::HeartbeatMissed,
311 State::ReconnectExhausted => Self::Disconnected,
312 State::ServerNotRunning => Self::Disconnected,
313 }
314 }
315}
316
/// Client for a remote server. It owns the protocol client and the connection state
/// machine, including heartbeat monitoring and reconnects.
pub struct RemoteClient {
    // Protocol client; handed out through `proto_client()`.
    client: Arc<ChannelClient>,
    // Identifier passed to `start_proxy` on the initial connect and on every reconnect.
    unique_identifier: String,
    // Connection options captured from the connection when the client was created.
    connection_options: RemoteConnectionOptions,
    // Path style captured from the connection when the client was created.
    path_style: PathStyle,
    // Current state. `None` while a transition has taken the state out, and after
    // `shutdown_processes` has run.
    state: Option<State>,
}
324
/// Events emitted by [`RemoteClient`].
#[derive(Debug)]
pub enum RemoteClientEvent {
    /// Emitted when the state becomes `ReconnectExhausted` or `ServerNotRunning`.
    /// `server_not_running` is `true` in the latter case.
    Disconnected { server_not_running: bool },
}

impl EventEmitter<RemoteClientEvent> for RemoteClient {}
331
332/// Identifies the socket on the remote server so that reconnects
333/// can re-join the same project.
334pub enum ConnectionIdentifier {
335 Setup(u64),
336 Workspace(i64),
337}
338
339static NEXT_ID: AtomicU64 = AtomicU64::new(1);
340
341impl ConnectionIdentifier {
342 pub fn setup() -> Self {
343 Self::Setup(NEXT_ID.fetch_add(1, SeqCst))
344 }
345
346 // This string gets used in a socket name, and so must be relatively short.
347 // The total length of:
348 // /home/{username}/.local/share/zed/server_state/{name}/stdout.sock
349 // Must be less than about 100 characters
350 // https://unix.stackexchange.com/questions/367008/why-is-socket-path-length-limited-to-a-hundred-chars
351 // So our strings should be at most 20 characters or so.
352 fn to_string(&self, cx: &App) -> String {
353 let identifier_prefix = match ReleaseChannel::global(cx) {
354 ReleaseChannel::Stable => "".to_string(),
355 release_channel => format!("{}-", release_channel.dev_name()),
356 };
357 match self {
358 Self::Setup(setup_id) => format!("{identifier_prefix}setup-{setup_id}"),
359 Self::Workspace(workspace_id) => {
360 format!("{identifier_prefix}workspace-{workspace_id}",)
361 }
362 }
363 }
364}
365
366pub async fn connect(
367 connection_options: RemoteConnectionOptions,
368 delegate: Arc<dyn RemoteClientDelegate>,
369 cx: &mut AsyncApp,
370) -> Result<Arc<dyn RemoteConnection>> {
371 cx.update(|cx| {
372 cx.update_default_global(|pool: &mut ConnectionPool, cx| {
373 pool.connect(connection_options.clone(), delegate.clone(), cx)
374 })
375 })
376 .await
377 .map_err(|e| e.cloned())
378}
379
380impl RemoteClient {
381 pub fn new(
382 unique_identifier: ConnectionIdentifier,
383 remote_connection: Arc<dyn RemoteConnection>,
384 cancellation: oneshot::Receiver<()>,
385 delegate: Arc<dyn RemoteClientDelegate>,
386 cx: &mut App,
387 ) -> Task<Result<Option<Entity<Self>>>> {
388 let unique_identifier = unique_identifier.to_string(cx);
389 cx.spawn(async move |cx| {
390 let success = Box::pin(async move {
391 let (outgoing_tx, outgoing_rx) = mpsc::unbounded::<Envelope>();
392 let (incoming_tx, incoming_rx) = mpsc::unbounded::<Envelope>();
393 let (connection_activity_tx, connection_activity_rx) = mpsc::channel::<()>(1);
394
395 let client = cx.update(|cx| {
396 ChannelClient::new(
397 incoming_rx,
398 outgoing_tx,
399 cx,
400 "client",
401 remote_connection.has_wsl_interop(),
402 )
403 });
404
405 let path_style = remote_connection.path_style();
406 let this = cx.new(|_| Self {
407 client: client.clone(),
408 unique_identifier: unique_identifier.clone(),
409 connection_options: remote_connection.connection_options(),
410 path_style,
411 state: Some(State::Connecting),
412 });
413
414 let io_task = remote_connection.start_proxy(
415 unique_identifier,
416 false,
417 incoming_tx,
418 outgoing_rx,
419 connection_activity_tx,
420 delegate.clone(),
421 cx,
422 );
423
424 let ready = client
425 .wait_for_remote_started()
426 .with_timeout(INITIAL_CONNECTION_TIMEOUT, cx.background_executor())
427 .await;
428 match ready {
429 Ok(Some(_)) => {}
430 Ok(None) => {
431 let mut error = "remote client exited before becoming ready".to_owned();
432 if let Some(status) = io_task.now_or_never() {
433 match status {
434 Ok(exit_code) => {
435 error.push_str(&format!(", exit_code={exit_code:?}"))
436 }
437 Err(e) => error.push_str(&format!(", error={e:?}")),
438 }
439 }
440 let error = anyhow::anyhow!("{error}");
441 log::error!("failed to establish connection: {}", error);
442 return Err(error);
443 }
444 Err(_) => {
445 let mut error = String::new();
446 if let Some(status) = io_task.now_or_never() {
447 error.push_str("Client exited with ");
448 match status {
449 Ok(exit_code) => {
450 error.push_str(&format!(" exit_code {exit_code:?}"))
451 }
452 Err(e) => error.push_str(&format!(" error {e:?}")),
453 }
454 } else {
455 error.push_str("client did not become ready within the timeout");
456 }
457 let error = anyhow::anyhow!("{error}");
458 log::error!("failed to establish connection: {error}");
459 return Err(error);
460 }
461 }
462 let multiplex_task = Self::monitor(this.downgrade(), io_task, cx);
463 if let Err(error) = client.ping(HEARTBEAT_TIMEOUT).await {
464 log::error!("failed to establish connection: {}", error);
465 return Err(error);
466 }
467
468 let heartbeat_task = Self::heartbeat(this.downgrade(), connection_activity_rx, cx);
469
470 this.update(cx, |this, _| {
471 this.state = Some(State::Connected {
472 remote_connection,
473 delegate,
474 multiplex_task,
475 heartbeat_task,
476 });
477 });
478
479 Ok(Some(this))
480 });
481
482 select! {
483 _ = cancellation.fuse() => {
484 Ok(None)
485 }
486 result = success.fuse() => result
487 }
488 })
489 }
490
491 pub fn proto_client_from_channels(
492 incoming_rx: mpsc::UnboundedReceiver<Envelope>,
493 outgoing_tx: mpsc::UnboundedSender<Envelope>,
494 cx: &App,
495 name: &'static str,
496 has_wsl_interop: bool,
497 ) -> AnyProtoClient {
498 ChannelClient::new(incoming_rx, outgoing_tx, cx, name, has_wsl_interop).into()
499 }
500
501 pub fn shutdown_processes<T: RequestMessage>(
502 &mut self,
503 shutdown_request: Option<T>,
504 executor: BackgroundExecutor,
505 ) -> Option<impl Future<Output = ()> + use<T>> {
506 let state = self.state.take()?;
507 log::info!("shutting down remote processes");
508
509 let State::Connected {
510 multiplex_task,
511 heartbeat_task,
512 remote_connection,
513 delegate,
514 } = state
515 else {
516 return None;
517 };
518
519 let client = self.client.clone();
520
521 Some(async move {
522 if let Some(shutdown_request) = shutdown_request {
523 client.send(shutdown_request).log_err();
524 // We wait 50ms instead of waiting for a response, because
525 // waiting for a response would require us to wait on the main thread
526 // which we want to avoid in an `on_app_quit` callback.
527 executor.timer(Duration::from_millis(50)).await;
528 }
529
530 // Drop `multiplex_task` because it owns our remote_connection_proxy_process, which is a
531 // child of master_process.
532 drop(multiplex_task);
533 // Now drop the rest of state, which kills master process.
534 drop(heartbeat_task);
535 drop(remote_connection);
536 drop(delegate);
537 })
538 }
539
540 fn reconnect(&mut self, cx: &mut Context<Self>) -> Result<()> {
541 let can_reconnect = self
542 .state
543 .as_ref()
544 .map(|state| state.can_reconnect())
545 .unwrap_or(false);
546 if !can_reconnect {
547 let state = if let Some(state) = self.state.as_ref() {
548 state.to_string()
549 } else {
550 "no state set".to_string()
551 };
552 log::info!(
553 "aborting reconnect, because not in state that allows reconnecting: {state}"
554 );
555 anyhow::bail!(
556 "aborting reconnect, because not in state that allows reconnecting: {state}"
557 );
558 }
559
560 let state = self.state.take().unwrap();
561 let (attempts, remote_connection, delegate) = match state {
562 State::Connected {
563 remote_connection,
564 delegate,
565 multiplex_task,
566 heartbeat_task,
567 }
568 | State::HeartbeatMissed {
569 remote_connection,
570 delegate,
571 multiplex_task,
572 heartbeat_task,
573 ..
574 } => {
575 drop(multiplex_task);
576 drop(heartbeat_task);
577 (0, remote_connection, delegate)
578 }
579 State::ReconnectFailed {
580 attempts,
581 remote_connection,
582 delegate,
583 ..
584 } => (attempts, remote_connection, delegate),
585 State::Connecting
586 | State::Reconnecting
587 | State::ReconnectExhausted
588 | State::ServerNotRunning => unreachable!(),
589 };
590
591 let attempts = attempts + 1;
592 if attempts > MAX_RECONNECT_ATTEMPTS {
593 log::error!(
594 "Failed to reconnect to after {} attempts, giving up",
595 MAX_RECONNECT_ATTEMPTS
596 );
597 self.set_state(State::ReconnectExhausted, cx);
598 return Ok(());
599 }
600
601 self.set_state(State::Reconnecting, cx);
602
603 log::info!(
604 "Trying to reconnect to remote server... Attempt {}",
605 attempts
606 );
607
608 let unique_identifier = self.unique_identifier.clone();
609 let client = self.client.clone();
610 let reconnect_task = cx.spawn(async move |this, cx| {
611 macro_rules! failed {
612 ($error:expr, $attempts:expr, $remote_connection:expr, $delegate:expr) => {
613 delegate.set_status(Some(&format!("{error:#}", error = $error)), cx);
614 return State::ReconnectFailed {
615 error: anyhow!($error),
616 attempts: $attempts,
617 remote_connection: $remote_connection,
618 delegate: $delegate,
619 };
620 };
621 }
622
623 if let Err(error) = remote_connection
624 .kill()
625 .await
626 .context("Failed to kill remote_connection process")
627 {
628 failed!(error, attempts, remote_connection, delegate);
629 };
630
631 let connection_options = remote_connection.connection_options();
632
633 let (outgoing_tx, outgoing_rx) = mpsc::unbounded::<Envelope>();
634 let (incoming_tx, incoming_rx) = mpsc::unbounded::<Envelope>();
635 let (connection_activity_tx, connection_activity_rx) = mpsc::channel::<()>(1);
636
637 let (remote_connection, io_task) = match async {
638 let remote_connection = cx
639 .update_global(|pool: &mut ConnectionPool, cx| {
640 pool.connect(connection_options, delegate.clone(), cx)
641 })
642 .await
643 .map_err(|error| error.cloned())?;
644
645 let io_task = remote_connection.start_proxy(
646 unique_identifier,
647 true,
648 incoming_tx,
649 outgoing_rx,
650 connection_activity_tx,
651 delegate.clone(),
652 cx,
653 );
654 anyhow::Ok((remote_connection, io_task))
655 }
656 .await
657 {
658 Ok((remote_connection, io_task)) => (remote_connection, io_task),
659 Err(error) => {
660 failed!(error, attempts, remote_connection, delegate);
661 }
662 };
663
664 let multiplex_task = Self::monitor(this.clone(), io_task, cx);
665 client.reconnect(incoming_rx, outgoing_tx, cx);
666
667 if let Err(error) = client.resync(HEARTBEAT_TIMEOUT).await {
668 failed!(error, attempts, remote_connection, delegate);
669 };
670
671 State::Connected {
672 remote_connection,
673 delegate,
674 multiplex_task,
675 heartbeat_task: Self::heartbeat(this.clone(), connection_activity_rx, cx),
676 }
677 });
678
679 cx.spawn(async move |this, cx| {
680 let new_state = reconnect_task.await;
681 this.update(cx, |this, cx| {
682 this.try_set_state(cx, |old_state| {
683 if old_state.is_reconnecting() {
684 match &new_state {
685 State::Connecting
686 | State::Reconnecting
687 | State::HeartbeatMissed { .. }
688 | State::ServerNotRunning => {}
689 State::Connected { .. } => {
690 log::info!("Successfully reconnected");
691 }
692 State::ReconnectFailed {
693 error, attempts, ..
694 } => {
695 log::error!(
696 "Reconnect attempt {} failed: {:?}. Starting new attempt...",
697 attempts,
698 error
699 );
700 }
701 State::ReconnectExhausted => {
702 log::error!("Reconnect attempt failed and all attempts exhausted");
703 }
704 }
705 Some(new_state)
706 } else {
707 None
708 }
709 });
710
711 if this.state_is(State::is_reconnect_failed) {
712 this.reconnect(cx)
713 } else if this.state_is(State::is_reconnect_exhausted) {
714 Ok(())
715 } else {
716 log::debug!("State has transition from Reconnecting into new state while attempting reconnect.");
717 Ok(())
718 }
719 })
720 })
721 .detach_and_log_err(cx);
722
723 Ok(())
724 }
725
726 fn heartbeat(
727 this: WeakEntity<Self>,
728 mut connection_activity_rx: mpsc::Receiver<()>,
729 cx: &mut AsyncApp,
730 ) -> Task<Result<()>> {
731 let Ok(client) = this.read_with(cx, |this, _| this.client.clone()) else {
732 return Task::ready(Err(anyhow!("remote_connectionRemoteClient lost")));
733 };
734
735 cx.spawn(async move |cx| {
736 let mut missed_heartbeats = 0;
737
738 let keepalive_timer = cx.background_executor().timer(HEARTBEAT_INTERVAL).fuse();
739 futures::pin_mut!(keepalive_timer);
740
741 loop {
742 select_biased! {
743 result = connection_activity_rx.next().fuse() => {
744 if result.is_none() {
745 log::warn!("remote heartbeat: connection activity channel has been dropped. stopping.");
746 return Ok(());
747 }
748
749 if missed_heartbeats != 0 {
750 missed_heartbeats = 0;
751 let _ =this.update(cx, |this, cx| {
752 this.handle_heartbeat_result(missed_heartbeats, cx)
753 })?;
754 }
755 }
756 _ = keepalive_timer => {
757 log::debug!("Sending heartbeat to server...");
758
759 let result = select_biased! {
760 _ = connection_activity_rx.next().fuse() => {
761 Ok(())
762 }
763 ping_result = client.ping(HEARTBEAT_TIMEOUT).fuse() => {
764 ping_result
765 }
766 };
767
768 if result.is_err() {
769 missed_heartbeats += 1;
770 log::warn!(
771 "No heartbeat from server after {:?}. Missed heartbeat {} out of {}.",
772 HEARTBEAT_TIMEOUT,
773 missed_heartbeats,
774 MAX_MISSED_HEARTBEATS
775 );
776 } else if missed_heartbeats != 0 {
777 missed_heartbeats = 0;
778 } else {
779 continue;
780 }
781
782 let result = this.update(cx, |this, cx| {
783 this.handle_heartbeat_result(missed_heartbeats, cx)
784 })?;
785 if result.is_break() {
786 return Ok(());
787 }
788 }
789 }
790
791 keepalive_timer.set(cx.background_executor().timer(HEARTBEAT_INTERVAL).fuse());
792 }
793 })
794 }
795
796 fn handle_heartbeat_result(
797 &mut self,
798 missed_heartbeats: usize,
799 cx: &mut Context<Self>,
800 ) -> ControlFlow<()> {
801 let state = self.state.take().unwrap();
802 let next_state = if missed_heartbeats > 0 {
803 state.heartbeat_missed()
804 } else {
805 state.heartbeat_recovered()
806 };
807
808 self.set_state(next_state, cx);
809
810 if missed_heartbeats >= MAX_MISSED_HEARTBEATS {
811 log::error!(
812 "Missed last {} heartbeats. Reconnecting...",
813 missed_heartbeats
814 );
815
816 self.reconnect(cx)
817 .context("failed to start reconnect process after missing heartbeats")
818 .log_err();
819 ControlFlow::Break(())
820 } else {
821 ControlFlow::Continue(())
822 }
823 }
824
825 fn monitor(
826 this: WeakEntity<Self>,
827 io_task: Task<Result<i32>>,
828 cx: &AsyncApp,
829 ) -> Task<Result<()>> {
830 cx.spawn(async move |cx| {
831 let result = io_task.await;
832
833 match result {
834 Ok(exit_code) => {
835 if let Some(error) = ProxyLaunchError::from_exit_code(exit_code) {
836 match error {
837 ProxyLaunchError::ServerNotRunning => {
838 log::error!("failed to reconnect because server is not running");
839 this.update(cx, |this, cx| {
840 this.set_state(State::ServerNotRunning, cx);
841 })?;
842 }
843 }
844 } else if exit_code > 0 {
845 log::error!("proxy process terminated unexpectedly");
846 this.update(cx, |this, cx| {
847 this.reconnect(cx).ok();
848 })?;
849 }
850 }
851 Err(error) => {
852 log::warn!(
853 "remote io task died with error: {:?}. reconnecting...",
854 error
855 );
856 this.update(cx, |this, cx| {
857 this.reconnect(cx).ok();
858 })?;
859 }
860 }
861
862 Ok(())
863 })
864 }
865
866 fn state_is(&self, check: impl FnOnce(&State) -> bool) -> bool {
867 self.state.as_ref().is_some_and(check)
868 }
869
870 fn try_set_state(&mut self, cx: &mut Context<Self>, map: impl FnOnce(&State) -> Option<State>) {
871 let new_state = self.state.as_ref().and_then(map);
872 if let Some(new_state) = new_state {
873 self.state.replace(new_state);
874 cx.notify();
875 }
876 }
877
878 fn set_state(&mut self, state: State, cx: &mut Context<Self>) {
879 log::info!("setting state to '{}'", &state);
880
881 let is_reconnect_exhausted = state.is_reconnect_exhausted();
882 let is_server_not_running = state.is_server_not_running();
883 self.state.replace(state);
884
885 if is_reconnect_exhausted || is_server_not_running {
886 cx.emit(RemoteClientEvent::Disconnected {
887 server_not_running: is_server_not_running,
888 });
889 }
890 cx.notify();
891 }
892
893 pub fn shell(&self) -> Option<String> {
894 Some(self.remote_connection()?.shell())
895 }
896
897 pub fn default_system_shell(&self) -> Option<String> {
898 Some(self.remote_connection()?.default_system_shell())
899 }
900
901 pub fn shares_network_interface(&self) -> bool {
902 self.remote_connection()
903 .map_or(false, |connection| connection.shares_network_interface())
904 }
905
906 pub fn has_wsl_interop(&self) -> bool {
907 self.remote_connection()
908 .map_or(false, |connection| connection.has_wsl_interop())
909 }
910
911 pub fn build_command(
912 &self,
913 program: Option<String>,
914 args: &[String],
915 env: &HashMap<String, String>,
916 working_dir: Option<String>,
917 port_forward: Option<(u16, String, u16)>,
918 ) -> Result<CommandTemplate> {
919 self.build_command_with_options(
920 program,
921 args,
922 env,
923 working_dir,
924 port_forward,
925 Interactive::Yes,
926 )
927 }
928
929 pub fn build_command_with_options(
930 &self,
931 program: Option<String>,
932 args: &[String],
933 env: &HashMap<String, String>,
934 working_dir: Option<String>,
935 port_forward: Option<(u16, String, u16)>,
936 interactive: Interactive,
937 ) -> Result<CommandTemplate> {
938 let Some(connection) = self.remote_connection() else {
939 return Err(anyhow!("no remote connection"));
940 };
941 connection.build_command(program, args, env, working_dir, port_forward, interactive)
942 }
943
944 pub fn build_forward_ports_command(
945 &self,
946 forwards: Vec<(u16, String, u16)>,
947 ) -> Result<CommandTemplate> {
948 let Some(connection) = self.remote_connection() else {
949 return Err(anyhow!("no remote connection"));
950 };
951 connection.build_forward_ports_command(forwards)
952 }
953
954 pub fn upload_directory(
955 &self,
956 src_path: PathBuf,
957 dest_path: RemotePathBuf,
958 cx: &App,
959 ) -> Task<Result<()>> {
960 let Some(connection) = self.remote_connection() else {
961 return Task::ready(Err(anyhow!("no remote connection")));
962 };
963 connection.upload_directory(src_path, dest_path, cx)
964 }
965
966 pub fn proto_client(&self) -> AnyProtoClient {
967 self.client.clone().into()
968 }
969
    /// Returns a copy of the connection options captured when this client was created.
    pub fn connection_options(&self) -> RemoteConnectionOptions {
        self.connection_options.clone()
    }
973
974 pub fn connection(&self) -> Option<Arc<dyn RemoteConnection>> {
975 if let State::Connected {
976 remote_connection, ..
977 } = self.state.as_ref()?
978 {
979 Some(remote_connection.clone())
980 } else {
981 None
982 }
983 }
984
985 pub fn connection_state(&self) -> ConnectionState {
986 self.state
987 .as_ref()
988 .map(ConnectionState::from)
989 .unwrap_or(ConnectionState::Disconnected)
990 }
991
992 pub fn is_disconnected(&self) -> bool {
993 self.connection_state() == ConnectionState::Disconnected
994 }
995
    /// Returns the path style captured from the connection when this client was created.
    pub fn path_style(&self) -> PathStyle {
        self.path_style
    }
999
1000 /// Forcibly disconnects from the remote server by killing the underlying connection.
1001 /// This will trigger the reconnection logic if reconnection attempts remain.
1002 /// Useful for testing reconnection behavior in real environments.
1003 pub fn force_disconnect(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
1004 let Some(connection) = self.remote_connection() else {
1005 return Task::ready(Err(anyhow!("no active remote connection to disconnect")));
1006 };
1007
1008 log::info!("force_disconnect: killing remote connection");
1009
1010 cx.spawn(async move |_, _| {
1011 connection.kill().await?;
1012 Ok(())
1013 })
1014 }
1015
1016 /// Simulates a timeout by pausing heartbeat responses.
1017 /// This will cause heartbeat failures and eventually trigger reconnection
1018 /// after MAX_MISSED_HEARTBEATS are missed.
1019 /// Useful for testing timeout behavior in real environments.
1020 pub fn force_heartbeat_timeout(&mut self, attempts: usize, cx: &mut Context<Self>) {
1021 log::info!("force_heartbeat_timeout: triggering heartbeat failure state");
1022
1023 if let Some(State::Connected {
1024 remote_connection,
1025 delegate,
1026 multiplex_task,
1027 heartbeat_task,
1028 }) = self.state.take()
1029 {
1030 self.set_state(
1031 if attempts == 0 {
1032 State::HeartbeatMissed {
1033 missed_heartbeats: MAX_MISSED_HEARTBEATS,
1034 remote_connection,
1035 delegate,
1036 multiplex_task,
1037 heartbeat_task,
1038 }
1039 } else {
1040 State::ReconnectFailed {
1041 remote_connection,
1042 delegate,
1043 error: anyhow!("forced heartbeat timeout"),
1044 attempts,
1045 }
1046 },
1047 cx,
1048 );
1049
1050 self.reconnect(cx)
1051 .context("failed to start reconnect after forced timeout")
1052 .log_err();
1053 } else {
1054 log::warn!("force_heartbeat_timeout: not in Connected state, ignoring");
1055 }
1056 }
1057
1058 #[cfg(any(test, feature = "test-support"))]
1059 pub fn simulate_disconnect(&self, client_cx: &mut App) -> Task<()> {
1060 let opts = self.connection_options();
1061 client_cx.spawn(async move |cx| {
1062 let connection = cx.update_global(|c: &mut ConnectionPool, _| {
1063 if let Some(ConnectionPoolEntry::Connected(c)) = c.connections.get(&opts) {
1064 if let Some(connection) = c.upgrade() {
1065 connection
1066 } else {
1067 panic!("connection was dropped")
1068 }
1069 } else {
1070 panic!("missing test connection")
1071 }
1072 });
1073
1074 connection.simulate_disconnect(cx);
1075 })
1076 }
1077
1078 /// Creates a mock connection pair for testing.
1079 ///
1080 /// This is the recommended way to create mock remote connections for tests.
1081 /// It returns the `MockConnectionOptions` (which can be passed to create a
1082 /// `HeadlessProject`), an `AnyProtoClient` for the server side and a
1083 /// `ConnectGuard` for the client side which blocks the connection from
1084 /// being established until dropped.
1085 ///
1086 /// # Example
1087 /// ```ignore
1088 /// let (opts, server_session, connect_guard) = RemoteClient::fake_server(cx, server_cx);
1089 /// // Set up HeadlessProject with server_session...
1090 /// drop(connect_guard);
1091 /// let client = RemoteClient::fake_client(opts, cx).await;
1092 /// ```
1093 #[cfg(any(test, feature = "test-support"))]
1094 pub fn fake_server(
1095 client_cx: &mut gpui::TestAppContext,
1096 server_cx: &mut gpui::TestAppContext,
1097 ) -> (RemoteConnectionOptions, AnyProtoClient, ConnectGuard) {
1098 use crate::transport::mock::MockConnection;
1099 let (opts, server_client, connect_guard) = MockConnection::new(client_cx, server_cx);
1100 (opts.into(), server_client, connect_guard)
1101 }
1102
1103 /// Creates a `RemoteClient` connected to a mock server.
1104 ///
1105 /// Call `fake_server` first to get the connection options, set up the
1106 /// `HeadlessProject` with the server session, then call this method
1107 /// to create the client.
1108 #[cfg(any(test, feature = "test-support"))]
1109 pub async fn connect_mock(
1110 opts: RemoteConnectionOptions,
1111 client_cx: &mut gpui::TestAppContext,
1112 ) -> Entity<Self> {
1113 assert!(matches!(opts, RemoteConnectionOptions::Mock(..)));
1114 use crate::transport::mock::MockDelegate;
1115 let (_tx, rx) = oneshot::channel();
1116 let mut cx = client_cx.to_async();
1117 let connection = connect(opts, Arc::new(MockDelegate), &mut cx)
1118 .await
1119 .unwrap();
1120 client_cx
1121 .update(|cx| {
1122 Self::new(
1123 ConnectionIdentifier::setup(),
1124 connection,
1125 rx,
1126 Arc::new(MockDelegate),
1127 cx,
1128 )
1129 })
1130 .await
1131 .unwrap()
1132 .unwrap()
1133 }
1134
1135 fn remote_connection(&self) -> Option<Arc<dyn RemoteConnection>> {
1136 self.state
1137 .as_ref()
1138 .and_then(|state| state.remote_connection())
1139 }
1140}
1141
/// An entry of the [`ConnectionPool`]. Both variants hold only weak handles.
enum ConnectionPoolEntry {
    /// A connection attempt that may still be running; holds a weak handle to its shared task.
    Connecting(WeakShared<Task<Result<Arc<dyn RemoteConnection>, Arc<anyhow::Error>>>>),
    /// An established connection; holds a weak reference to it.
    Connected(Weak<dyn RemoteConnection>),
}
1146
/// Global registry of connections and connection attempts, keyed by connection options.
#[derive(Default)]
struct ConnectionPool {
    // One entry per set of connection options.
    connections: HashMap<RemoteConnectionOptions, ConnectionPoolEntry>,
}

impl Global for ConnectionPool {}
1153
1154impl ConnectionPool {
1155 fn connect(
1156 &mut self,
1157 opts: RemoteConnectionOptions,
1158 delegate: Arc<dyn RemoteClientDelegate>,
1159 cx: &mut App,
1160 ) -> Shared<Task<Result<Arc<dyn RemoteConnection>, Arc<anyhow::Error>>>> {
1161 let connection = self.connections.get(&opts);
1162 match connection {
1163 Some(ConnectionPoolEntry::Connecting(task)) => {
1164 if let Some(task) = task.upgrade() {
1165 log::debug!("Connecting task is still alive");
1166 cx.spawn(async move |cx| {
1167 delegate.set_status(Some("Waiting for existing connection attempt"), cx)
1168 })
1169 .detach();
1170 return task;
1171 }
1172 log::debug!("Connecting task is dead, removing it and restarting a connection");
1173 self.connections.remove(&opts);
1174 }
1175 Some(ConnectionPoolEntry::Connected(remote)) => {
1176 if let Some(remote) = remote.upgrade()
1177 && !remote.has_been_killed()
1178 {
1179 log::debug!("Connection is still alive");
1180 return Task::ready(Ok(remote)).shared();
1181 }
1182 log::debug!("Connection is dead, removing it and restarting a connection");
1183 self.connections.remove(&opts);
1184 }
1185 None => {
1186 log::debug!("No existing connection found, starting a new one");
1187 }
1188 }
1189
1190 let task = cx
1191 .spawn({
1192 let opts = opts.clone();
1193 let delegate = delegate.clone();
1194 async move |cx| {
1195 let connection = match opts.clone() {
1196 RemoteConnectionOptions::Ssh(opts) => {
1197 SshRemoteConnection::new(opts, delegate, cx)
1198 .await
1199 .map(|connection| Arc::new(connection) as Arc<dyn RemoteConnection>)
1200 }
1201 RemoteConnectionOptions::Wsl(opts) => {
1202 WslRemoteConnection::new(opts, delegate, cx)
1203 .await
1204 .map(|connection| Arc::new(connection) as Arc<dyn RemoteConnection>)
1205 }
1206 RemoteConnectionOptions::Docker(opts) => {
1207 DockerExecConnection::new(opts, delegate, cx)
1208 .await
1209 .map(|connection| Arc::new(connection) as Arc<dyn RemoteConnection>)
1210 }
1211 #[cfg(any(test, feature = "test-support"))]
1212 RemoteConnectionOptions::Mock(opts) => match cx.update(|cx| {
1213 cx.default_global::<crate::transport::mock::MockConnectionRegistry>()
1214 .take(&opts)
1215 }) {
1216 Some(connection) => Ok(connection.await as Arc<dyn RemoteConnection>),
1217 None => Err(anyhow!(
1218 "Mock connection not found. Call MockConnection::new() first."
1219 )),
1220 },
1221 };
1222
1223 cx.update_global(|pool: &mut Self, _| {
1224 debug_assert!(matches!(
1225 pool.connections.get(&opts),
1226 Some(ConnectionPoolEntry::Connecting(_))
1227 ));
1228 match connection {
1229 Ok(connection) => {
1230 pool.connections.insert(
1231 opts.clone(),
1232 ConnectionPoolEntry::Connected(Arc::downgrade(&connection)),
1233 );
1234 Ok(connection)
1235 }
1236 Err(error) => {
1237 pool.connections.remove(&opts);
1238 Err(Arc::new(error))
1239 }
1240 }
1241 })
1242 }
1243 })
1244 .shared();
1245 if let Some(task) = task.downgrade() {
1246 self.connections
1247 .insert(opts.clone(), ConnectionPoolEntry::Connecting(task));
1248 }
1249 task
1250 }
1251}
1252
1253#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1254pub enum RemoteConnectionOptions {
1255 Ssh(SshConnectionOptions),
1256 Wsl(WslConnectionOptions),
1257 Docker(DockerConnectionOptions),
1258 #[cfg(any(test, feature = "test-support"))]
1259 Mock(crate::transport::mock::MockConnectionOptions),
1260}
1261
1262impl RemoteConnectionOptions {
1263 pub fn display_name(&self) -> String {
1264 match self {
1265 RemoteConnectionOptions::Ssh(opts) => opts.host.to_string(),
1266 RemoteConnectionOptions::Wsl(opts) => opts.distro_name.clone(),
1267 RemoteConnectionOptions::Docker(opts) => {
1268 if opts.use_podman {
1269 format!("[podman] {}", opts.name)
1270 } else {
1271 opts.name.clone()
1272 }
1273 }
1274 #[cfg(any(test, feature = "test-support"))]
1275 RemoteConnectionOptions::Mock(opts) => format!("mock-{}", opts.id),
1276 }
1277 }
1278}
1279
1280impl From<SshConnectionOptions> for RemoteConnectionOptions {
1281 fn from(opts: SshConnectionOptions) -> Self {
1282 RemoteConnectionOptions::Ssh(opts)
1283 }
1284}
1285
1286impl From<WslConnectionOptions> for RemoteConnectionOptions {
1287 fn from(opts: WslConnectionOptions) -> Self {
1288 RemoteConnectionOptions::Wsl(opts)
1289 }
1290}
1291
/// Wraps mock options in the transport-agnostic enum (test builds only).
#[cfg(any(test, feature = "test-support"))]
impl From<crate::transport::mock::MockConnectionOptions> for RemoteConnectionOptions {
    fn from(opts: crate::transport::mock::MockConnectionOptions) -> Self {
        Self::Mock(opts)
    }
}
1298
// Windows-only action. The `///` line below is kept verbatim because the
// `gpui::Action` derive may consume the doc comment; extra notes use `//`.
#[cfg(target_os = "windows")]
/// Open a wsl path (\\wsl.localhost\<distro>\path)
#[derive(Debug, Clone, PartialEq, Eq, gpui::Action)]
#[action(namespace = workspace, no_json, no_register)]
pub struct OpenWslPath {
    // Connection options for the WSL distro the paths belong to.
    pub distro: WslConnectionOptions,
    // Paths to open; presumably located inside that distro (confirm against
    // the action's handler).
    pub paths: Vec<PathBuf>,
}
1307
1308#[async_trait(?Send)]
1309pub trait RemoteConnection: Send + Sync {
1310 fn start_proxy(
1311 &self,
1312 unique_identifier: String,
1313 reconnect: bool,
1314 incoming_tx: UnboundedSender<Envelope>,
1315 outgoing_rx: UnboundedReceiver<Envelope>,
1316 connection_activity_tx: Sender<()>,
1317 delegate: Arc<dyn RemoteClientDelegate>,
1318 cx: &mut AsyncApp,
1319 ) -> Task<Result<i32>>;
1320 fn upload_directory(
1321 &self,
1322 src_path: PathBuf,
1323 dest_path: RemotePathBuf,
1324 cx: &App,
1325 ) -> Task<Result<()>>;
1326 async fn kill(&self) -> Result<()>;
1327 fn has_been_killed(&self) -> bool;
1328 fn shares_network_interface(&self) -> bool {
1329 false
1330 }
1331 fn build_command(
1332 &self,
1333 program: Option<String>,
1334 args: &[String],
1335 env: &HashMap<String, String>,
1336 working_dir: Option<String>,
1337 port_forward: Option<(u16, String, u16)>,
1338 interactive: Interactive,
1339 ) -> Result<CommandTemplate>;
1340 fn build_forward_ports_command(
1341 &self,
1342 forwards: Vec<(u16, String, u16)>,
1343 ) -> Result<CommandTemplate>;
1344 fn connection_options(&self) -> RemoteConnectionOptions;
1345 fn path_style(&self) -> PathStyle;
1346 fn shell(&self) -> String;
1347 fn default_system_shell(&self) -> String;
1348 fn has_wsl_interop(&self) -> bool;
1349
1350 #[cfg(any(test, feature = "test-support"))]
1351 fn simulate_disconnect(&self, _: &AsyncApp) {}
1352}
1353
1354type ResponseChannels = Mutex<HashMap<MessageId, oneshot::Sender<(Envelope, oneshot::Sender<()>)>>>;
1355
1356struct Signal<T> {
1357 tx: Mutex<Option<oneshot::Sender<T>>>,
1358 rx: Shared<Task<Option<T>>>,
1359}
1360
1361impl<T: Send + Clone + 'static> Signal<T> {
1362 pub fn new(cx: &App) -> Self {
1363 let (tx, rx) = oneshot::channel();
1364
1365 let task = cx
1366 .background_executor()
1367 .spawn(async move { rx.await.ok() })
1368 .shared();
1369
1370 Self {
1371 tx: Mutex::new(Some(tx)),
1372 rx: task,
1373 }
1374 }
1375
1376 fn set(&self, value: T) {
1377 if let Some(tx) = self.tx.lock().take() {
1378 let _ = tx.send(value);
1379 }
1380 }
1381
1382 fn wait(&self) -> Shared<Task<Option<T>>> {
1383 self.rx.clone()
1384 }
1385}
1386
1387pub(crate) struct ChannelClient {
1388 next_message_id: AtomicU32,
1389 outgoing_tx: Mutex<mpsc::UnboundedSender<Envelope>>,
1390 buffer: Mutex<VecDeque<Envelope>>,
1391 response_channels: ResponseChannels,
1392 message_handlers: Mutex<ProtoMessageHandlerSet>,
1393 max_received: AtomicU32,
1394 name: &'static str,
1395 task: Mutex<Task<Result<()>>>,
1396 remote_started: Signal<()>,
1397 has_wsl_interop: bool,
1398 executor: BackgroundExecutor,
1399}
1400
1401impl ChannelClient {
1402 pub(crate) fn new(
1403 incoming_rx: mpsc::UnboundedReceiver<Envelope>,
1404 outgoing_tx: mpsc::UnboundedSender<Envelope>,
1405 cx: &App,
1406 name: &'static str,
1407 has_wsl_interop: bool,
1408 ) -> Arc<Self> {
1409 Arc::new_cyclic(|this| Self {
1410 outgoing_tx: Mutex::new(outgoing_tx),
1411 next_message_id: AtomicU32::new(0),
1412 max_received: AtomicU32::new(0),
1413 response_channels: ResponseChannels::default(),
1414 message_handlers: Default::default(),
1415 buffer: Mutex::new(VecDeque::new()),
1416 name,
1417 executor: cx.background_executor().clone(),
1418 task: Mutex::new(Self::start_handling_messages(
1419 this.clone(),
1420 incoming_rx,
1421 &cx.to_async(),
1422 )),
1423 remote_started: Signal::new(cx),
1424 has_wsl_interop,
1425 })
1426 }
1427
1428 fn wait_for_remote_started(&self) -> Shared<Task<Option<()>>> {
1429 self.remote_started.wait()
1430 }
1431
1432 fn start_handling_messages(
1433 this: Weak<Self>,
1434 mut incoming_rx: mpsc::UnboundedReceiver<Envelope>,
1435 cx: &AsyncApp,
1436 ) -> Task<Result<()>> {
1437 cx.spawn(async move |cx| {
1438 if let Some(this) = this.upgrade() {
1439 let envelope = proto::RemoteStarted {}.into_envelope(0, None, None);
1440 this.outgoing_tx.lock().unbounded_send(envelope).ok();
1441 };
1442
1443 let peer_id = PeerId { owner_id: 0, id: 0 };
1444 while let Some(incoming) = incoming_rx.next().await {
1445 let Some(this) = this.upgrade() else {
1446 return anyhow::Ok(());
1447 };
1448 if let Some(ack_id) = incoming.ack_id {
1449 let mut buffer = this.buffer.lock();
1450 while buffer.front().is_some_and(|msg| msg.id <= ack_id) {
1451 buffer.pop_front();
1452 }
1453 }
1454 if let Some(proto::envelope::Payload::FlushBufferedMessages(_)) = &incoming.payload
1455 {
1456 log::debug!(
1457 "{}:remote message received. name:FlushBufferedMessages",
1458 this.name
1459 );
1460 {
1461 let buffer = this.buffer.lock();
1462 for envelope in buffer.iter() {
1463 this.outgoing_tx
1464 .lock()
1465 .unbounded_send(envelope.clone())
1466 .ok();
1467 }
1468 }
1469 let mut envelope = proto::Ack {}.into_envelope(0, Some(incoming.id), None);
1470 envelope.id = this.next_message_id.fetch_add(1, SeqCst);
1471 this.outgoing_tx.lock().unbounded_send(envelope).ok();
1472 continue;
1473 }
1474
1475 if let Some(proto::envelope::Payload::RemoteStarted(_)) = &incoming.payload {
1476 this.remote_started.set(());
1477 let mut envelope = proto::Ack {}.into_envelope(0, Some(incoming.id), None);
1478 envelope.id = this.next_message_id.fetch_add(1, SeqCst);
1479 this.outgoing_tx.lock().unbounded_send(envelope).ok();
1480 continue;
1481 }
1482
1483 this.max_received.store(incoming.id, SeqCst);
1484
1485 if let Some(request_id) = incoming.responding_to {
1486 let request_id = MessageId(request_id);
1487 let sender = this.response_channels.lock().remove(&request_id);
1488 if let Some(sender) = sender {
1489 let (tx, rx) = oneshot::channel();
1490 if incoming.payload.is_some() {
1491 sender.send((incoming, tx)).ok();
1492 }
1493 rx.await.ok();
1494 }
1495 } else if let Some(envelope) =
1496 build_typed_envelope(peer_id, Instant::now(), incoming)
1497 {
1498 let type_name = envelope.payload_type_name();
1499 let message_id = envelope.message_id();
1500 if let Some(future) = ProtoMessageHandlerSet::handle_message(
1501 &this.message_handlers,
1502 envelope,
1503 this.clone().into(),
1504 cx.clone(),
1505 ) {
1506 log::debug!("{}:remote message received. name:{type_name}", this.name);
1507 cx.foreground_executor()
1508 .spawn(async move {
1509 match future.await {
1510 Ok(_) => {
1511 log::debug!(
1512 "{}:remote message handled. name:{type_name}",
1513 this.name
1514 );
1515 }
1516 Err(error) => {
1517 log::error!(
1518 "{}:error handling message. type:{}, error:{:#}",
1519 this.name,
1520 type_name,
1521 format!("{error:#}").lines().fold(
1522 String::new(),
1523 |mut message, line| {
1524 if !message.is_empty() {
1525 message.push(' ');
1526 }
1527 message.push_str(line);
1528 message
1529 }
1530 )
1531 );
1532 }
1533 }
1534 })
1535 .detach()
1536 } else {
1537 log::error!("{}:unhandled remote message name:{type_name}", this.name);
1538 if let Err(e) = AnyProtoClient::from(this.clone()).send_response(
1539 message_id,
1540 anyhow::anyhow!("no handler registered for {type_name}").to_proto(),
1541 ) {
1542 log::error!(
1543 "{}:error sending error response for {type_name}:{e:#}",
1544 this.name
1545 );
1546 }
1547 }
1548 }
1549 }
1550 anyhow::Ok(())
1551 })
1552 }
1553
1554 pub(crate) fn reconnect(
1555 self: &Arc<Self>,
1556 incoming_rx: UnboundedReceiver<Envelope>,
1557 outgoing_tx: UnboundedSender<Envelope>,
1558 cx: &AsyncApp,
1559 ) {
1560 *self.outgoing_tx.lock() = outgoing_tx;
1561 *self.task.lock() = Self::start_handling_messages(Arc::downgrade(self), incoming_rx, cx);
1562 }
1563
1564 fn request<T: RequestMessage>(
1565 &self,
1566 payload: T,
1567 ) -> impl 'static + Future<Output = Result<T::Response>> {
1568 self.request_internal(payload, true)
1569 }
1570
1571 fn request_internal<T: RequestMessage>(
1572 &self,
1573 payload: T,
1574 use_buffer: bool,
1575 ) -> impl 'static + Future<Output = Result<T::Response>> {
1576 log::debug!("remote request start. name:{}", T::NAME);
1577 let response =
1578 self.request_dynamic(payload.into_envelope(0, None, None), T::NAME, use_buffer);
1579 async move {
1580 let response = response.await?;
1581 log::debug!("remote request finish. name:{}", T::NAME);
1582 T::Response::from_envelope(response).context("received a response of the wrong type")
1583 }
1584 }
1585
1586 async fn resync(&self, timeout: Duration) -> Result<()> {
1587 smol::future::or(
1588 async {
1589 self.request_internal(proto::FlushBufferedMessages {}, false)
1590 .await?;
1591
1592 for envelope in self.buffer.lock().iter() {
1593 self.outgoing_tx
1594 .lock()
1595 .unbounded_send(envelope.clone())
1596 .ok();
1597 }
1598 Ok(())
1599 },
1600 async {
1601 self.executor.timer(timeout).await;
1602 anyhow::bail!("Timed out resyncing remote client")
1603 },
1604 )
1605 .await
1606 }
1607
1608 async fn ping(&self, timeout: Duration) -> Result<()> {
1609 smol::future::or(
1610 async {
1611 self.request(proto::Ping {}).await?;
1612 Ok(())
1613 },
1614 async {
1615 self.executor.timer(timeout).await;
1616 anyhow::bail!("Timed out pinging remote client")
1617 },
1618 )
1619 .await
1620 }
1621
1622 fn send<T: EnvelopedMessage>(&self, payload: T) -> Result<()> {
1623 log::debug!("remote send name:{}", T::NAME);
1624 self.send_dynamic(payload.into_envelope(0, None, None))
1625 }
1626
1627 fn request_dynamic(
1628 &self,
1629 mut envelope: proto::Envelope,
1630 type_name: &'static str,
1631 use_buffer: bool,
1632 ) -> impl 'static + Future<Output = Result<proto::Envelope>> {
1633 envelope.id = self.next_message_id.fetch_add(1, SeqCst);
1634 let (tx, rx) = oneshot::channel();
1635 let mut response_channels_lock = self.response_channels.lock();
1636 response_channels_lock.insert(MessageId(envelope.id), tx);
1637 drop(response_channels_lock);
1638
1639 let result = if use_buffer {
1640 self.send_buffered(envelope)
1641 } else {
1642 self.send_unbuffered(envelope)
1643 };
1644 async move {
1645 if let Err(error) = &result {
1646 log::error!("failed to send message: {error}");
1647 anyhow::bail!("failed to send message: {error}");
1648 }
1649
1650 let response = rx.await.context("connection lost")?.0;
1651 if let Some(proto::envelope::Payload::Error(error)) = &response.payload {
1652 return Err(RpcError::from_proto(error, type_name));
1653 }
1654 Ok(response)
1655 }
1656 }
1657
1658 pub fn send_dynamic(&self, mut envelope: proto::Envelope) -> Result<()> {
1659 envelope.id = self.next_message_id.fetch_add(1, SeqCst);
1660 self.send_buffered(envelope)
1661 }
1662
1663 fn send_buffered(&self, mut envelope: proto::Envelope) -> Result<()> {
1664 envelope.ack_id = Some(self.max_received.load(SeqCst));
1665 self.buffer.lock().push_back(envelope.clone());
1666 // ignore errors on send (happen while we're reconnecting)
1667 // assume that the global "disconnected" overlay is sufficient.
1668 self.outgoing_tx.lock().unbounded_send(envelope).ok();
1669 Ok(())
1670 }
1671
1672 fn send_unbuffered(&self, mut envelope: proto::Envelope) -> Result<()> {
1673 envelope.ack_id = Some(self.max_received.load(SeqCst));
1674 self.outgoing_tx.lock().unbounded_send(envelope).ok();
1675 Ok(())
1676 }
1677}
1678
1679impl ProtoClient for ChannelClient {
1680 fn request(
1681 &self,
1682 envelope: proto::Envelope,
1683 request_type: &'static str,
1684 ) -> BoxFuture<'static, Result<proto::Envelope>> {
1685 self.request_dynamic(envelope, request_type, true).boxed()
1686 }
1687
1688 fn send(&self, envelope: proto::Envelope, _message_type: &'static str) -> Result<()> {
1689 self.send_dynamic(envelope)
1690 }
1691
1692 fn send_response(&self, envelope: Envelope, _message_type: &'static str) -> anyhow::Result<()> {
1693 self.send_dynamic(envelope)
1694 }
1695
1696 fn message_handler_set(&self) -> &Mutex<ProtoMessageHandlerSet> {
1697 &self.message_handlers
1698 }
1699
1700 fn is_via_collab(&self) -> bool {
1701 false
1702 }
1703
1704 fn has_wsl_interop(&self) -> bool {
1705 self.has_wsl_interop
1706 }
1707}