1use crate::{
2 SshConnectionOptions,
3 protocol::MessageId,
4 proxy::ProxyLaunchError,
5 transport::{
6 ssh::SshRemoteConnection,
7 wsl::{WslConnectionOptions, WslRemoteConnection},
8 },
9};
10use anyhow::{Context as _, Result, anyhow};
11use askpass::EncryptedPassword;
12use async_trait::async_trait;
13use collections::HashMap;
14use futures::{
15 Future, FutureExt as _, StreamExt as _,
16 channel::{
17 mpsc::{self, Sender, UnboundedReceiver, UnboundedSender},
18 oneshot,
19 },
20 future::{BoxFuture, Shared},
21 select, select_biased,
22};
23use gpui::{
24 App, AppContext as _, AsyncApp, BackgroundExecutor, BorrowAppContext, Context, Entity,
25 EventEmitter, FutureExt, Global, Task, WeakEntity,
26};
27use parking_lot::Mutex;
28
29use release_channel::ReleaseChannel;
30use rpc::{
31 AnyProtoClient, ErrorExt, ProtoClient, ProtoMessageHandlerSet, RpcError,
32 proto::{self, Envelope, EnvelopedMessage, PeerId, RequestMessage, build_typed_envelope},
33};
34use semver::Version;
35use std::{
36 collections::VecDeque,
37 fmt,
38 ops::ControlFlow,
39 path::PathBuf,
40 sync::{
41 Arc, Weak,
42 atomic::{AtomicU32, AtomicU64, Ordering::SeqCst},
43 },
44 time::{Duration, Instant},
45};
46use util::{
47 ResultExt,
48 paths::{PathStyle, RemotePathBuf},
49};
50
51#[derive(Copy, Clone, Debug)]
52pub struct RemotePlatform {
53 pub os: &'static str,
54 pub arch: &'static str,
55}
56
57#[derive(Clone, Debug)]
58pub struct CommandTemplate {
59 pub program: String,
60 pub args: Vec<String>,
61 pub env: HashMap<String, String>,
62}
63
64pub trait RemoteClientDelegate: Send + Sync {
65 fn ask_password(
66 &self,
67 prompt: String,
68 tx: oneshot::Sender<EncryptedPassword>,
69 cx: &mut AsyncApp,
70 );
71 fn get_download_url(
72 &self,
73 platform: RemotePlatform,
74 release_channel: ReleaseChannel,
75 version: Option<Version>,
76 cx: &mut AsyncApp,
77 ) -> Task<Result<Option<String>>>;
78 fn download_server_binary_locally(
79 &self,
80 platform: RemotePlatform,
81 release_channel: ReleaseChannel,
82 version: Option<Version>,
83 cx: &mut AsyncApp,
84 ) -> Task<Result<PathBuf>>;
85 fn set_status(&self, status: Option<&str>, cx: &mut AsyncApp);
86}
87
88const MAX_MISSED_HEARTBEATS: usize = 5;
89const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
90const HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(5);
91const INITIAL_CONNECTION_TIMEOUT: Duration = Duration::from_secs(60);
92
93const MAX_RECONNECT_ATTEMPTS: usize = 3;
94
95enum State {
96 Connecting,
97 Connected {
98 remote_connection: Arc<dyn RemoteConnection>,
99 delegate: Arc<dyn RemoteClientDelegate>,
100
101 multiplex_task: Task<Result<()>>,
102 heartbeat_task: Task<Result<()>>,
103 },
104 HeartbeatMissed {
105 missed_heartbeats: usize,
106
107 ssh_connection: Arc<dyn RemoteConnection>,
108 delegate: Arc<dyn RemoteClientDelegate>,
109
110 multiplex_task: Task<Result<()>>,
111 heartbeat_task: Task<Result<()>>,
112 },
113 Reconnecting,
114 ReconnectFailed {
115 ssh_connection: Arc<dyn RemoteConnection>,
116 delegate: Arc<dyn RemoteClientDelegate>,
117
118 error: anyhow::Error,
119 attempts: usize,
120 },
121 ReconnectExhausted,
122 ServerNotRunning,
123}
124
125impl fmt::Display for State {
126 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
127 match self {
128 Self::Connecting => write!(f, "connecting"),
129 Self::Connected { .. } => write!(f, "connected"),
130 Self::Reconnecting => write!(f, "reconnecting"),
131 Self::ReconnectFailed { .. } => write!(f, "reconnect failed"),
132 Self::ReconnectExhausted => write!(f, "reconnect exhausted"),
133 Self::HeartbeatMissed { .. } => write!(f, "heartbeat missed"),
134 Self::ServerNotRunning { .. } => write!(f, "server not running"),
135 }
136 }
137}
138
139impl State {
140 fn remote_connection(&self) -> Option<Arc<dyn RemoteConnection>> {
141 match self {
142 Self::Connected {
143 remote_connection: ssh_connection,
144 ..
145 } => Some(ssh_connection.clone()),
146 Self::HeartbeatMissed { ssh_connection, .. } => Some(ssh_connection.clone()),
147 Self::ReconnectFailed { ssh_connection, .. } => Some(ssh_connection.clone()),
148 _ => None,
149 }
150 }
151
152 fn can_reconnect(&self) -> bool {
153 match self {
154 Self::Connected { .. }
155 | Self::HeartbeatMissed { .. }
156 | Self::ReconnectFailed { .. } => true,
157 State::Connecting
158 | State::Reconnecting
159 | State::ReconnectExhausted
160 | State::ServerNotRunning => false,
161 }
162 }
163
164 fn is_reconnect_failed(&self) -> bool {
165 matches!(self, Self::ReconnectFailed { .. })
166 }
167
168 fn is_reconnect_exhausted(&self) -> bool {
169 matches!(self, Self::ReconnectExhausted { .. })
170 }
171
172 fn is_server_not_running(&self) -> bool {
173 matches!(self, Self::ServerNotRunning)
174 }
175
176 fn is_reconnecting(&self) -> bool {
177 matches!(self, Self::Reconnecting { .. })
178 }
179
180 fn heartbeat_recovered(self) -> Self {
181 match self {
182 Self::HeartbeatMissed {
183 ssh_connection,
184 delegate,
185 multiplex_task,
186 heartbeat_task,
187 ..
188 } => Self::Connected {
189 remote_connection: ssh_connection,
190 delegate,
191 multiplex_task,
192 heartbeat_task,
193 },
194 _ => self,
195 }
196 }
197
198 fn heartbeat_missed(self) -> Self {
199 match self {
200 Self::Connected {
201 remote_connection: ssh_connection,
202 delegate,
203 multiplex_task,
204 heartbeat_task,
205 } => Self::HeartbeatMissed {
206 missed_heartbeats: 1,
207 ssh_connection,
208 delegate,
209 multiplex_task,
210 heartbeat_task,
211 },
212 Self::HeartbeatMissed {
213 missed_heartbeats,
214 ssh_connection,
215 delegate,
216 multiplex_task,
217 heartbeat_task,
218 } => Self::HeartbeatMissed {
219 missed_heartbeats: missed_heartbeats + 1,
220 ssh_connection,
221 delegate,
222 multiplex_task,
223 heartbeat_task,
224 },
225 _ => self,
226 }
227 }
228}
229
230/// The state of the ssh connection.
231#[derive(Clone, Copy, Debug, PartialEq, Eq)]
232pub enum ConnectionState {
233 Connecting,
234 Connected,
235 HeartbeatMissed,
236 Reconnecting,
237 Disconnected,
238}
239
240impl From<&State> for ConnectionState {
241 fn from(value: &State) -> Self {
242 match value {
243 State::Connecting => Self::Connecting,
244 State::Connected { .. } => Self::Connected,
245 State::Reconnecting | State::ReconnectFailed { .. } => Self::Reconnecting,
246 State::HeartbeatMissed { .. } => Self::HeartbeatMissed,
247 State::ReconnectExhausted => Self::Disconnected,
248 State::ServerNotRunning => Self::Disconnected,
249 }
250 }
251}
252
253pub struct RemoteClient {
254 client: Arc<ChannelClient>,
255 unique_identifier: String,
256 connection_options: RemoteConnectionOptions,
257 path_style: PathStyle,
258 state: Option<State>,
259}
260
261#[derive(Debug)]
262pub enum RemoteClientEvent {
263 Disconnected,
264}
265
266impl EventEmitter<RemoteClientEvent> for RemoteClient {}
267
268/// Identifies the socket on the remote server so that reconnects
269/// can re-join the same project.
270pub enum ConnectionIdentifier {
271 Setup(u64),
272 Workspace(i64),
273}
274
275static NEXT_ID: AtomicU64 = AtomicU64::new(1);
276
277impl ConnectionIdentifier {
278 pub fn setup() -> Self {
279 Self::Setup(NEXT_ID.fetch_add(1, SeqCst))
280 }
281
282 // This string gets used in a socket name, and so must be relatively short.
283 // The total length of:
284 // /home/{username}/.local/share/zed/server_state/{name}/stdout.sock
285 // Must be less than about 100 characters
286 // https://unix.stackexchange.com/questions/367008/why-is-socket-path-length-limited-to-a-hundred-chars
287 // So our strings should be at most 20 characters or so.
288 fn to_string(&self, cx: &App) -> String {
289 let identifier_prefix = match ReleaseChannel::global(cx) {
290 ReleaseChannel::Stable => "".to_string(),
291 release_channel => format!("{}-", release_channel.dev_name()),
292 };
293 match self {
294 Self::Setup(setup_id) => format!("{identifier_prefix}setup-{setup_id}"),
295 Self::Workspace(workspace_id) => {
296 format!("{identifier_prefix}workspace-{workspace_id}",)
297 }
298 }
299 }
300}
301
302pub async fn connect(
303 connection_options: RemoteConnectionOptions,
304 delegate: Arc<dyn RemoteClientDelegate>,
305 cx: &mut AsyncApp,
306) -> Result<Arc<dyn RemoteConnection>> {
307 cx.update(|cx| {
308 cx.update_default_global(|pool: &mut ConnectionPool, cx| {
309 pool.connect(connection_options.clone(), delegate.clone(), cx)
310 })
311 })?
312 .await
313 .map_err(|e| e.cloned())
314}
315
316impl RemoteClient {
317 pub fn new(
318 unique_identifier: ConnectionIdentifier,
319 remote_connection: Arc<dyn RemoteConnection>,
320 cancellation: oneshot::Receiver<()>,
321 delegate: Arc<dyn RemoteClientDelegate>,
322 cx: &mut App,
323 ) -> Task<Result<Option<Entity<Self>>>> {
324 let unique_identifier = unique_identifier.to_string(cx);
325 cx.spawn(async move |cx| {
326 let success = Box::pin(async move {
327 let (outgoing_tx, outgoing_rx) = mpsc::unbounded::<Envelope>();
328 let (incoming_tx, incoming_rx) = mpsc::unbounded::<Envelope>();
329 let (connection_activity_tx, connection_activity_rx) = mpsc::channel::<()>(1);
330
331 let client = cx.update(|cx| {
332 ChannelClient::new(
333 incoming_rx,
334 outgoing_tx,
335 cx,
336 "client",
337 remote_connection.has_wsl_interop(),
338 )
339 })?;
340
341 let path_style = remote_connection.path_style();
342 let this = cx.new(|_| Self {
343 client: client.clone(),
344 unique_identifier: unique_identifier.clone(),
345 connection_options: remote_connection.connection_options(),
346 path_style,
347 state: Some(State::Connecting),
348 })?;
349
350 let io_task = remote_connection.start_proxy(
351 unique_identifier,
352 false,
353 incoming_tx,
354 outgoing_rx,
355 connection_activity_tx,
356 delegate.clone(),
357 cx,
358 );
359
360 let ready = client
361 .wait_for_remote_started()
362 .with_timeout(INITIAL_CONNECTION_TIMEOUT, cx.background_executor())
363 .await;
364 match ready {
365 Ok(Some(_)) => {}
366 Ok(None) => {
367 let mut error = "remote client exited before becoming ready".to_owned();
368 if let Some(status) = io_task.now_or_never() {
369 match status {
370 Ok(exit_code) => {
371 error.push_str(&format!(", exit_code={exit_code:?}"))
372 }
373 Err(e) => error.push_str(&format!(", error={e:?}")),
374 }
375 }
376 let error = anyhow::anyhow!("{error}");
377 log::error!("failed to establish connection: {}", error);
378 return Err(error);
379 }
380 Err(_) => {
381 let mut error =
382 "remote client did not become ready within the timeout".to_owned();
383 if let Some(status) = io_task.now_or_never() {
384 match status {
385 Ok(exit_code) => {
386 error.push_str(&format!(", exit_code={exit_code:?}"))
387 }
388 Err(e) => error.push_str(&format!(", error={e:?}")),
389 }
390 }
391 let error = anyhow::anyhow!("{error}");
392 log::error!("failed to establish connection: {}", error);
393 return Err(error);
394 }
395 }
396 let multiplex_task = Self::monitor(this.downgrade(), io_task, cx);
397 if let Err(error) = client.ping(HEARTBEAT_TIMEOUT).await {
398 log::error!("failed to establish connection: {}", error);
399 return Err(error);
400 }
401
402 let heartbeat_task = Self::heartbeat(this.downgrade(), connection_activity_rx, cx);
403
404 this.update(cx, |this, _| {
405 this.state = Some(State::Connected {
406 remote_connection,
407 delegate,
408 multiplex_task,
409 heartbeat_task,
410 });
411 })?;
412
413 Ok(Some(this))
414 });
415
416 select! {
417 _ = cancellation.fuse() => {
418 Ok(None)
419 }
420 result = success.fuse() => result
421 }
422 })
423 }
424
425 pub fn proto_client_from_channels(
426 incoming_rx: mpsc::UnboundedReceiver<Envelope>,
427 outgoing_tx: mpsc::UnboundedSender<Envelope>,
428 cx: &App,
429 name: &'static str,
430 has_wsl_interop: bool,
431 ) -> AnyProtoClient {
432 ChannelClient::new(incoming_rx, outgoing_tx, cx, name, has_wsl_interop).into()
433 }
434
435 pub fn shutdown_processes<T: RequestMessage>(
436 &mut self,
437 shutdown_request: Option<T>,
438 executor: BackgroundExecutor,
439 ) -> Option<impl Future<Output = ()> + use<T>> {
440 let state = self.state.take()?;
441 log::info!("shutting down ssh processes");
442
443 let State::Connected {
444 multiplex_task,
445 heartbeat_task,
446 remote_connection: ssh_connection,
447 delegate,
448 } = state
449 else {
450 return None;
451 };
452
453 let client = self.client.clone();
454
455 Some(async move {
456 if let Some(shutdown_request) = shutdown_request {
457 client.send(shutdown_request).log_err();
458 // We wait 50ms instead of waiting for a response, because
459 // waiting for a response would require us to wait on the main thread
460 // which we want to avoid in an `on_app_quit` callback.
461 executor.timer(Duration::from_millis(50)).await;
462 }
463
464 // Drop `multiplex_task` because it owns our ssh_proxy_process, which is a
465 // child of master_process.
466 drop(multiplex_task);
467 // Now drop the rest of state, which kills master process.
468 drop(heartbeat_task);
469 drop(ssh_connection);
470 drop(delegate);
471 })
472 }
473
474 fn reconnect(&mut self, cx: &mut Context<Self>) -> Result<()> {
475 let can_reconnect = self
476 .state
477 .as_ref()
478 .map(|state| state.can_reconnect())
479 .unwrap_or(false);
480 if !can_reconnect {
481 log::info!("aborting reconnect, because not in state that allows reconnecting");
482 let error = if let Some(state) = self.state.as_ref() {
483 format!("invalid state, cannot reconnect while in state {state}")
484 } else {
485 "no state set".to_string()
486 };
487 anyhow::bail!(error);
488 }
489
490 let state = self.state.take().unwrap();
491 let (attempts, remote_connection, delegate) = match state {
492 State::Connected {
493 remote_connection: ssh_connection,
494 delegate,
495 multiplex_task,
496 heartbeat_task,
497 }
498 | State::HeartbeatMissed {
499 ssh_connection,
500 delegate,
501 multiplex_task,
502 heartbeat_task,
503 ..
504 } => {
505 drop(multiplex_task);
506 drop(heartbeat_task);
507 (0, ssh_connection, delegate)
508 }
509 State::ReconnectFailed {
510 attempts,
511 ssh_connection,
512 delegate,
513 ..
514 } => (attempts, ssh_connection, delegate),
515 State::Connecting
516 | State::Reconnecting
517 | State::ReconnectExhausted
518 | State::ServerNotRunning => unreachable!(),
519 };
520
521 let attempts = attempts + 1;
522 if attempts > MAX_RECONNECT_ATTEMPTS {
523 log::error!(
524 "Failed to reconnect to after {} attempts, giving up",
525 MAX_RECONNECT_ATTEMPTS
526 );
527 self.set_state(State::ReconnectExhausted, cx);
528 return Ok(());
529 }
530
531 self.set_state(State::Reconnecting, cx);
532
533 log::info!("Trying to reconnect to ssh server... Attempt {}", attempts);
534
535 let unique_identifier = self.unique_identifier.clone();
536 let client = self.client.clone();
537 let reconnect_task = cx.spawn(async move |this, cx| {
538 macro_rules! failed {
539 ($error:expr, $attempts:expr, $ssh_connection:expr, $delegate:expr) => {
540 delegate.set_status(Some(&format!("{error:#}", error = $error)), cx);
541 return State::ReconnectFailed {
542 error: anyhow!($error),
543 attempts: $attempts,
544 ssh_connection: $ssh_connection,
545 delegate: $delegate,
546 };
547 };
548 }
549
550 if let Err(error) = remote_connection
551 .kill()
552 .await
553 .context("Failed to kill ssh process")
554 {
555 failed!(error, attempts, remote_connection, delegate);
556 };
557
558 let connection_options = remote_connection.connection_options();
559
560 let (outgoing_tx, outgoing_rx) = mpsc::unbounded::<Envelope>();
561 let (incoming_tx, incoming_rx) = mpsc::unbounded::<Envelope>();
562 let (connection_activity_tx, connection_activity_rx) = mpsc::channel::<()>(1);
563
564 let (ssh_connection, io_task) = match async {
565 let ssh_connection = cx
566 .update_global(|pool: &mut ConnectionPool, cx| {
567 pool.connect(connection_options, delegate.clone(), cx)
568 })?
569 .await
570 .map_err(|error| error.cloned())?;
571
572 let io_task = ssh_connection.start_proxy(
573 unique_identifier,
574 true,
575 incoming_tx,
576 outgoing_rx,
577 connection_activity_tx,
578 delegate.clone(),
579 cx,
580 );
581 anyhow::Ok((ssh_connection, io_task))
582 }
583 .await
584 {
585 Ok((ssh_connection, io_task)) => (ssh_connection, io_task),
586 Err(error) => {
587 failed!(error, attempts, remote_connection, delegate);
588 }
589 };
590
591 let multiplex_task = Self::monitor(this.clone(), io_task, cx);
592 client.reconnect(incoming_rx, outgoing_tx, cx);
593
594 if let Err(error) = client.resync(HEARTBEAT_TIMEOUT).await {
595 failed!(error, attempts, ssh_connection, delegate);
596 };
597
598 State::Connected {
599 remote_connection: ssh_connection,
600 delegate,
601 multiplex_task,
602 heartbeat_task: Self::heartbeat(this.clone(), connection_activity_rx, cx),
603 }
604 });
605
606 cx.spawn(async move |this, cx| {
607 let new_state = reconnect_task.await;
608 this.update(cx, |this, cx| {
609 this.try_set_state(cx, |old_state| {
610 if old_state.is_reconnecting() {
611 match &new_state {
612 State::Connecting
613 | State::Reconnecting
614 | State::HeartbeatMissed { .. }
615 | State::ServerNotRunning => {}
616 State::Connected { .. } => {
617 log::info!("Successfully reconnected");
618 }
619 State::ReconnectFailed {
620 error, attempts, ..
621 } => {
622 log::error!(
623 "Reconnect attempt {} failed: {:?}. Starting new attempt...",
624 attempts,
625 error
626 );
627 }
628 State::ReconnectExhausted => {
629 log::error!("Reconnect attempt failed and all attempts exhausted");
630 }
631 }
632 Some(new_state)
633 } else {
634 None
635 }
636 });
637
638 if this.state_is(State::is_reconnect_failed) {
639 this.reconnect(cx)
640 } else if this.state_is(State::is_reconnect_exhausted) {
641 Ok(())
642 } else {
643 log::debug!("State has transition from Reconnecting into new state while attempting reconnect.");
644 Ok(())
645 }
646 })
647 })
648 .detach_and_log_err(cx);
649
650 Ok(())
651 }
652
653 fn heartbeat(
654 this: WeakEntity<Self>,
655 mut connection_activity_rx: mpsc::Receiver<()>,
656 cx: &mut AsyncApp,
657 ) -> Task<Result<()>> {
658 let Ok(client) = this.read_with(cx, |this, _| this.client.clone()) else {
659 return Task::ready(Err(anyhow!("SshRemoteClient lost")));
660 };
661
662 cx.spawn(async move |cx| {
663 let mut missed_heartbeats = 0;
664
665 let keepalive_timer = cx.background_executor().timer(HEARTBEAT_INTERVAL).fuse();
666 futures::pin_mut!(keepalive_timer);
667
668 loop {
669 select_biased! {
670 result = connection_activity_rx.next().fuse() => {
671 if result.is_none() {
672 log::warn!("ssh heartbeat: connection activity channel has been dropped. stopping.");
673 return Ok(());
674 }
675
676 if missed_heartbeats != 0 {
677 missed_heartbeats = 0;
678 let _ =this.update(cx, |this, cx| {
679 this.handle_heartbeat_result(missed_heartbeats, cx)
680 })?;
681 }
682 }
683 _ = keepalive_timer => {
684 log::debug!("Sending heartbeat to server...");
685
686 let result = select_biased! {
687 _ = connection_activity_rx.next().fuse() => {
688 Ok(())
689 }
690 ping_result = client.ping(HEARTBEAT_TIMEOUT).fuse() => {
691 ping_result
692 }
693 };
694
695 if result.is_err() {
696 missed_heartbeats += 1;
697 log::warn!(
698 "No heartbeat from server after {:?}. Missed heartbeat {} out of {}.",
699 HEARTBEAT_TIMEOUT,
700 missed_heartbeats,
701 MAX_MISSED_HEARTBEATS
702 );
703 } else if missed_heartbeats != 0 {
704 missed_heartbeats = 0;
705 } else {
706 continue;
707 }
708
709 let result = this.update(cx, |this, cx| {
710 this.handle_heartbeat_result(missed_heartbeats, cx)
711 })?;
712 if result.is_break() {
713 return Ok(());
714 }
715 }
716 }
717
718 keepalive_timer.set(cx.background_executor().timer(HEARTBEAT_INTERVAL).fuse());
719 }
720 })
721 }
722
723 fn handle_heartbeat_result(
724 &mut self,
725 missed_heartbeats: usize,
726 cx: &mut Context<Self>,
727 ) -> ControlFlow<()> {
728 let state = self.state.take().unwrap();
729 let next_state = if missed_heartbeats > 0 {
730 state.heartbeat_missed()
731 } else {
732 state.heartbeat_recovered()
733 };
734
735 self.set_state(next_state, cx);
736
737 if missed_heartbeats >= MAX_MISSED_HEARTBEATS {
738 log::error!(
739 "Missed last {} heartbeats. Reconnecting...",
740 missed_heartbeats
741 );
742
743 self.reconnect(cx)
744 .context("failed to start reconnect process after missing heartbeats")
745 .log_err();
746 ControlFlow::Break(())
747 } else {
748 ControlFlow::Continue(())
749 }
750 }
751
752 fn monitor(
753 this: WeakEntity<Self>,
754 io_task: Task<Result<i32>>,
755 cx: &AsyncApp,
756 ) -> Task<Result<()>> {
757 cx.spawn(async move |cx| {
758 let result = io_task.await;
759
760 match result {
761 Ok(exit_code) => {
762 if let Some(error) = ProxyLaunchError::from_exit_code(exit_code) {
763 match error {
764 ProxyLaunchError::ServerNotRunning => {
765 log::error!("failed to reconnect because server is not running");
766 this.update(cx, |this, cx| {
767 this.set_state(State::ServerNotRunning, cx);
768 })?;
769 }
770 }
771 } else if exit_code > 0 {
772 log::error!("proxy process terminated unexpectedly");
773 this.update(cx, |this, cx| {
774 this.reconnect(cx).ok();
775 })?;
776 }
777 }
778 Err(error) => {
779 log::warn!("ssh io task died with error: {:?}. reconnecting...", error);
780 this.update(cx, |this, cx| {
781 this.reconnect(cx).ok();
782 })?;
783 }
784 }
785
786 Ok(())
787 })
788 }
789
790 fn state_is(&self, check: impl FnOnce(&State) -> bool) -> bool {
791 self.state.as_ref().is_some_and(check)
792 }
793
794 fn try_set_state(&mut self, cx: &mut Context<Self>, map: impl FnOnce(&State) -> Option<State>) {
795 let new_state = self.state.as_ref().and_then(map);
796 if let Some(new_state) = new_state {
797 self.state.replace(new_state);
798 cx.notify();
799 }
800 }
801
802 fn set_state(&mut self, state: State, cx: &mut Context<Self>) {
803 log::info!("setting state to '{}'", &state);
804
805 let is_reconnect_exhausted = state.is_reconnect_exhausted();
806 let is_server_not_running = state.is_server_not_running();
807 self.state.replace(state);
808
809 if is_reconnect_exhausted || is_server_not_running {
810 cx.emit(RemoteClientEvent::Disconnected);
811 }
812 cx.notify();
813 }
814
815 pub fn shell(&self) -> Option<String> {
816 Some(self.remote_connection()?.shell())
817 }
818
819 pub fn default_system_shell(&self) -> Option<String> {
820 Some(self.remote_connection()?.default_system_shell())
821 }
822
823 pub fn shares_network_interface(&self) -> bool {
824 self.remote_connection()
825 .map_or(false, |connection| connection.shares_network_interface())
826 }
827
828 pub fn build_command(
829 &self,
830 program: Option<String>,
831 args: &[String],
832 env: &HashMap<String, String>,
833 working_dir: Option<String>,
834 port_forward: Option<(u16, String, u16)>,
835 ) -> Result<CommandTemplate> {
836 let Some(connection) = self.remote_connection() else {
837 return Err(anyhow!("no ssh connection"));
838 };
839 connection.build_command(program, args, env, working_dir, port_forward)
840 }
841
842 pub fn build_forward_ports_command(
843 &self,
844 forwards: Vec<(u16, String, u16)>,
845 ) -> Result<CommandTemplate> {
846 let Some(connection) = self.remote_connection() else {
847 return Err(anyhow!("no ssh connection"));
848 };
849 connection.build_forward_ports_command(forwards)
850 }
851
852 pub fn upload_directory(
853 &self,
854 src_path: PathBuf,
855 dest_path: RemotePathBuf,
856 cx: &App,
857 ) -> Task<Result<()>> {
858 let Some(connection) = self.remote_connection() else {
859 return Task::ready(Err(anyhow!("no ssh connection")));
860 };
861 connection.upload_directory(src_path, dest_path, cx)
862 }
863
864 pub fn proto_client(&self) -> AnyProtoClient {
865 self.client.clone().into()
866 }
867
868 pub fn connection_options(&self) -> RemoteConnectionOptions {
869 self.connection_options.clone()
870 }
871
872 pub fn connection(&self) -> Option<Arc<dyn RemoteConnection>> {
873 if let State::Connected {
874 remote_connection, ..
875 } = self.state.as_ref()?
876 {
877 Some(remote_connection.clone())
878 } else {
879 None
880 }
881 }
882
883 pub fn connection_state(&self) -> ConnectionState {
884 self.state
885 .as_ref()
886 .map(ConnectionState::from)
887 .unwrap_or(ConnectionState::Disconnected)
888 }
889
890 pub fn is_disconnected(&self) -> bool {
891 self.connection_state() == ConnectionState::Disconnected
892 }
893
894 pub fn path_style(&self) -> PathStyle {
895 self.path_style
896 }
897
898 #[cfg(any(test, feature = "test-support"))]
899 pub fn simulate_disconnect(&self, client_cx: &mut App) -> Task<()> {
900 let opts = self.connection_options();
901 client_cx.spawn(async move |cx| {
902 let connection = cx
903 .update_global(|c: &mut ConnectionPool, _| {
904 if let Some(ConnectionPoolEntry::Connecting(c)) = c.connections.get(&opts) {
905 c.clone()
906 } else {
907 panic!("missing test connection")
908 }
909 })
910 .unwrap()
911 .await
912 .unwrap();
913
914 connection.simulate_disconnect(cx);
915 })
916 }
917
918 #[cfg(any(test, feature = "test-support"))]
919 pub fn fake_server(
920 client_cx: &mut gpui::TestAppContext,
921 server_cx: &mut gpui::TestAppContext,
922 ) -> (RemoteConnectionOptions, AnyProtoClient) {
923 let port = client_cx
924 .update(|cx| cx.default_global::<ConnectionPool>().connections.len() as u16 + 1);
925 let opts = RemoteConnectionOptions::Ssh(SshConnectionOptions {
926 host: "<fake>".to_string(),
927 port: Some(port),
928 ..Default::default()
929 });
930 let (outgoing_tx, _) = mpsc::unbounded::<Envelope>();
931 let (_, incoming_rx) = mpsc::unbounded::<Envelope>();
932 let server_client = server_cx
933 .update(|cx| ChannelClient::new(incoming_rx, outgoing_tx, cx, "fake-server", false));
934 let connection: Arc<dyn RemoteConnection> = Arc::new(fake::FakeRemoteConnection {
935 connection_options: opts.clone(),
936 server_cx: fake::SendableCx::new(server_cx),
937 server_channel: server_client.clone(),
938 });
939
940 client_cx.update(|cx| {
941 cx.update_default_global(|c: &mut ConnectionPool, cx| {
942 c.connections.insert(
943 opts.clone(),
944 ConnectionPoolEntry::Connecting(
945 cx.background_spawn({
946 let connection = connection.clone();
947 async move { Ok(connection.clone()) }
948 })
949 .shared(),
950 ),
951 );
952 })
953 });
954
955 (opts, server_client.into())
956 }
957
958 #[cfg(any(test, feature = "test-support"))]
959 pub async fn fake_client(
960 opts: RemoteConnectionOptions,
961 client_cx: &mut gpui::TestAppContext,
962 ) -> Entity<Self> {
963 let (_tx, rx) = oneshot::channel();
964 let mut cx = client_cx.to_async();
965 let connection = connect(opts, Arc::new(fake::Delegate), &mut cx)
966 .await
967 .unwrap();
968 client_cx
969 .update(|cx| {
970 Self::new(
971 ConnectionIdentifier::setup(),
972 connection,
973 rx,
974 Arc::new(fake::Delegate),
975 cx,
976 )
977 })
978 .await
979 .unwrap()
980 .unwrap()
981 }
982
983 fn remote_connection(&self) -> Option<Arc<dyn RemoteConnection>> {
984 self.state
985 .as_ref()
986 .and_then(|state| state.remote_connection())
987 }
988}
989
990enum ConnectionPoolEntry {
991 Connecting(Shared<Task<Result<Arc<dyn RemoteConnection>, Arc<anyhow::Error>>>>),
992 Connected(Weak<dyn RemoteConnection>),
993}
994
995#[derive(Default)]
996struct ConnectionPool {
997 connections: HashMap<RemoteConnectionOptions, ConnectionPoolEntry>,
998}
999
1000impl Global for ConnectionPool {}
1001
1002impl ConnectionPool {
1003 pub fn connect(
1004 &mut self,
1005 opts: RemoteConnectionOptions,
1006 delegate: Arc<dyn RemoteClientDelegate>,
1007 cx: &mut App,
1008 ) -> Shared<Task<Result<Arc<dyn RemoteConnection>, Arc<anyhow::Error>>>> {
1009 let connection = self.connections.get(&opts);
1010 match connection {
1011 Some(ConnectionPoolEntry::Connecting(task)) => {
1012 delegate.set_status(
1013 Some("Waiting for existing connection attempt"),
1014 &mut cx.to_async(),
1015 );
1016 return task.clone();
1017 }
1018 Some(ConnectionPoolEntry::Connected(ssh)) => {
1019 if let Some(ssh) = ssh.upgrade()
1020 && !ssh.has_been_killed()
1021 {
1022 return Task::ready(Ok(ssh)).shared();
1023 }
1024 self.connections.remove(&opts);
1025 }
1026 None => {}
1027 }
1028
1029 let task = cx
1030 .spawn({
1031 let opts = opts.clone();
1032 let delegate = delegate.clone();
1033 async move |cx| {
1034 let connection = match opts.clone() {
1035 RemoteConnectionOptions::Ssh(opts) => {
1036 SshRemoteConnection::new(opts, delegate, cx)
1037 .await
1038 .map(|connection| Arc::new(connection) as Arc<dyn RemoteConnection>)
1039 }
1040 RemoteConnectionOptions::Wsl(opts) => {
1041 WslRemoteConnection::new(opts, delegate, cx)
1042 .await
1043 .map(|connection| Arc::new(connection) as Arc<dyn RemoteConnection>)
1044 }
1045 };
1046
1047 cx.update_global(|pool: &mut Self, _| {
1048 debug_assert!(matches!(
1049 pool.connections.get(&opts),
1050 Some(ConnectionPoolEntry::Connecting(_))
1051 ));
1052 match connection {
1053 Ok(connection) => {
1054 pool.connections.insert(
1055 opts.clone(),
1056 ConnectionPoolEntry::Connected(Arc::downgrade(&connection)),
1057 );
1058 Ok(connection)
1059 }
1060 Err(error) => {
1061 pool.connections.remove(&opts);
1062 Err(Arc::new(error))
1063 }
1064 }
1065 })?
1066 }
1067 })
1068 .shared();
1069
1070 self.connections
1071 .insert(opts.clone(), ConnectionPoolEntry::Connecting(task.clone()));
1072 task
1073 }
1074}
1075
1076#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1077pub enum RemoteConnectionOptions {
1078 Ssh(SshConnectionOptions),
1079 Wsl(WslConnectionOptions),
1080}
1081
1082impl RemoteConnectionOptions {
1083 pub fn display_name(&self) -> String {
1084 match self {
1085 RemoteConnectionOptions::Ssh(opts) => opts.host.clone(),
1086 RemoteConnectionOptions::Wsl(opts) => opts.distro_name.clone(),
1087 }
1088 }
1089}
1090
1091impl From<SshConnectionOptions> for RemoteConnectionOptions {
1092 fn from(opts: SshConnectionOptions) -> Self {
1093 RemoteConnectionOptions::Ssh(opts)
1094 }
1095}
1096
1097impl From<WslConnectionOptions> for RemoteConnectionOptions {
1098 fn from(opts: WslConnectionOptions) -> Self {
1099 RemoteConnectionOptions::Wsl(opts)
1100 }
1101}
1102
1103#[cfg(target_os = "windows")]
1104/// Open a wsl path (\\wsl.localhost\<distro>\path)
1105#[derive(Debug, Clone, PartialEq, Eq, gpui::Action)]
1106#[action(namespace = workspace, no_json, no_register)]
1107pub struct OpenWslPath {
1108 pub distro: WslConnectionOptions,
1109 pub paths: Vec<PathBuf>,
1110}
1111
1112#[async_trait(?Send)]
1113pub trait RemoteConnection: Send + Sync {
1114 fn start_proxy(
1115 &self,
1116 unique_identifier: String,
1117 reconnect: bool,
1118 incoming_tx: UnboundedSender<Envelope>,
1119 outgoing_rx: UnboundedReceiver<Envelope>,
1120 connection_activity_tx: Sender<()>,
1121 delegate: Arc<dyn RemoteClientDelegate>,
1122 cx: &mut AsyncApp,
1123 ) -> Task<Result<i32>>;
1124 fn upload_directory(
1125 &self,
1126 src_path: PathBuf,
1127 dest_path: RemotePathBuf,
1128 cx: &App,
1129 ) -> Task<Result<()>>;
1130 async fn kill(&self) -> Result<()>;
1131 fn has_been_killed(&self) -> bool;
1132 fn shares_network_interface(&self) -> bool {
1133 false
1134 }
1135 fn build_command(
1136 &self,
1137 program: Option<String>,
1138 args: &[String],
1139 env: &HashMap<String, String>,
1140 working_dir: Option<String>,
1141 port_forward: Option<(u16, String, u16)>,
1142 ) -> Result<CommandTemplate>;
1143 fn build_forward_ports_command(
1144 &self,
1145 forwards: Vec<(u16, String, u16)>,
1146 ) -> Result<CommandTemplate>;
1147 fn connection_options(&self) -> RemoteConnectionOptions;
1148 fn path_style(&self) -> PathStyle;
1149 fn shell(&self) -> String;
1150 fn default_system_shell(&self) -> String;
1151 fn has_wsl_interop(&self) -> bool;
1152
1153 #[cfg(any(test, feature = "test-support"))]
1154 fn simulate_disconnect(&self, _: &AsyncApp) {}
1155}
1156
1157type ResponseChannels = Mutex<HashMap<MessageId, oneshot::Sender<(Envelope, oneshot::Sender<()>)>>>;
1158
1159struct Signal<T> {
1160 tx: Mutex<Option<oneshot::Sender<T>>>,
1161 rx: Shared<Task<Option<T>>>,
1162}
1163
1164impl<T: Send + Clone + 'static> Signal<T> {
1165 pub fn new(cx: &App) -> Self {
1166 let (tx, rx) = oneshot::channel();
1167
1168 let task = cx
1169 .background_executor()
1170 .spawn(async move { rx.await.ok() })
1171 .shared();
1172
1173 Self {
1174 tx: Mutex::new(Some(tx)),
1175 rx: task,
1176 }
1177 }
1178
1179 fn set(&self, value: T) {
1180 if let Some(tx) = self.tx.lock().take() {
1181 let _ = tx.send(value);
1182 }
1183 }
1184
1185 fn wait(&self) -> Shared<Task<Option<T>>> {
1186 self.rx.clone()
1187 }
1188}
1189
1190struct ChannelClient {
1191 next_message_id: AtomicU32,
1192 outgoing_tx: Mutex<mpsc::UnboundedSender<Envelope>>,
1193 buffer: Mutex<VecDeque<Envelope>>,
1194 response_channels: ResponseChannels,
1195 message_handlers: Mutex<ProtoMessageHandlerSet>,
1196 max_received: AtomicU32,
1197 name: &'static str,
1198 task: Mutex<Task<Result<()>>>,
1199 remote_started: Signal<()>,
1200 has_wsl_interop: bool,
1201}
1202
1203impl ChannelClient {
1204 fn new(
1205 incoming_rx: mpsc::UnboundedReceiver<Envelope>,
1206 outgoing_tx: mpsc::UnboundedSender<Envelope>,
1207 cx: &App,
1208 name: &'static str,
1209 has_wsl_interop: bool,
1210 ) -> Arc<Self> {
1211 Arc::new_cyclic(|this| Self {
1212 outgoing_tx: Mutex::new(outgoing_tx),
1213 next_message_id: AtomicU32::new(0),
1214 max_received: AtomicU32::new(0),
1215 response_channels: ResponseChannels::default(),
1216 message_handlers: Default::default(),
1217 buffer: Mutex::new(VecDeque::new()),
1218 name,
1219 task: Mutex::new(Self::start_handling_messages(
1220 this.clone(),
1221 incoming_rx,
1222 &cx.to_async(),
1223 )),
1224 remote_started: Signal::new(cx),
1225 has_wsl_interop,
1226 })
1227 }
1228
1229 fn wait_for_remote_started(&self) -> Shared<Task<Option<()>>> {
1230 self.remote_started.wait()
1231 }
1232
1233 fn start_handling_messages(
1234 this: Weak<Self>,
1235 mut incoming_rx: mpsc::UnboundedReceiver<Envelope>,
1236 cx: &AsyncApp,
1237 ) -> Task<Result<()>> {
1238 cx.spawn(async move |cx| {
1239 if let Some(this) = this.upgrade() {
1240 let envelope = proto::RemoteStarted {}.into_envelope(0, None, None);
1241 this.outgoing_tx.lock().unbounded_send(envelope).ok();
1242 };
1243
1244 let peer_id = PeerId { owner_id: 0, id: 0 };
1245 while let Some(incoming) = incoming_rx.next().await {
1246 let Some(this) = this.upgrade() else {
1247 return anyhow::Ok(());
1248 };
1249 if let Some(ack_id) = incoming.ack_id {
1250 let mut buffer = this.buffer.lock();
1251 while buffer.front().is_some_and(|msg| msg.id <= ack_id) {
1252 buffer.pop_front();
1253 }
1254 }
1255 if let Some(proto::envelope::Payload::FlushBufferedMessages(_)) = &incoming.payload
1256 {
1257 log::debug!(
1258 "{}:ssh message received. name:FlushBufferedMessages",
1259 this.name
1260 );
1261 {
1262 let buffer = this.buffer.lock();
1263 for envelope in buffer.iter() {
1264 this.outgoing_tx
1265 .lock()
1266 .unbounded_send(envelope.clone())
1267 .ok();
1268 }
1269 }
1270 let mut envelope = proto::Ack {}.into_envelope(0, Some(incoming.id), None);
1271 envelope.id = this.next_message_id.fetch_add(1, SeqCst);
1272 this.outgoing_tx.lock().unbounded_send(envelope).ok();
1273 continue;
1274 }
1275
1276 if let Some(proto::envelope::Payload::RemoteStarted(_)) = &incoming.payload {
1277 this.remote_started.set(());
1278 let mut envelope = proto::Ack {}.into_envelope(0, Some(incoming.id), None);
1279 envelope.id = this.next_message_id.fetch_add(1, SeqCst);
1280 this.outgoing_tx.lock().unbounded_send(envelope).ok();
1281 continue;
1282 }
1283
1284 this.max_received.store(incoming.id, SeqCst);
1285
1286 if let Some(request_id) = incoming.responding_to {
1287 let request_id = MessageId(request_id);
1288 let sender = this.response_channels.lock().remove(&request_id);
1289 if let Some(sender) = sender {
1290 let (tx, rx) = oneshot::channel();
1291 if incoming.payload.is_some() {
1292 sender.send((incoming, tx)).ok();
1293 }
1294 rx.await.ok();
1295 }
1296 } else if let Some(envelope) =
1297 build_typed_envelope(peer_id, Instant::now(), incoming)
1298 {
1299 let type_name = envelope.payload_type_name();
1300 let message_id = envelope.message_id();
1301 if let Some(future) = ProtoMessageHandlerSet::handle_message(
1302 &this.message_handlers,
1303 envelope,
1304 this.clone().into(),
1305 cx.clone(),
1306 ) {
1307 log::debug!("{}:ssh message received. name:{type_name}", this.name);
1308 cx.foreground_executor()
1309 .spawn(async move {
1310 match future.await {
1311 Ok(_) => {
1312 log::debug!(
1313 "{}:ssh message handled. name:{type_name}",
1314 this.name
1315 );
1316 }
1317 Err(error) => {
1318 log::error!(
1319 "{}:error handling message. type:{}, error:{:#}",
1320 this.name,
1321 type_name,
1322 format!("{error:#}").lines().fold(
1323 String::new(),
1324 |mut message, line| {
1325 if !message.is_empty() {
1326 message.push(' ');
1327 }
1328 message.push_str(line);
1329 message
1330 }
1331 )
1332 );
1333 }
1334 }
1335 })
1336 .detach()
1337 } else {
1338 log::error!("{}:unhandled ssh message name:{type_name}", this.name);
1339 if let Err(e) = AnyProtoClient::from(this.clone()).send_response(
1340 message_id,
1341 anyhow::anyhow!("no handler registered for {type_name}").to_proto(),
1342 ) {
1343 log::error!(
1344 "{}:error sending error response for {type_name}:{e:#}",
1345 this.name
1346 );
1347 }
1348 }
1349 }
1350 }
1351 anyhow::Ok(())
1352 })
1353 }
1354
1355 fn reconnect(
1356 self: &Arc<Self>,
1357 incoming_rx: UnboundedReceiver<Envelope>,
1358 outgoing_tx: UnboundedSender<Envelope>,
1359 cx: &AsyncApp,
1360 ) {
1361 *self.outgoing_tx.lock() = outgoing_tx;
1362 *self.task.lock() = Self::start_handling_messages(Arc::downgrade(self), incoming_rx, cx);
1363 }
1364
1365 fn request<T: RequestMessage>(
1366 &self,
1367 payload: T,
1368 ) -> impl 'static + Future<Output = Result<T::Response>> {
1369 self.request_internal(payload, true)
1370 }
1371
1372 fn request_internal<T: RequestMessage>(
1373 &self,
1374 payload: T,
1375 use_buffer: bool,
1376 ) -> impl 'static + Future<Output = Result<T::Response>> {
1377 log::debug!("ssh request start. name:{}", T::NAME);
1378 let response =
1379 self.request_dynamic(payload.into_envelope(0, None, None), T::NAME, use_buffer);
1380 async move {
1381 let response = response.await?;
1382 log::debug!("ssh request finish. name:{}", T::NAME);
1383 T::Response::from_envelope(response).context("received a response of the wrong type")
1384 }
1385 }
1386
1387 async fn resync(&self, timeout: Duration) -> Result<()> {
1388 smol::future::or(
1389 async {
1390 self.request_internal(proto::FlushBufferedMessages {}, false)
1391 .await?;
1392
1393 for envelope in self.buffer.lock().iter() {
1394 self.outgoing_tx
1395 .lock()
1396 .unbounded_send(envelope.clone())
1397 .ok();
1398 }
1399 Ok(())
1400 },
1401 async {
1402 smol::Timer::after(timeout).await;
1403 anyhow::bail!("Timed out resyncing remote client")
1404 },
1405 )
1406 .await
1407 }
1408
1409 async fn ping(&self, timeout: Duration) -> Result<()> {
1410 smol::future::or(
1411 async {
1412 self.request(proto::Ping {}).await?;
1413 Ok(())
1414 },
1415 async {
1416 smol::Timer::after(timeout).await;
1417 anyhow::bail!("Timed out pinging remote client")
1418 },
1419 )
1420 .await
1421 }
1422
1423 fn send<T: EnvelopedMessage>(&self, payload: T) -> Result<()> {
1424 log::debug!("ssh send name:{}", T::NAME);
1425 self.send_dynamic(payload.into_envelope(0, None, None))
1426 }
1427
1428 fn request_dynamic(
1429 &self,
1430 mut envelope: proto::Envelope,
1431 type_name: &'static str,
1432 use_buffer: bool,
1433 ) -> impl 'static + Future<Output = Result<proto::Envelope>> {
1434 envelope.id = self.next_message_id.fetch_add(1, SeqCst);
1435 let (tx, rx) = oneshot::channel();
1436 let mut response_channels_lock = self.response_channels.lock();
1437 response_channels_lock.insert(MessageId(envelope.id), tx);
1438 drop(response_channels_lock);
1439
1440 let result = if use_buffer {
1441 self.send_buffered(envelope)
1442 } else {
1443 self.send_unbuffered(envelope)
1444 };
1445 async move {
1446 if let Err(error) = &result {
1447 log::error!("failed to send message: {error}");
1448 anyhow::bail!("failed to send message: {error}");
1449 }
1450
1451 let response = rx.await.context("connection lost")?.0;
1452 if let Some(proto::envelope::Payload::Error(error)) = &response.payload {
1453 return Err(RpcError::from_proto(error, type_name));
1454 }
1455 Ok(response)
1456 }
1457 }
1458
1459 pub fn send_dynamic(&self, mut envelope: proto::Envelope) -> Result<()> {
1460 envelope.id = self.next_message_id.fetch_add(1, SeqCst);
1461 self.send_buffered(envelope)
1462 }
1463
1464 fn send_buffered(&self, mut envelope: proto::Envelope) -> Result<()> {
1465 envelope.ack_id = Some(self.max_received.load(SeqCst));
1466 self.buffer.lock().push_back(envelope.clone());
1467 // ignore errors on send (happen while we're reconnecting)
1468 // assume that the global "disconnected" overlay is sufficient.
1469 self.outgoing_tx.lock().unbounded_send(envelope).ok();
1470 Ok(())
1471 }
1472
1473 fn send_unbuffered(&self, mut envelope: proto::Envelope) -> Result<()> {
1474 envelope.ack_id = Some(self.max_received.load(SeqCst));
1475 self.outgoing_tx.lock().unbounded_send(envelope).ok();
1476 Ok(())
1477 }
1478}
1479
1480impl ProtoClient for ChannelClient {
1481 fn request(
1482 &self,
1483 envelope: proto::Envelope,
1484 request_type: &'static str,
1485 ) -> BoxFuture<'static, Result<proto::Envelope>> {
1486 self.request_dynamic(envelope, request_type, true).boxed()
1487 }
1488
1489 fn send(&self, envelope: proto::Envelope, _message_type: &'static str) -> Result<()> {
1490 self.send_dynamic(envelope)
1491 }
1492
1493 fn send_response(&self, envelope: Envelope, _message_type: &'static str) -> anyhow::Result<()> {
1494 self.send_dynamic(envelope)
1495 }
1496
1497 fn message_handler_set(&self) -> &Mutex<ProtoMessageHandlerSet> {
1498 &self.message_handlers
1499 }
1500
1501 fn is_via_collab(&self) -> bool {
1502 false
1503 }
1504
1505 fn has_wsl_interop(&self) -> bool {
1506 self.has_wsl_interop
1507 }
1508}
1509
1510#[cfg(any(test, feature = "test-support"))]
1511mod fake {
1512 use super::{ChannelClient, RemoteClientDelegate, RemoteConnection, RemotePlatform};
1513 use crate::remote_client::{CommandTemplate, RemoteConnectionOptions};
1514 use anyhow::Result;
1515 use askpass::EncryptedPassword;
1516 use async_trait::async_trait;
1517 use collections::HashMap;
1518 use futures::{
1519 FutureExt, SinkExt, StreamExt,
1520 channel::{
1521 mpsc::{self, Sender},
1522 oneshot,
1523 },
1524 select_biased,
1525 };
1526 use gpui::{App, AppContext as _, AsyncApp, Task, TestAppContext};
1527 use release_channel::ReleaseChannel;
1528 use rpc::proto::Envelope;
1529 use semver::Version;
1530 use std::{path::PathBuf, sync::Arc};
1531 use util::paths::{PathStyle, RemotePathBuf};
1532
1533 pub(super) struct FakeRemoteConnection {
1534 pub(super) connection_options: RemoteConnectionOptions,
1535 pub(super) server_channel: Arc<ChannelClient>,
1536 pub(super) server_cx: SendableCx,
1537 }
1538
1539 pub(super) struct SendableCx(AsyncApp);
1540 impl SendableCx {
1541 // SAFETY: When run in test mode, GPUI is always single threaded.
1542 pub(super) fn new(cx: &TestAppContext) -> Self {
1543 Self(cx.to_async())
1544 }
1545
1546 // SAFETY: Enforce that we're on the main thread by requiring a valid AsyncApp
1547 fn get(&self, _: &AsyncApp) -> AsyncApp {
1548 self.0.clone()
1549 }
1550 }
1551
1552 // SAFETY: There is no way to access a SendableCx from a different thread, see [`SendableCx::new`] and [`SendableCx::get`]
1553 unsafe impl Send for SendableCx {}
1554 unsafe impl Sync for SendableCx {}
1555
1556 #[async_trait(?Send)]
1557 impl RemoteConnection for FakeRemoteConnection {
1558 async fn kill(&self) -> Result<()> {
1559 Ok(())
1560 }
1561
1562 fn has_been_killed(&self) -> bool {
1563 false
1564 }
1565
1566 fn build_command(
1567 &self,
1568 program: Option<String>,
1569 args: &[String],
1570 env: &HashMap<String, String>,
1571 _: Option<String>,
1572 _: Option<(u16, String, u16)>,
1573 ) -> Result<CommandTemplate> {
1574 let ssh_program = program.unwrap_or_else(|| "sh".to_string());
1575 let mut ssh_args = Vec::new();
1576 ssh_args.push(ssh_program);
1577 ssh_args.extend(args.iter().cloned());
1578 Ok(CommandTemplate {
1579 program: "ssh".into(),
1580 args: ssh_args,
1581 env: env.clone(),
1582 })
1583 }
1584
1585 fn build_forward_ports_command(
1586 &self,
1587 forwards: Vec<(u16, String, u16)>,
1588 ) -> anyhow::Result<CommandTemplate> {
1589 Ok(CommandTemplate {
1590 program: "ssh".into(),
1591 args: std::iter::once("-N".to_owned())
1592 .chain(forwards.into_iter().map(|(local_port, host, remote_port)| {
1593 format!("{local_port}:{host}:{remote_port}")
1594 }))
1595 .collect(),
1596 env: Default::default(),
1597 })
1598 }
1599
1600 fn upload_directory(
1601 &self,
1602 _src_path: PathBuf,
1603 _dest_path: RemotePathBuf,
1604 _cx: &App,
1605 ) -> Task<Result<()>> {
1606 unreachable!()
1607 }
1608
1609 fn connection_options(&self) -> RemoteConnectionOptions {
1610 self.connection_options.clone()
1611 }
1612
1613 fn simulate_disconnect(&self, cx: &AsyncApp) {
1614 let (outgoing_tx, _) = mpsc::unbounded::<Envelope>();
1615 let (_, incoming_rx) = mpsc::unbounded::<Envelope>();
1616 self.server_channel
1617 .reconnect(incoming_rx, outgoing_tx, &self.server_cx.get(cx));
1618 }
1619
1620 fn start_proxy(
1621 &self,
1622 _unique_identifier: String,
1623 _reconnect: bool,
1624 mut client_incoming_tx: mpsc::UnboundedSender<Envelope>,
1625 mut client_outgoing_rx: mpsc::UnboundedReceiver<Envelope>,
1626 mut connection_activity_tx: Sender<()>,
1627 _delegate: Arc<dyn RemoteClientDelegate>,
1628 cx: &mut AsyncApp,
1629 ) -> Task<Result<i32>> {
1630 let (mut server_incoming_tx, server_incoming_rx) = mpsc::unbounded::<Envelope>();
1631 let (server_outgoing_tx, mut server_outgoing_rx) = mpsc::unbounded::<Envelope>();
1632
1633 self.server_channel.reconnect(
1634 server_incoming_rx,
1635 server_outgoing_tx,
1636 &self.server_cx.get(cx),
1637 );
1638
1639 cx.background_spawn(async move {
1640 loop {
1641 select_biased! {
1642 server_to_client = server_outgoing_rx.next().fuse() => {
1643 let Some(server_to_client) = server_to_client else {
1644 return Ok(1)
1645 };
1646 connection_activity_tx.try_send(()).ok();
1647 client_incoming_tx.send(server_to_client).await.ok();
1648 }
1649 client_to_server = client_outgoing_rx.next().fuse() => {
1650 let Some(client_to_server) = client_to_server else {
1651 return Ok(1)
1652 };
1653 server_incoming_tx.send(client_to_server).await.ok();
1654 }
1655 }
1656 }
1657 })
1658 }
1659
1660 fn path_style(&self) -> PathStyle {
1661 PathStyle::local()
1662 }
1663
1664 fn shell(&self) -> String {
1665 "sh".to_owned()
1666 }
1667
1668 fn default_system_shell(&self) -> String {
1669 "sh".to_owned()
1670 }
1671
1672 fn has_wsl_interop(&self) -> bool {
1673 false
1674 }
1675 }
1676
1677 pub(super) struct Delegate;
1678
1679 impl RemoteClientDelegate for Delegate {
1680 fn ask_password(&self, _: String, _: oneshot::Sender<EncryptedPassword>, _: &mut AsyncApp) {
1681 unreachable!()
1682 }
1683
1684 fn download_server_binary_locally(
1685 &self,
1686 _: RemotePlatform,
1687 _: ReleaseChannel,
1688 _: Option<Version>,
1689 _: &mut AsyncApp,
1690 ) -> Task<Result<PathBuf>> {
1691 unreachable!()
1692 }
1693
1694 fn get_download_url(
1695 &self,
1696 _platform: RemotePlatform,
1697 _release_channel: ReleaseChannel,
1698 _version: Option<Version>,
1699 _cx: &mut AsyncApp,
1700 ) -> Task<Result<Option<String>>> {
1701 unreachable!()
1702 }
1703
1704 fn set_status(&self, _: Option<&str>, _: &mut AsyncApp) {}
1705 }
1706}