1use crate::{
2 SshConnectionOptions,
3 protocol::MessageId,
4 proxy::ProxyLaunchError,
5 transport::{
6 docker::{DockerConnectionOptions, DockerExecConnection},
7 ssh::SshRemoteConnection,
8 wsl::{WslConnectionOptions, WslRemoteConnection},
9 },
10};
11use anyhow::{Context as _, Result, anyhow};
12use askpass::EncryptedPassword;
13use async_trait::async_trait;
14use collections::HashMap;
15use futures::{
16 Future, FutureExt as _, StreamExt as _,
17 channel::{
18 mpsc::{self, Sender, UnboundedReceiver, UnboundedSender},
19 oneshot,
20 },
21 future::{BoxFuture, Shared},
22 select, select_biased,
23};
24use gpui::{
25 App, AppContext as _, AsyncApp, BackgroundExecutor, BorrowAppContext, Context, Entity,
26 EventEmitter, FutureExt, Global, Task, WeakEntity,
27};
28use parking_lot::Mutex;
29
30use release_channel::ReleaseChannel;
31use rpc::{
32 AnyProtoClient, ErrorExt, ProtoClient, ProtoMessageHandlerSet, RpcError,
33 proto::{self, Envelope, EnvelopedMessage, PeerId, RequestMessage, build_typed_envelope},
34};
35use semver::Version;
36use std::{
37 collections::VecDeque,
38 fmt,
39 ops::ControlFlow,
40 path::PathBuf,
41 sync::{
42 Arc, Weak,
43 atomic::{AtomicU32, AtomicU64, Ordering::SeqCst},
44 },
45 time::{Duration, Instant},
46};
47use util::{
48 ResultExt,
49 paths::{PathStyle, RemotePathBuf},
50};
51
/// Operating system and CPU architecture of a remote host.
///
/// Passed to the [`RemoteClientDelegate`] download methods.
#[derive(Copy, Clone, Debug)]
pub struct RemotePlatform {
    /// Operating system identifier.
    pub os: &'static str,
    /// CPU architecture identifier.
    pub arch: &'static str,
}
57
/// A command described as data: the program, its arguments and its environment.
///
/// Returned by `build_command` and `build_forward_ports_command`.
#[derive(Clone, Debug)]
pub struct CommandTemplate {
    /// Program to execute.
    pub program: String,
    /// Arguments passed to `program`.
    pub args: Vec<String>,
    /// Environment variables for the command.
    pub env: HashMap<String, String>,
}
64
/// Callbacks a remote connection uses to interact with its host application.
pub trait RemoteClientDelegate: Send + Sync {
    /// Requests a password for `prompt`.
    ///
    /// NOTE(review): presumably the implementation sends the password on `tx`;
    /// confirm against the implementors.
    fn ask_password(
        &self,
        prompt: String,
        tx: oneshot::Sender<EncryptedPassword>,
        cx: &mut AsyncApp,
    );
    /// Resolves a download URL for the server binary matching `platform`,
    /// `release_channel` and (optionally) `version`.
    ///
    /// The task yields `Ok(None)` when no URL is produced.
    fn get_download_url(
        &self,
        platform: RemotePlatform,
        release_channel: ReleaseChannel,
        version: Option<Version>,
        cx: &mut AsyncApp,
    ) -> Task<Result<Option<String>>>;
    /// Downloads the server binary for the given platform to the local machine.
    ///
    /// The task yields a local path on success.
    fn download_server_binary_locally(
        &self,
        platform: RemotePlatform,
        release_channel: ReleaseChannel,
        version: Option<Version>,
        cx: &mut AsyncApp,
    ) -> Task<Result<PathBuf>>;
    /// Reports a status message.
    ///
    /// NOTE(review): `None` presumably clears the status; confirm with implementors.
    fn set_status(&self, status: Option<&str>, cx: &mut AsyncApp);
}
88
/// Number of missed heartbeats after which a reconnect is started.
const MAX_MISSED_HEARTBEATS: usize = 5;
/// Delay before the heartbeat loop sends a ping.
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
/// Timeout passed to `ping` and `resync` requests.
const HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(5);
/// How long `RemoteClient::new` waits for the remote to report that it started.
const INITIAL_CONNECTION_TIMEOUT: Duration = Duration::from_secs(60);

/// Number of reconnect attempts made before giving up.
const MAX_RECONNECT_ATTEMPTS: usize = 3;
95
/// Internal connection lifecycle of a [`RemoteClient`].
///
/// `Connected` and `HeartbeatMissed` own the tasks that service the
/// connection, so replacing or dropping the state drops those tasks.
enum State {
    /// Set when the client entity is created, before the remote is ready.
    Connecting,
    /// The connection is established and its tasks are running.
    Connected {
        remote_connection: Arc<dyn RemoteConnection>,
        delegate: Arc<dyn RemoteClientDelegate>,

        /// Task returned by `RemoteClient::monitor`; it awaits the proxy I/O task.
        multiplex_task: Task<Result<()>>,
        /// Task returned by `RemoteClient::heartbeat`.
        heartbeat_task: Task<Result<()>>,
    },
    /// One or more consecutive heartbeats were missed, but the reconnect
    /// threshold has not been acted on yet.
    HeartbeatMissed {
        /// Count of consecutive missed heartbeats.
        missed_heartbeats: usize,

        ssh_connection: Arc<dyn RemoteConnection>,
        delegate: Arc<dyn RemoteClientDelegate>,

        multiplex_task: Task<Result<()>>,
        heartbeat_task: Task<Result<()>>,
    },
    /// A reconnect attempt is in flight.
    Reconnecting,
    /// The last reconnect attempt failed; another attempt may follow.
    ReconnectFailed {
        ssh_connection: Arc<dyn RemoteConnection>,
        delegate: Arc<dyn RemoteClientDelegate>,

        /// Error produced by the failed attempt.
        error: anyhow::Error,
        /// Number of attempts made so far.
        attempts: usize,
    },
    /// All reconnect attempts were used up.
    ReconnectExhausted,
    /// The proxy reported that the remote server is not running.
    ServerNotRunning,
}
125
126impl fmt::Display for State {
127 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
128 match self {
129 Self::Connecting => write!(f, "connecting"),
130 Self::Connected { .. } => write!(f, "connected"),
131 Self::Reconnecting => write!(f, "reconnecting"),
132 Self::ReconnectFailed { .. } => write!(f, "reconnect failed"),
133 Self::ReconnectExhausted => write!(f, "reconnect exhausted"),
134 Self::HeartbeatMissed { .. } => write!(f, "heartbeat missed"),
135 Self::ServerNotRunning { .. } => write!(f, "server not running"),
136 }
137 }
138}
139
140impl State {
141 fn remote_connection(&self) -> Option<Arc<dyn RemoteConnection>> {
142 match self {
143 Self::Connected {
144 remote_connection: ssh_connection,
145 ..
146 } => Some(ssh_connection.clone()),
147 Self::HeartbeatMissed { ssh_connection, .. } => Some(ssh_connection.clone()),
148 Self::ReconnectFailed { ssh_connection, .. } => Some(ssh_connection.clone()),
149 _ => None,
150 }
151 }
152
153 fn can_reconnect(&self) -> bool {
154 match self {
155 Self::Connected { .. }
156 | Self::HeartbeatMissed { .. }
157 | Self::ReconnectFailed { .. } => true,
158 State::Connecting
159 | State::Reconnecting
160 | State::ReconnectExhausted
161 | State::ServerNotRunning => false,
162 }
163 }
164
165 fn is_reconnect_failed(&self) -> bool {
166 matches!(self, Self::ReconnectFailed { .. })
167 }
168
169 fn is_reconnect_exhausted(&self) -> bool {
170 matches!(self, Self::ReconnectExhausted { .. })
171 }
172
173 fn is_server_not_running(&self) -> bool {
174 matches!(self, Self::ServerNotRunning)
175 }
176
177 fn is_reconnecting(&self) -> bool {
178 matches!(self, Self::Reconnecting { .. })
179 }
180
181 fn heartbeat_recovered(self) -> Self {
182 match self {
183 Self::HeartbeatMissed {
184 ssh_connection,
185 delegate,
186 multiplex_task,
187 heartbeat_task,
188 ..
189 } => Self::Connected {
190 remote_connection: ssh_connection,
191 delegate,
192 multiplex_task,
193 heartbeat_task,
194 },
195 _ => self,
196 }
197 }
198
199 fn heartbeat_missed(self) -> Self {
200 match self {
201 Self::Connected {
202 remote_connection: ssh_connection,
203 delegate,
204 multiplex_task,
205 heartbeat_task,
206 } => Self::HeartbeatMissed {
207 missed_heartbeats: 1,
208 ssh_connection,
209 delegate,
210 multiplex_task,
211 heartbeat_task,
212 },
213 Self::HeartbeatMissed {
214 missed_heartbeats,
215 ssh_connection,
216 delegate,
217 multiplex_task,
218 heartbeat_task,
219 } => Self::HeartbeatMissed {
220 missed_heartbeats: missed_heartbeats + 1,
221 ssh_connection,
222 delegate,
223 multiplex_task,
224 heartbeat_task,
225 },
226 _ => self,
227 }
228 }
229}
230
/// The coarse, publicly visible state of the remote connection.
///
/// Derived from the internal `State` via `From<&State>`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ConnectionState {
    /// The initial connection is being established.
    Connecting,
    /// The connection is established.
    Connected,
    /// At least one heartbeat was missed.
    HeartbeatMissed,
    /// A reconnect is in progress, or an attempt failed and may be retried.
    Reconnecting,
    /// Reconnecting was given up, the server is not running, or no state is set.
    Disconnected,
}
240
241impl From<&State> for ConnectionState {
242 fn from(value: &State) -> Self {
243 match value {
244 State::Connecting => Self::Connecting,
245 State::Connected { .. } => Self::Connected,
246 State::Reconnecting | State::ReconnectFailed { .. } => Self::Reconnecting,
247 State::HeartbeatMissed { .. } => Self::HeartbeatMissed,
248 State::ReconnectExhausted => Self::Disconnected,
249 State::ServerNotRunning => Self::Disconnected,
250 }
251 }
252}
253
/// Client side of a remote connection, with heartbeat monitoring and
/// automatic reconnection.
pub struct RemoteClient {
    /// Message channel to the remote; kept across reconnects.
    client: Arc<ChannelClient>,
    /// Identifier passed to `start_proxy` on connect and on every reconnect.
    unique_identifier: String,
    /// Options captured from the connection when the client was created.
    connection_options: RemoteConnectionOptions,
    /// Path style reported by the connection when the client was created.
    path_style: PathStyle,
    /// Current lifecycle state; `None` once `shutdown_processes` has taken it.
    state: Option<State>,
}
261
/// Events emitted by [`RemoteClient`].
#[derive(Debug)]
pub enum RemoteClientEvent {
    /// Emitted when the state becomes `ReconnectExhausted` or `ServerNotRunning`.
    Disconnected,
}

impl EventEmitter<RemoteClientEvent> for RemoteClient {}
268
/// Identifies the socket on the remote server so that reconnects
/// can re-join the same project.
pub enum ConnectionIdentifier {
    /// Identifier drawn from the process-wide `NEXT_ID` counter.
    Setup(u64),
    /// Identifier carrying a caller-supplied workspace id.
    Workspace(i64),
}

/// Source of ids for `ConnectionIdentifier::setup`; starts at 1.
static NEXT_ID: AtomicU64 = AtomicU64::new(1);
277
278impl ConnectionIdentifier {
279 pub fn setup() -> Self {
280 Self::Setup(NEXT_ID.fetch_add(1, SeqCst))
281 }
282
283 // This string gets used in a socket name, and so must be relatively short.
284 // The total length of:
285 // /home/{username}/.local/share/zed/server_state/{name}/stdout.sock
286 // Must be less than about 100 characters
287 // https://unix.stackexchange.com/questions/367008/why-is-socket-path-length-limited-to-a-hundred-chars
288 // So our strings should be at most 20 characters or so.
289 fn to_string(&self, cx: &App) -> String {
290 let identifier_prefix = match ReleaseChannel::global(cx) {
291 ReleaseChannel::Stable => "".to_string(),
292 release_channel => format!("{}-", release_channel.dev_name()),
293 };
294 match self {
295 Self::Setup(setup_id) => format!("{identifier_prefix}setup-{setup_id}"),
296 Self::Workspace(workspace_id) => {
297 format!("{identifier_prefix}workspace-{workspace_id}",)
298 }
299 }
300 }
301}
302
303pub async fn connect(
304 connection_options: RemoteConnectionOptions,
305 delegate: Arc<dyn RemoteClientDelegate>,
306 cx: &mut AsyncApp,
307) -> Result<Arc<dyn RemoteConnection>> {
308 cx.update(|cx| {
309 cx.update_default_global(|pool: &mut ConnectionPool, cx| {
310 pool.connect(connection_options.clone(), delegate.clone(), cx)
311 })
312 })?
313 .await
314 .map_err(|e| e.cloned())
315}
316
317impl RemoteClient {
318 pub fn new(
319 unique_identifier: ConnectionIdentifier,
320 remote_connection: Arc<dyn RemoteConnection>,
321 cancellation: oneshot::Receiver<()>,
322 delegate: Arc<dyn RemoteClientDelegate>,
323 cx: &mut App,
324 ) -> Task<Result<Option<Entity<Self>>>> {
325 let unique_identifier = unique_identifier.to_string(cx);
326 cx.spawn(async move |cx| {
327 let success = Box::pin(async move {
328 let (outgoing_tx, outgoing_rx) = mpsc::unbounded::<Envelope>();
329 let (incoming_tx, incoming_rx) = mpsc::unbounded::<Envelope>();
330 let (connection_activity_tx, connection_activity_rx) = mpsc::channel::<()>(1);
331
332 let client = cx.update(|cx| {
333 ChannelClient::new(
334 incoming_rx,
335 outgoing_tx,
336 cx,
337 "client",
338 remote_connection.has_wsl_interop(),
339 )
340 })?;
341
342 let path_style = remote_connection.path_style();
343 let this = cx.new(|_| Self {
344 client: client.clone(),
345 unique_identifier: unique_identifier.clone(),
346 connection_options: remote_connection.connection_options(),
347 path_style,
348 state: Some(State::Connecting),
349 })?;
350
351 let io_task = remote_connection.start_proxy(
352 unique_identifier,
353 false,
354 incoming_tx,
355 outgoing_rx,
356 connection_activity_tx,
357 delegate.clone(),
358 cx,
359 );
360
361 let ready = client
362 .wait_for_remote_started()
363 .with_timeout(INITIAL_CONNECTION_TIMEOUT, cx.background_executor())
364 .await;
365 match ready {
366 Ok(Some(_)) => {}
367 Ok(None) => {
368 let mut error = "remote client exited before becoming ready".to_owned();
369 if let Some(status) = io_task.now_or_never() {
370 match status {
371 Ok(exit_code) => {
372 error.push_str(&format!(", exit_code={exit_code:?}"))
373 }
374 Err(e) => error.push_str(&format!(", error={e:?}")),
375 }
376 }
377 let error = anyhow::anyhow!("{error}");
378 log::error!("failed to establish connection: {}", error);
379 return Err(error);
380 }
381 Err(_) => {
382 let mut error =
383 "remote client did not become ready within the timeout".to_owned();
384 if let Some(status) = io_task.now_or_never() {
385 match status {
386 Ok(exit_code) => {
387 error.push_str(&format!(", exit_code={exit_code:?}"))
388 }
389 Err(e) => error.push_str(&format!(", error={e:?}")),
390 }
391 }
392 let error = anyhow::anyhow!("{error}");
393 log::error!("failed to establish connection: {}", error);
394 return Err(error);
395 }
396 }
397 let multiplex_task = Self::monitor(this.downgrade(), io_task, cx);
398 if let Err(error) = client.ping(HEARTBEAT_TIMEOUT).await {
399 log::error!("failed to establish connection: {}", error);
400 return Err(error);
401 }
402
403 let heartbeat_task = Self::heartbeat(this.downgrade(), connection_activity_rx, cx);
404
405 this.update(cx, |this, _| {
406 this.state = Some(State::Connected {
407 remote_connection,
408 delegate,
409 multiplex_task,
410 heartbeat_task,
411 });
412 })?;
413
414 Ok(Some(this))
415 });
416
417 select! {
418 _ = cancellation.fuse() => {
419 Ok(None)
420 }
421 result = success.fuse() => result
422 }
423 })
424 }
425
426 pub fn proto_client_from_channels(
427 incoming_rx: mpsc::UnboundedReceiver<Envelope>,
428 outgoing_tx: mpsc::UnboundedSender<Envelope>,
429 cx: &App,
430 name: &'static str,
431 has_wsl_interop: bool,
432 ) -> AnyProtoClient {
433 ChannelClient::new(incoming_rx, outgoing_tx, cx, name, has_wsl_interop).into()
434 }
435
436 pub fn shutdown_processes<T: RequestMessage>(
437 &mut self,
438 shutdown_request: Option<T>,
439 executor: BackgroundExecutor,
440 ) -> Option<impl Future<Output = ()> + use<T>> {
441 let state = self.state.take()?;
442 log::info!("shutting down ssh processes");
443
444 let State::Connected {
445 multiplex_task,
446 heartbeat_task,
447 remote_connection: ssh_connection,
448 delegate,
449 } = state
450 else {
451 return None;
452 };
453
454 let client = self.client.clone();
455
456 Some(async move {
457 if let Some(shutdown_request) = shutdown_request {
458 client.send(shutdown_request).log_err();
459 // We wait 50ms instead of waiting for a response, because
460 // waiting for a response would require us to wait on the main thread
461 // which we want to avoid in an `on_app_quit` callback.
462 executor.timer(Duration::from_millis(50)).await;
463 }
464
465 // Drop `multiplex_task` because it owns our ssh_proxy_process, which is a
466 // child of master_process.
467 drop(multiplex_task);
468 // Now drop the rest of state, which kills master process.
469 drop(heartbeat_task);
470 drop(ssh_connection);
471 drop(delegate);
472 })
473 }
474
475 fn reconnect(&mut self, cx: &mut Context<Self>) -> Result<()> {
476 let can_reconnect = self
477 .state
478 .as_ref()
479 .map(|state| state.can_reconnect())
480 .unwrap_or(false);
481 if !can_reconnect {
482 log::info!("aborting reconnect, because not in state that allows reconnecting");
483 let error = if let Some(state) = self.state.as_ref() {
484 format!("invalid state, cannot reconnect while in state {state}")
485 } else {
486 "no state set".to_string()
487 };
488 anyhow::bail!(error);
489 }
490
491 let state = self.state.take().unwrap();
492 let (attempts, remote_connection, delegate) = match state {
493 State::Connected {
494 remote_connection: ssh_connection,
495 delegate,
496 multiplex_task,
497 heartbeat_task,
498 }
499 | State::HeartbeatMissed {
500 ssh_connection,
501 delegate,
502 multiplex_task,
503 heartbeat_task,
504 ..
505 } => {
506 drop(multiplex_task);
507 drop(heartbeat_task);
508 (0, ssh_connection, delegate)
509 }
510 State::ReconnectFailed {
511 attempts,
512 ssh_connection,
513 delegate,
514 ..
515 } => (attempts, ssh_connection, delegate),
516 State::Connecting
517 | State::Reconnecting
518 | State::ReconnectExhausted
519 | State::ServerNotRunning => unreachable!(),
520 };
521
522 let attempts = attempts + 1;
523 if attempts > MAX_RECONNECT_ATTEMPTS {
524 log::error!(
525 "Failed to reconnect to after {} attempts, giving up",
526 MAX_RECONNECT_ATTEMPTS
527 );
528 self.set_state(State::ReconnectExhausted, cx);
529 return Ok(());
530 }
531
532 self.set_state(State::Reconnecting, cx);
533
534 log::info!("Trying to reconnect to ssh server... Attempt {}", attempts);
535
536 let unique_identifier = self.unique_identifier.clone();
537 let client = self.client.clone();
538 let reconnect_task = cx.spawn(async move |this, cx| {
539 macro_rules! failed {
540 ($error:expr, $attempts:expr, $ssh_connection:expr, $delegate:expr) => {
541 delegate.set_status(Some(&format!("{error:#}", error = $error)), cx);
542 return State::ReconnectFailed {
543 error: anyhow!($error),
544 attempts: $attempts,
545 ssh_connection: $ssh_connection,
546 delegate: $delegate,
547 };
548 };
549 }
550
551 if let Err(error) = remote_connection
552 .kill()
553 .await
554 .context("Failed to kill ssh process")
555 {
556 failed!(error, attempts, remote_connection, delegate);
557 };
558
559 let connection_options = remote_connection.connection_options();
560
561 let (outgoing_tx, outgoing_rx) = mpsc::unbounded::<Envelope>();
562 let (incoming_tx, incoming_rx) = mpsc::unbounded::<Envelope>();
563 let (connection_activity_tx, connection_activity_rx) = mpsc::channel::<()>(1);
564
565 let (ssh_connection, io_task) = match async {
566 let ssh_connection = cx
567 .update_global(|pool: &mut ConnectionPool, cx| {
568 pool.connect(connection_options, delegate.clone(), cx)
569 })?
570 .await
571 .map_err(|error| error.cloned())?;
572
573 let io_task = ssh_connection.start_proxy(
574 unique_identifier,
575 true,
576 incoming_tx,
577 outgoing_rx,
578 connection_activity_tx,
579 delegate.clone(),
580 cx,
581 );
582 anyhow::Ok((ssh_connection, io_task))
583 }
584 .await
585 {
586 Ok((ssh_connection, io_task)) => (ssh_connection, io_task),
587 Err(error) => {
588 failed!(error, attempts, remote_connection, delegate);
589 }
590 };
591
592 let multiplex_task = Self::monitor(this.clone(), io_task, cx);
593 client.reconnect(incoming_rx, outgoing_tx, cx);
594
595 if let Err(error) = client.resync(HEARTBEAT_TIMEOUT).await {
596 failed!(error, attempts, ssh_connection, delegate);
597 };
598
599 State::Connected {
600 remote_connection: ssh_connection,
601 delegate,
602 multiplex_task,
603 heartbeat_task: Self::heartbeat(this.clone(), connection_activity_rx, cx),
604 }
605 });
606
607 cx.spawn(async move |this, cx| {
608 let new_state = reconnect_task.await;
609 this.update(cx, |this, cx| {
610 this.try_set_state(cx, |old_state| {
611 if old_state.is_reconnecting() {
612 match &new_state {
613 State::Connecting
614 | State::Reconnecting
615 | State::HeartbeatMissed { .. }
616 | State::ServerNotRunning => {}
617 State::Connected { .. } => {
618 log::info!("Successfully reconnected");
619 }
620 State::ReconnectFailed {
621 error, attempts, ..
622 } => {
623 log::error!(
624 "Reconnect attempt {} failed: {:?}. Starting new attempt...",
625 attempts,
626 error
627 );
628 }
629 State::ReconnectExhausted => {
630 log::error!("Reconnect attempt failed and all attempts exhausted");
631 }
632 }
633 Some(new_state)
634 } else {
635 None
636 }
637 });
638
639 if this.state_is(State::is_reconnect_failed) {
640 this.reconnect(cx)
641 } else if this.state_is(State::is_reconnect_exhausted) {
642 Ok(())
643 } else {
644 log::debug!("State has transition from Reconnecting into new state while attempting reconnect.");
645 Ok(())
646 }
647 })
648 })
649 .detach_and_log_err(cx);
650
651 Ok(())
652 }
653
654 fn heartbeat(
655 this: WeakEntity<Self>,
656 mut connection_activity_rx: mpsc::Receiver<()>,
657 cx: &mut AsyncApp,
658 ) -> Task<Result<()>> {
659 let Ok(client) = this.read_with(cx, |this, _| this.client.clone()) else {
660 return Task::ready(Err(anyhow!("SshRemoteClient lost")));
661 };
662
663 cx.spawn(async move |cx| {
664 let mut missed_heartbeats = 0;
665
666 let keepalive_timer = cx.background_executor().timer(HEARTBEAT_INTERVAL).fuse();
667 futures::pin_mut!(keepalive_timer);
668
669 loop {
670 select_biased! {
671 result = connection_activity_rx.next().fuse() => {
672 if result.is_none() {
673 log::warn!("ssh heartbeat: connection activity channel has been dropped. stopping.");
674 return Ok(());
675 }
676
677 if missed_heartbeats != 0 {
678 missed_heartbeats = 0;
679 let _ =this.update(cx, |this, cx| {
680 this.handle_heartbeat_result(missed_heartbeats, cx)
681 })?;
682 }
683 }
684 _ = keepalive_timer => {
685 log::debug!("Sending heartbeat to server...");
686
687 let result = select_biased! {
688 _ = connection_activity_rx.next().fuse() => {
689 Ok(())
690 }
691 ping_result = client.ping(HEARTBEAT_TIMEOUT).fuse() => {
692 ping_result
693 }
694 };
695
696 if result.is_err() {
697 missed_heartbeats += 1;
698 log::warn!(
699 "No heartbeat from server after {:?}. Missed heartbeat {} out of {}.",
700 HEARTBEAT_TIMEOUT,
701 missed_heartbeats,
702 MAX_MISSED_HEARTBEATS
703 );
704 } else if missed_heartbeats != 0 {
705 missed_heartbeats = 0;
706 } else {
707 continue;
708 }
709
710 let result = this.update(cx, |this, cx| {
711 this.handle_heartbeat_result(missed_heartbeats, cx)
712 })?;
713 if result.is_break() {
714 return Ok(());
715 }
716 }
717 }
718
719 keepalive_timer.set(cx.background_executor().timer(HEARTBEAT_INTERVAL).fuse());
720 }
721 })
722 }
723
724 fn handle_heartbeat_result(
725 &mut self,
726 missed_heartbeats: usize,
727 cx: &mut Context<Self>,
728 ) -> ControlFlow<()> {
729 let state = self.state.take().unwrap();
730 let next_state = if missed_heartbeats > 0 {
731 state.heartbeat_missed()
732 } else {
733 state.heartbeat_recovered()
734 };
735
736 self.set_state(next_state, cx);
737
738 if missed_heartbeats >= MAX_MISSED_HEARTBEATS {
739 log::error!(
740 "Missed last {} heartbeats. Reconnecting...",
741 missed_heartbeats
742 );
743
744 self.reconnect(cx)
745 .context("failed to start reconnect process after missing heartbeats")
746 .log_err();
747 ControlFlow::Break(())
748 } else {
749 ControlFlow::Continue(())
750 }
751 }
752
753 fn monitor(
754 this: WeakEntity<Self>,
755 io_task: Task<Result<i32>>,
756 cx: &AsyncApp,
757 ) -> Task<Result<()>> {
758 cx.spawn(async move |cx| {
759 let result = io_task.await;
760
761 match result {
762 Ok(exit_code) => {
763 if let Some(error) = ProxyLaunchError::from_exit_code(exit_code) {
764 match error {
765 ProxyLaunchError::ServerNotRunning => {
766 log::error!("failed to reconnect because server is not running");
767 this.update(cx, |this, cx| {
768 this.set_state(State::ServerNotRunning, cx);
769 })?;
770 }
771 }
772 } else if exit_code > 0 {
773 log::error!("proxy process terminated unexpectedly");
774 this.update(cx, |this, cx| {
775 this.reconnect(cx).ok();
776 })?;
777 }
778 }
779 Err(error) => {
780 log::warn!("ssh io task died with error: {:?}. reconnecting...", error);
781 this.update(cx, |this, cx| {
782 this.reconnect(cx).ok();
783 })?;
784 }
785 }
786
787 Ok(())
788 })
789 }
790
791 fn state_is(&self, check: impl FnOnce(&State) -> bool) -> bool {
792 self.state.as_ref().is_some_and(check)
793 }
794
795 fn try_set_state(&mut self, cx: &mut Context<Self>, map: impl FnOnce(&State) -> Option<State>) {
796 let new_state = self.state.as_ref().and_then(map);
797 if let Some(new_state) = new_state {
798 self.state.replace(new_state);
799 cx.notify();
800 }
801 }
802
803 fn set_state(&mut self, state: State, cx: &mut Context<Self>) {
804 log::info!("setting state to '{}'", &state);
805
806 let is_reconnect_exhausted = state.is_reconnect_exhausted();
807 let is_server_not_running = state.is_server_not_running();
808 self.state.replace(state);
809
810 if is_reconnect_exhausted || is_server_not_running {
811 cx.emit(RemoteClientEvent::Disconnected);
812 }
813 cx.notify();
814 }
815
816 pub fn shell(&self) -> Option<String> {
817 Some(self.remote_connection()?.shell())
818 }
819
820 pub fn default_system_shell(&self) -> Option<String> {
821 Some(self.remote_connection()?.default_system_shell())
822 }
823
824 pub fn shares_network_interface(&self) -> bool {
825 self.remote_connection()
826 .map_or(false, |connection| connection.shares_network_interface())
827 }
828
829 pub fn build_command(
830 &self,
831 program: Option<String>,
832 args: &[String],
833 env: &HashMap<String, String>,
834 working_dir: Option<String>,
835 port_forward: Option<(u16, String, u16)>,
836 ) -> Result<CommandTemplate> {
837 let Some(connection) = self.remote_connection() else {
838 return Err(anyhow!("no ssh connection"));
839 };
840 connection.build_command(program, args, env, working_dir, port_forward)
841 }
842
843 pub fn build_forward_ports_command(
844 &self,
845 forwards: Vec<(u16, String, u16)>,
846 ) -> Result<CommandTemplate> {
847 let Some(connection) = self.remote_connection() else {
848 return Err(anyhow!("no ssh connection"));
849 };
850 connection.build_forward_ports_command(forwards)
851 }
852
853 pub fn upload_directory(
854 &self,
855 src_path: PathBuf,
856 dest_path: RemotePathBuf,
857 cx: &App,
858 ) -> Task<Result<()>> {
859 let Some(connection) = self.remote_connection() else {
860 return Task::ready(Err(anyhow!("no ssh connection")));
861 };
862 connection.upload_directory(src_path, dest_path, cx)
863 }
864
    /// Returns the client's message channel as an `AnyProtoClient`.
    pub fn proto_client(&self) -> AnyProtoClient {
        self.client.clone().into()
    }
868
    /// Returns a copy of the options this client was created with.
    pub fn connection_options(&self) -> RemoteConnectionOptions {
        self.connection_options.clone()
    }
872
873 pub fn connection(&self) -> Option<Arc<dyn RemoteConnection>> {
874 if let State::Connected {
875 remote_connection, ..
876 } = self.state.as_ref()?
877 {
878 Some(remote_connection.clone())
879 } else {
880 None
881 }
882 }
883
884 pub fn connection_state(&self) -> ConnectionState {
885 self.state
886 .as_ref()
887 .map(ConnectionState::from)
888 .unwrap_or(ConnectionState::Disconnected)
889 }
890
891 pub fn is_disconnected(&self) -> bool {
892 self.connection_state() == ConnectionState::Disconnected
893 }
894
    /// Path style captured from the connection when the client was created.
    pub fn path_style(&self) -> PathStyle {
        self.path_style
    }
898
899 #[cfg(any(test, feature = "test-support"))]
900 pub fn simulate_disconnect(&self, client_cx: &mut App) -> Task<()> {
901 let opts = self.connection_options();
902 client_cx.spawn(async move |cx| {
903 let connection = cx
904 .update_global(|c: &mut ConnectionPool, _| {
905 if let Some(ConnectionPoolEntry::Connecting(c)) = c.connections.get(&opts) {
906 c.clone()
907 } else {
908 panic!("missing test connection")
909 }
910 })
911 .unwrap()
912 .await
913 .unwrap();
914
915 connection.simulate_disconnect(cx);
916 })
917 }
918
    /// Test helper: registers a fake remote connection in the client's
    /// `ConnectionPool` and returns its options plus the server-side client.
    ///
    /// Each call produces distinct options by using the current pool size
    /// plus one as the port.
    #[cfg(any(test, feature = "test-support"))]
    pub fn fake_server(
        client_cx: &mut gpui::TestAppContext,
        server_cx: &mut gpui::TestAppContext,
    ) -> (RemoteConnectionOptions, AnyProtoClient) {
        use crate::transport::ssh::SshConnectionHost;

        let port = client_cx
            .update(|cx| cx.default_global::<ConnectionPool>().connections.len() as u16 + 1);
        let opts = RemoteConnectionOptions::Ssh(SshConnectionOptions {
            host: SshConnectionHost::from("<fake>".to_string()),
            port: Some(port),
            ..Default::default()
        });
        // The opposite halves of both channels are dropped right away.
        // NOTE(review): presumably the fake connection swaps in live channels
        // when its proxy starts; confirm in the `fake` module.
        let (outgoing_tx, _) = mpsc::unbounded::<Envelope>();
        let (_, incoming_rx) = mpsc::unbounded::<Envelope>();
        let server_client = server_cx
            .update(|cx| ChannelClient::new(incoming_rx, outgoing_tx, cx, "fake-server", false));
        let connection: Arc<dyn RemoteConnection> = Arc::new(fake::FakeRemoteConnection {
            connection_options: opts.clone(),
            server_cx: fake::SendableCx::new(server_cx),
            server_channel: server_client.clone(),
        });

        // Register the fake as a `Connecting` entry whose task yields the
        // connection, so `connect(opts, ..)` returns it.
        client_cx.update(|cx| {
            cx.update_default_global(|c: &mut ConnectionPool, cx| {
                c.connections.insert(
                    opts.clone(),
                    ConnectionPoolEntry::Connecting(
                        cx.background_spawn({
                            let connection = connection.clone();
                            async move { Ok(connection.clone()) }
                        })
                        .shared(),
                    ),
                );
            })
        });

        (opts, server_client.into())
    }
960
961 #[cfg(any(test, feature = "test-support"))]
962 pub async fn fake_client(
963 opts: RemoteConnectionOptions,
964 client_cx: &mut gpui::TestAppContext,
965 ) -> Entity<Self> {
966 let (_tx, rx) = oneshot::channel();
967 let mut cx = client_cx.to_async();
968 let connection = connect(opts, Arc::new(fake::Delegate), &mut cx)
969 .await
970 .unwrap();
971 client_cx
972 .update(|cx| {
973 Self::new(
974 ConnectionIdentifier::setup(),
975 connection,
976 rx,
977 Arc::new(fake::Delegate),
978 cx,
979 )
980 })
981 .await
982 .unwrap()
983 .unwrap()
984 }
985
986 fn remote_connection(&self) -> Option<Arc<dyn RemoteConnection>> {
987 self.state
988 .as_ref()
989 .and_then(|state| state.remote_connection())
990 }
991}
992
/// An entry in the [`ConnectionPool`].
enum ConnectionPoolEntry {
    /// A connection attempt is in flight; the shared task can be awaited by
    /// every caller asking for the same options.
    Connecting(Shared<Task<Result<Arc<dyn RemoteConnection>, Arc<anyhow::Error>>>>),
    /// A connection was established. Only a weak handle is kept, so the pool
    /// does not keep the connection alive.
    Connected(Weak<dyn RemoteConnection>),
}
997
/// Global registry of remote connections, keyed by their options, so that
/// callers asking for the same options share one connection.
#[derive(Default)]
struct ConnectionPool {
    /// In-flight and established connections by connection options.
    connections: HashMap<RemoteConnectionOptions, ConnectionPoolEntry>,
}

impl Global for ConnectionPool {}
1004
1005impl ConnectionPool {
1006 pub fn connect(
1007 &mut self,
1008 opts: RemoteConnectionOptions,
1009 delegate: Arc<dyn RemoteClientDelegate>,
1010 cx: &mut App,
1011 ) -> Shared<Task<Result<Arc<dyn RemoteConnection>, Arc<anyhow::Error>>>> {
1012 let connection = self.connections.get(&opts);
1013 match connection {
1014 Some(ConnectionPoolEntry::Connecting(task)) => {
1015 delegate.set_status(
1016 Some("Waiting for existing connection attempt"),
1017 &mut cx.to_async(),
1018 );
1019 return task.clone();
1020 }
1021 Some(ConnectionPoolEntry::Connected(ssh)) => {
1022 if let Some(ssh) = ssh.upgrade()
1023 && !ssh.has_been_killed()
1024 {
1025 return Task::ready(Ok(ssh)).shared();
1026 }
1027 self.connections.remove(&opts);
1028 }
1029 None => {}
1030 }
1031
1032 let task = cx
1033 .spawn({
1034 let opts = opts.clone();
1035 let delegate = delegate.clone();
1036 async move |cx| {
1037 let connection = match opts.clone() {
1038 RemoteConnectionOptions::Ssh(opts) => {
1039 SshRemoteConnection::new(opts, delegate, cx)
1040 .await
1041 .map(|connection| Arc::new(connection) as Arc<dyn RemoteConnection>)
1042 }
1043 RemoteConnectionOptions::Wsl(opts) => {
1044 WslRemoteConnection::new(opts, delegate, cx)
1045 .await
1046 .map(|connection| Arc::new(connection) as Arc<dyn RemoteConnection>)
1047 }
1048 RemoteConnectionOptions::Docker(opts) => {
1049 DockerExecConnection::new(opts, delegate, cx)
1050 .await
1051 .map(|connection| Arc::new(connection) as Arc<dyn RemoteConnection>)
1052 }
1053 };
1054
1055 cx.update_global(|pool: &mut Self, _| {
1056 debug_assert!(matches!(
1057 pool.connections.get(&opts),
1058 Some(ConnectionPoolEntry::Connecting(_))
1059 ));
1060 match connection {
1061 Ok(connection) => {
1062 pool.connections.insert(
1063 opts.clone(),
1064 ConnectionPoolEntry::Connected(Arc::downgrade(&connection)),
1065 );
1066 Ok(connection)
1067 }
1068 Err(error) => {
1069 pool.connections.remove(&opts);
1070 Err(Arc::new(error))
1071 }
1072 }
1073 })?
1074 }
1075 })
1076 .shared();
1077
1078 self.connections
1079 .insert(opts.clone(), ConnectionPoolEntry::Connecting(task.clone()));
1080 task
1081 }
1082}
1083
/// Options describing how to reach a remote, one variant per transport.
///
/// Used as the key of the connection pool, hence `Eq` and `Hash`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum RemoteConnectionOptions {
    /// Connect over SSH.
    Ssh(SshConnectionOptions),
    /// Connect to a WSL distribution.
    Wsl(WslConnectionOptions),
    /// Connect to a Docker container.
    Docker(DockerConnectionOptions),
}
1090
1091impl RemoteConnectionOptions {
1092 pub fn display_name(&self) -> String {
1093 match self {
1094 RemoteConnectionOptions::Ssh(opts) => opts.host.to_string(),
1095 RemoteConnectionOptions::Wsl(opts) => opts.distro_name.clone(),
1096 RemoteConnectionOptions::Docker(opts) => opts.name.clone(),
1097 }
1098 }
1099}
1100
1101impl From<SshConnectionOptions> for RemoteConnectionOptions {
1102 fn from(opts: SshConnectionOptions) -> Self {
1103 RemoteConnectionOptions::Ssh(opts)
1104 }
1105}
1106
1107impl From<WslConnectionOptions> for RemoteConnectionOptions {
1108 fn from(opts: WslConnectionOptions) -> Self {
1109 RemoteConnectionOptions::Wsl(opts)
1110 }
1111}
1112
#[cfg(target_os = "windows")]
/// Open a wsl path (\\wsl.localhost\<distro>\path)
///
/// Only compiled on Windows.
#[derive(Debug, Clone, PartialEq, Eq, gpui::Action)]
#[action(namespace = workspace, no_json, no_register)]
pub struct OpenWslPath {
    /// Connection options of the WSL distribution.
    pub distro: WslConnectionOptions,
    /// Paths to open.
    pub paths: Vec<PathBuf>,
}
1121
/// A transport-specific connection to a remote (SSH, WSL or Docker).
#[async_trait(?Send)]
pub trait RemoteConnection: Send + Sync {
    /// Starts the proxy that carries messages between the local client and
    /// the remote server.
    ///
    /// `reconnect` is `false` for the initial connection and `true` when
    /// re-establishing one. Messages from the remote go to `incoming_tx` and
    /// messages for the remote are read from `outgoing_rx`. The heartbeat
    /// loop listens on the receiving end of `connection_activity_tx`. The
    /// task resolves to the proxy's exit code.
    fn start_proxy(
        &self,
        unique_identifier: String,
        reconnect: bool,
        incoming_tx: UnboundedSender<Envelope>,
        outgoing_rx: UnboundedReceiver<Envelope>,
        connection_activity_tx: Sender<()>,
        delegate: Arc<dyn RemoteClientDelegate>,
        cx: &mut AsyncApp,
    ) -> Task<Result<i32>>;
    /// Uploads the local directory `src_path` to `dest_path` on the remote.
    fn upload_directory(
        &self,
        src_path: PathBuf,
        dest_path: RemotePathBuf,
        cx: &App,
    ) -> Task<Result<()>>;
    /// Terminates the connection. Called before a reconnect attempt.
    async fn kill(&self) -> Result<()>;
    /// Whether the connection has been killed; the pool does not reuse
    /// connections for which this returns `true`.
    fn has_been_killed(&self) -> bool;
    /// Whether the remote shares a network interface with the local machine.
    /// Defaults to `false`.
    fn shares_network_interface(&self) -> bool {
        false
    }
    /// Builds a command template for running `program` through this connection.
    fn build_command(
        &self,
        program: Option<String>,
        args: &[String],
        env: &HashMap<String, String>,
        working_dir: Option<String>,
        port_forward: Option<(u16, String, u16)>,
    ) -> Result<CommandTemplate>;
    /// Builds a command template that forwards the given ports.
    fn build_forward_ports_command(
        &self,
        forwards: Vec<(u16, String, u16)>,
    ) -> Result<CommandTemplate>;
    /// Options this connection was created from.
    fn connection_options(&self) -> RemoteConnectionOptions;
    /// Path style used by the remote.
    fn path_style(&self) -> PathStyle;
    /// Shell associated with this connection.
    fn shell(&self) -> String;
    /// Default system shell of the remote.
    fn default_system_shell(&self) -> String;
    /// Whether WSL interop is available; forwarded to `ChannelClient::new`.
    fn has_wsl_interop(&self) -> bool;

    /// Test hook for simulating a dropped connection. Does nothing by default.
    #[cfg(any(test, feature = "test-support"))]
    fn simulate_disconnect(&self, _: &AsyncApp) {}
}
1166
/// Requests that are still waiting for a response, keyed by the id of the request.
///
/// Each entry is the sender used to hand the response envelope to the requester,
/// together with a `oneshot::Sender<()>`; the message loop waits until that second
/// sender is used or dropped before it handles the next incoming message.
type ResponseChannels = Mutex<HashMap<MessageId, oneshot::Sender<(Envelope, oneshot::Sender<()>)>>>;
1168
/// A value that can be set at most once and awaited by any number of waiters.
struct Signal<T> {
    /// Sending half of the underlying channel; taken (leaving `None`) when the
    /// value is set.
    tx: Mutex<Option<oneshot::Sender<T>>>,
    /// Shared task that resolves to `Some(value)` once the value is set, or to
    /// `None` if the sender is dropped without sending.
    rx: Shared<Task<Option<T>>>,
}
1173
1174impl<T: Send + Clone + 'static> Signal<T> {
1175 pub fn new(cx: &App) -> Self {
1176 let (tx, rx) = oneshot::channel();
1177
1178 let task = cx
1179 .background_executor()
1180 .spawn(async move { rx.await.ok() })
1181 .shared();
1182
1183 Self {
1184 tx: Mutex::new(Some(tx)),
1185 rx: task,
1186 }
1187 }
1188
1189 fn set(&self, value: T) {
1190 if let Some(tx) = self.tx.lock().take() {
1191 let _ = tx.send(value);
1192 }
1193 }
1194
1195 fn wait(&self) -> Shared<Task<Option<T>>> {
1196 self.rx.clone()
1197 }
1198}
1199
/// A proto client that exchanges [`Envelope`]s over a pair of unbounded channels,
/// keeping sent messages buffered until the peer acknowledges them.
struct ChannelClient {
    /// Id assigned to the next outgoing envelope.
    next_message_id: AtomicU32,
    /// Sender for outgoing envelopes; replaced on `reconnect`.
    outgoing_tx: Mutex<mpsc::UnboundedSender<Envelope>>,
    /// Envelopes sent through `send_buffered`, kept until an incoming `ack_id`
    /// covers them so they can be re-sent.
    buffer: Mutex<VecDeque<Envelope>>,
    /// Requests still waiting for their response.
    response_channels: ResponseChannels,
    /// Handlers for incoming messages that are not responses.
    message_handlers: Mutex<ProtoMessageHandlerSet>,
    /// Id of the latest incoming message other than the `FlushBufferedMessages` /
    /// `RemoteStarted` control messages; sent back as `ack_id` on outgoing envelopes.
    max_received: AtomicU32,
    /// Label used as a prefix in log messages.
    name: &'static str,
    /// The task processing incoming messages; replaced on `reconnect`.
    task: Mutex<Task<Result<()>>>,
    /// Set once a `RemoteStarted` message has been received.
    remote_started: Signal<()>,
    /// Value reported by `ProtoClient::has_wsl_interop`.
    has_wsl_interop: bool,
}
1212
1213impl ChannelClient {
1214 fn new(
1215 incoming_rx: mpsc::UnboundedReceiver<Envelope>,
1216 outgoing_tx: mpsc::UnboundedSender<Envelope>,
1217 cx: &App,
1218 name: &'static str,
1219 has_wsl_interop: bool,
1220 ) -> Arc<Self> {
1221 Arc::new_cyclic(|this| Self {
1222 outgoing_tx: Mutex::new(outgoing_tx),
1223 next_message_id: AtomicU32::new(0),
1224 max_received: AtomicU32::new(0),
1225 response_channels: ResponseChannels::default(),
1226 message_handlers: Default::default(),
1227 buffer: Mutex::new(VecDeque::new()),
1228 name,
1229 task: Mutex::new(Self::start_handling_messages(
1230 this.clone(),
1231 incoming_rx,
1232 &cx.to_async(),
1233 )),
1234 remote_started: Signal::new(cx),
1235 has_wsl_interop,
1236 })
1237 }
1238
    /// Returns a shared handle that resolves once a `RemoteStarted` message has been
    /// received (see the handling in `start_handling_messages`), or to `None` if the
    /// underlying signal's sender is dropped without being set.
    fn wait_for_remote_started(&self) -> Shared<Task<Option<()>>> {
        self.remote_started.wait()
    }
1242
1243 fn start_handling_messages(
1244 this: Weak<Self>,
1245 mut incoming_rx: mpsc::UnboundedReceiver<Envelope>,
1246 cx: &AsyncApp,
1247 ) -> Task<Result<()>> {
1248 cx.spawn(async move |cx| {
1249 if let Some(this) = this.upgrade() {
1250 let envelope = proto::RemoteStarted {}.into_envelope(0, None, None);
1251 this.outgoing_tx.lock().unbounded_send(envelope).ok();
1252 };
1253
1254 let peer_id = PeerId { owner_id: 0, id: 0 };
1255 while let Some(incoming) = incoming_rx.next().await {
1256 let Some(this) = this.upgrade() else {
1257 return anyhow::Ok(());
1258 };
1259 if let Some(ack_id) = incoming.ack_id {
1260 let mut buffer = this.buffer.lock();
1261 while buffer.front().is_some_and(|msg| msg.id <= ack_id) {
1262 buffer.pop_front();
1263 }
1264 }
1265 if let Some(proto::envelope::Payload::FlushBufferedMessages(_)) = &incoming.payload
1266 {
1267 log::debug!(
1268 "{}:ssh message received. name:FlushBufferedMessages",
1269 this.name
1270 );
1271 {
1272 let buffer = this.buffer.lock();
1273 for envelope in buffer.iter() {
1274 this.outgoing_tx
1275 .lock()
1276 .unbounded_send(envelope.clone())
1277 .ok();
1278 }
1279 }
1280 let mut envelope = proto::Ack {}.into_envelope(0, Some(incoming.id), None);
1281 envelope.id = this.next_message_id.fetch_add(1, SeqCst);
1282 this.outgoing_tx.lock().unbounded_send(envelope).ok();
1283 continue;
1284 }
1285
1286 if let Some(proto::envelope::Payload::RemoteStarted(_)) = &incoming.payload {
1287 this.remote_started.set(());
1288 let mut envelope = proto::Ack {}.into_envelope(0, Some(incoming.id), None);
1289 envelope.id = this.next_message_id.fetch_add(1, SeqCst);
1290 this.outgoing_tx.lock().unbounded_send(envelope).ok();
1291 continue;
1292 }
1293
1294 this.max_received.store(incoming.id, SeqCst);
1295
1296 if let Some(request_id) = incoming.responding_to {
1297 let request_id = MessageId(request_id);
1298 let sender = this.response_channels.lock().remove(&request_id);
1299 if let Some(sender) = sender {
1300 let (tx, rx) = oneshot::channel();
1301 if incoming.payload.is_some() {
1302 sender.send((incoming, tx)).ok();
1303 }
1304 rx.await.ok();
1305 }
1306 } else if let Some(envelope) =
1307 build_typed_envelope(peer_id, Instant::now(), incoming)
1308 {
1309 let type_name = envelope.payload_type_name();
1310 let message_id = envelope.message_id();
1311 if let Some(future) = ProtoMessageHandlerSet::handle_message(
1312 &this.message_handlers,
1313 envelope,
1314 this.clone().into(),
1315 cx.clone(),
1316 ) {
1317 log::debug!("{}:ssh message received. name:{type_name}", this.name);
1318 cx.foreground_executor()
1319 .spawn(async move {
1320 match future.await {
1321 Ok(_) => {
1322 log::debug!(
1323 "{}:ssh message handled. name:{type_name}",
1324 this.name
1325 );
1326 }
1327 Err(error) => {
1328 log::error!(
1329 "{}:error handling message. type:{}, error:{:#}",
1330 this.name,
1331 type_name,
1332 format!("{error:#}").lines().fold(
1333 String::new(),
1334 |mut message, line| {
1335 if !message.is_empty() {
1336 message.push(' ');
1337 }
1338 message.push_str(line);
1339 message
1340 }
1341 )
1342 );
1343 }
1344 }
1345 })
1346 .detach()
1347 } else {
1348 log::error!("{}:unhandled ssh message name:{type_name}", this.name);
1349 if let Err(e) = AnyProtoClient::from(this.clone()).send_response(
1350 message_id,
1351 anyhow::anyhow!("no handler registered for {type_name}").to_proto(),
1352 ) {
1353 log::error!(
1354 "{}:error sending error response for {type_name}:{e:#}",
1355 this.name
1356 );
1357 }
1358 }
1359 }
1360 }
1361 anyhow::Ok(())
1362 })
1363 }
1364
1365 fn reconnect(
1366 self: &Arc<Self>,
1367 incoming_rx: UnboundedReceiver<Envelope>,
1368 outgoing_tx: UnboundedSender<Envelope>,
1369 cx: &AsyncApp,
1370 ) {
1371 *self.outgoing_tx.lock() = outgoing_tx;
1372 *self.task.lock() = Self::start_handling_messages(Arc::downgrade(self), incoming_rx, cx);
1373 }
1374
    /// Sends `payload` as a buffered request and resolves to its typed response.
    ///
    /// Equivalent to `request_internal(payload, true)`.
    fn request<T: RequestMessage>(
        &self,
        payload: T,
    ) -> impl 'static + Future<Output = Result<T::Response>> {
        self.request_internal(payload, true)
    }
1381
1382 fn request_internal<T: RequestMessage>(
1383 &self,
1384 payload: T,
1385 use_buffer: bool,
1386 ) -> impl 'static + Future<Output = Result<T::Response>> {
1387 log::debug!("ssh request start. name:{}", T::NAME);
1388 let response =
1389 self.request_dynamic(payload.into_envelope(0, None, None), T::NAME, use_buffer);
1390 async move {
1391 let response = response.await?;
1392 log::debug!("ssh request finish. name:{}", T::NAME);
1393 T::Response::from_envelope(response).context("received a response of the wrong type")
1394 }
1395 }
1396
1397 async fn resync(&self, timeout: Duration) -> Result<()> {
1398 smol::future::or(
1399 async {
1400 self.request_internal(proto::FlushBufferedMessages {}, false)
1401 .await?;
1402
1403 for envelope in self.buffer.lock().iter() {
1404 self.outgoing_tx
1405 .lock()
1406 .unbounded_send(envelope.clone())
1407 .ok();
1408 }
1409 Ok(())
1410 },
1411 async {
1412 smol::Timer::after(timeout).await;
1413 anyhow::bail!("Timed out resyncing remote client")
1414 },
1415 )
1416 .await
1417 }
1418
1419 async fn ping(&self, timeout: Duration) -> Result<()> {
1420 smol::future::or(
1421 async {
1422 self.request(proto::Ping {}).await?;
1423 Ok(())
1424 },
1425 async {
1426 smol::Timer::after(timeout).await;
1427 anyhow::bail!("Timed out pinging remote client")
1428 },
1429 )
1430 .await
1431 }
1432
1433 fn send<T: EnvelopedMessage>(&self, payload: T) -> Result<()> {
1434 log::debug!("ssh send name:{}", T::NAME);
1435 self.send_dynamic(payload.into_envelope(0, None, None))
1436 }
1437
1438 fn request_dynamic(
1439 &self,
1440 mut envelope: proto::Envelope,
1441 type_name: &'static str,
1442 use_buffer: bool,
1443 ) -> impl 'static + Future<Output = Result<proto::Envelope>> {
1444 envelope.id = self.next_message_id.fetch_add(1, SeqCst);
1445 let (tx, rx) = oneshot::channel();
1446 let mut response_channels_lock = self.response_channels.lock();
1447 response_channels_lock.insert(MessageId(envelope.id), tx);
1448 drop(response_channels_lock);
1449
1450 let result = if use_buffer {
1451 self.send_buffered(envelope)
1452 } else {
1453 self.send_unbuffered(envelope)
1454 };
1455 async move {
1456 if let Err(error) = &result {
1457 log::error!("failed to send message: {error}");
1458 anyhow::bail!("failed to send message: {error}");
1459 }
1460
1461 let response = rx.await.context("connection lost")?.0;
1462 if let Some(proto::envelope::Payload::Error(error)) = &response.payload {
1463 return Err(RpcError::from_proto(error, type_name));
1464 }
1465 Ok(response)
1466 }
1467 }
1468
1469 pub fn send_dynamic(&self, mut envelope: proto::Envelope) -> Result<()> {
1470 envelope.id = self.next_message_id.fetch_add(1, SeqCst);
1471 self.send_buffered(envelope)
1472 }
1473
1474 fn send_buffered(&self, mut envelope: proto::Envelope) -> Result<()> {
1475 envelope.ack_id = Some(self.max_received.load(SeqCst));
1476 self.buffer.lock().push_back(envelope.clone());
1477 // ignore errors on send (happen while we're reconnecting)
1478 // assume that the global "disconnected" overlay is sufficient.
1479 self.outgoing_tx.lock().unbounded_send(envelope).ok();
1480 Ok(())
1481 }
1482
1483 fn send_unbuffered(&self, mut envelope: proto::Envelope) -> Result<()> {
1484 envelope.ack_id = Some(self.max_received.load(SeqCst));
1485 self.outgoing_tx.lock().unbounded_send(envelope).ok();
1486 Ok(())
1487 }
1488}
1489
/// Exposes [`ChannelClient`] through the generic [`ProtoClient`] interface.
impl ProtoClient for ChannelClient {
    /// Sends `envelope` as a buffered request and resolves to the response envelope.
    fn request(
        &self,
        envelope: proto::Envelope,
        request_type: &'static str,
    ) -> BoxFuture<'static, Result<proto::Envelope>> {
        self.request_dynamic(envelope, request_type, true).boxed()
    }

    /// Sends `envelope` as a buffered message; the message type name is not used.
    fn send(&self, envelope: proto::Envelope, _message_type: &'static str) -> Result<()> {
        self.send_dynamic(envelope)
    }

    /// Sends a response envelope the same way as any other buffered message.
    fn send_response(&self, envelope: Envelope, _message_type: &'static str) -> anyhow::Result<()> {
        self.send_dynamic(envelope)
    }

    /// Returns the handler set used to dispatch incoming messages.
    fn message_handler_set(&self) -> &Mutex<ProtoMessageHandlerSet> {
        &self.message_handlers
    }

    /// Always `false` for this client.
    fn is_via_collab(&self) -> bool {
        false
    }

    /// Returns the flag supplied when the client was constructed.
    fn has_wsl_interop(&self) -> bool {
        self.has_wsl_interop
    }
}
1519
1520#[cfg(any(test, feature = "test-support"))]
1521mod fake {
1522 use super::{ChannelClient, RemoteClientDelegate, RemoteConnection, RemotePlatform};
1523 use crate::remote_client::{CommandTemplate, RemoteConnectionOptions};
1524 use anyhow::Result;
1525 use askpass::EncryptedPassword;
1526 use async_trait::async_trait;
1527 use collections::HashMap;
1528 use futures::{
1529 FutureExt, SinkExt, StreamExt,
1530 channel::{
1531 mpsc::{self, Sender},
1532 oneshot,
1533 },
1534 select_biased,
1535 };
1536 use gpui::{App, AppContext as _, AsyncApp, Task, TestAppContext};
1537 use release_channel::ReleaseChannel;
1538 use rpc::proto::Envelope;
1539 use semver::Version;
1540 use std::{path::PathBuf, sync::Arc};
1541 use util::paths::{PathStyle, RemotePathBuf};
1542
    /// Test double for [`RemoteConnection`] that talks to a server-side
    /// [`ChannelClient`] through in-memory channels.
    pub(super) struct FakeRemoteConnection {
        /// Options returned from `connection_options`.
        pub(super) connection_options: RemoteConnectionOptions,
        /// The server-side client that is re-wired whenever a proxy is started or a
        /// disconnect is simulated.
        pub(super) server_channel: Arc<ChannelClient>,
        /// App context of the server side, used when re-wiring `server_channel`.
        pub(super) server_cx: SendableCx,
    }
1548
    /// Wrapper that lets an [`AsyncApp`] be stored inside a type that must be
    /// `Send + Sync`. Test-only; see the SAFETY notes below.
    pub(super) struct SendableCx(AsyncApp);
    impl SendableCx {
        /// Captures the async context of the given test app context.
        // SAFETY: When run in test mode, GPUI is always single threaded.
        pub(super) fn new(cx: &TestAppContext) -> Self {
            Self(cx.to_async())
        }

        /// Returns a clone of the wrapped context. The argument is unused and only
        /// serves as proof that the caller already holds an `AsyncApp`.
        // SAFETY: Enforce that we're on the main thread by requiring a valid AsyncApp
        fn get(&self, _: &AsyncApp) -> AsyncApp {
            self.0.clone()
        }
    }

    // SAFETY: There is no way to access a SendableCx from a different thread, see [`SendableCx::new`] and [`SendableCx::get`]
    unsafe impl Send for SendableCx {}
    unsafe impl Sync for SendableCx {}
1565
1566 #[async_trait(?Send)]
1567 impl RemoteConnection for FakeRemoteConnection {
1568 async fn kill(&self) -> Result<()> {
1569 Ok(())
1570 }
1571
1572 fn has_been_killed(&self) -> bool {
1573 false
1574 }
1575
1576 fn build_command(
1577 &self,
1578 program: Option<String>,
1579 args: &[String],
1580 env: &HashMap<String, String>,
1581 _: Option<String>,
1582 _: Option<(u16, String, u16)>,
1583 ) -> Result<CommandTemplate> {
1584 let ssh_program = program.unwrap_or_else(|| "sh".to_string());
1585 let mut ssh_args = Vec::new();
1586 ssh_args.push(ssh_program);
1587 ssh_args.extend(args.iter().cloned());
1588 Ok(CommandTemplate {
1589 program: "ssh".into(),
1590 args: ssh_args,
1591 env: env.clone(),
1592 })
1593 }
1594
1595 fn build_forward_ports_command(
1596 &self,
1597 forwards: Vec<(u16, String, u16)>,
1598 ) -> anyhow::Result<CommandTemplate> {
1599 Ok(CommandTemplate {
1600 program: "ssh".into(),
1601 args: std::iter::once("-N".to_owned())
1602 .chain(forwards.into_iter().map(|(local_port, host, remote_port)| {
1603 format!("{local_port}:{host}:{remote_port}")
1604 }))
1605 .collect(),
1606 env: Default::default(),
1607 })
1608 }
1609
1610 fn upload_directory(
1611 &self,
1612 _src_path: PathBuf,
1613 _dest_path: RemotePathBuf,
1614 _cx: &App,
1615 ) -> Task<Result<()>> {
1616 unreachable!()
1617 }
1618
1619 fn connection_options(&self) -> RemoteConnectionOptions {
1620 self.connection_options.clone()
1621 }
1622
1623 fn simulate_disconnect(&self, cx: &AsyncApp) {
1624 let (outgoing_tx, _) = mpsc::unbounded::<Envelope>();
1625 let (_, incoming_rx) = mpsc::unbounded::<Envelope>();
1626 self.server_channel
1627 .reconnect(incoming_rx, outgoing_tx, &self.server_cx.get(cx));
1628 }
1629
1630 fn start_proxy(
1631 &self,
1632 _unique_identifier: String,
1633 _reconnect: bool,
1634 mut client_incoming_tx: mpsc::UnboundedSender<Envelope>,
1635 mut client_outgoing_rx: mpsc::UnboundedReceiver<Envelope>,
1636 mut connection_activity_tx: Sender<()>,
1637 _delegate: Arc<dyn RemoteClientDelegate>,
1638 cx: &mut AsyncApp,
1639 ) -> Task<Result<i32>> {
1640 let (mut server_incoming_tx, server_incoming_rx) = mpsc::unbounded::<Envelope>();
1641 let (server_outgoing_tx, mut server_outgoing_rx) = mpsc::unbounded::<Envelope>();
1642
1643 self.server_channel.reconnect(
1644 server_incoming_rx,
1645 server_outgoing_tx,
1646 &self.server_cx.get(cx),
1647 );
1648
1649 cx.background_spawn(async move {
1650 loop {
1651 select_biased! {
1652 server_to_client = server_outgoing_rx.next().fuse() => {
1653 let Some(server_to_client) = server_to_client else {
1654 return Ok(1)
1655 };
1656 connection_activity_tx.try_send(()).ok();
1657 client_incoming_tx.send(server_to_client).await.ok();
1658 }
1659 client_to_server = client_outgoing_rx.next().fuse() => {
1660 let Some(client_to_server) = client_to_server else {
1661 return Ok(1)
1662 };
1663 server_incoming_tx.send(client_to_server).await.ok();
1664 }
1665 }
1666 }
1667 })
1668 }
1669
1670 fn path_style(&self) -> PathStyle {
1671 PathStyle::local()
1672 }
1673
1674 fn shell(&self) -> String {
1675 "sh".to_owned()
1676 }
1677
1678 fn default_system_shell(&self) -> String {
1679 "sh".to_owned()
1680 }
1681
1682 fn has_wsl_interop(&self) -> bool {
1683 false
1684 }
1685 }
1686
    /// Test-only [`RemoteClientDelegate`]. Only `set_status` may be called (it does
    /// nothing); every other method panics via `unreachable!()`.
    pub(super) struct Delegate;

    impl RemoteClientDelegate for Delegate {
        /// Panics if called.
        fn ask_password(&self, _: String, _: oneshot::Sender<EncryptedPassword>, _: &mut AsyncApp) {
            unreachable!()
        }

        /// Panics if called.
        fn download_server_binary_locally(
            &self,
            _: RemotePlatform,
            _: ReleaseChannel,
            _: Option<Version>,
            _: &mut AsyncApp,
        ) -> Task<Result<PathBuf>> {
            unreachable!()
        }

        /// Panics if called.
        fn get_download_url(
            &self,
            _platform: RemotePlatform,
            _release_channel: ReleaseChannel,
            _version: Option<Version>,
            _cx: &mut AsyncApp,
        ) -> Task<Result<Option<String>>> {
            unreachable!()
        }

        /// Status updates are ignored.
        fn set_status(&self, _: Option<&str>, _: &mut AsyncApp) {}
    }
1716}