1use crate::{
2 SshConnectionOptions,
3 protocol::MessageId,
4 proxy::ProxyLaunchError,
5 transport::{
6 docker::{DockerConnectionOptions, DockerExecConnection},
7 ssh::SshRemoteConnection,
8 wsl::{WslConnectionOptions, WslRemoteConnection},
9 },
10};
11use anyhow::{Context as _, Result, anyhow};
12use askpass::EncryptedPassword;
13use async_trait::async_trait;
14use collections::HashMap;
15use futures::{
16 Future, FutureExt as _, StreamExt as _,
17 channel::{
18 mpsc::{self, Sender, UnboundedReceiver, UnboundedSender},
19 oneshot,
20 },
21 future::{BoxFuture, Shared},
22 select, select_biased,
23};
24use gpui::{
25 App, AppContext as _, AsyncApp, BackgroundExecutor, BorrowAppContext, Context, Entity,
26 EventEmitter, FutureExt, Global, Task, WeakEntity,
27};
28use parking_lot::Mutex;
29
30use release_channel::ReleaseChannel;
31use rpc::{
32 AnyProtoClient, ErrorExt, ProtoClient, ProtoMessageHandlerSet, RpcError,
33 proto::{self, Envelope, EnvelopedMessage, PeerId, RequestMessage, build_typed_envelope},
34};
35use semver::Version;
36use std::{
37 collections::VecDeque,
38 fmt,
39 ops::ControlFlow,
40 path::PathBuf,
41 sync::{
42 Arc, Weak,
43 atomic::{AtomicU32, AtomicU64, Ordering::SeqCst},
44 },
45 time::{Duration, Instant},
46};
47use util::{
48 ResultExt,
49 paths::{PathStyle, RemotePathBuf},
50};
51
/// Operating system of a remote host.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum RemoteOs {
    Linux,
    MacOs,
    Windows,
}

impl RemoteOs {
    /// Returns the lowercase name of this operating system.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Linux => "linux",
            Self::MacOs => "macos",
            Self::Windows => "windows",
        }
    }

    /// Reports whether this is the `Windows` variant.
    pub fn is_windows(&self) -> bool {
        *self == Self::Windows
    }
}

impl std::fmt::Display for RemoteOs {
    /// Writes the same text that [`RemoteOs::as_str`] returns.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let name = self.as_str();
        f.write_str(name)
    }
}
78
/// CPU architecture of a remote host.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum RemoteArch {
    X86_64,
    Aarch64,
}

impl RemoteArch {
    /// Returns the lowercase name of this architecture.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::X86_64 => "x86_64",
            Self::Aarch64 => "aarch64",
        }
    }
}

impl std::fmt::Display for RemoteArch {
    /// Writes the same text that [`RemoteArch::as_str`] returns.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let name = self.as_str();
        f.write_str(name)
    }
}
99
/// An operating system / CPU architecture pair.
///
/// Passed to [`RemoteClientDelegate::get_download_url`] and
/// [`RemoteClientDelegate::download_server_binary_locally`].
#[derive(Copy, Clone, Debug)]
pub struct RemotePlatform {
    /// The operating system.
    pub os: RemoteOs,
    /// The CPU architecture.
    pub arch: RemoteArch,
}
105
/// A command described by its program, arguments and environment variables.
///
/// This is the value produced by [`RemoteConnection::build_command`] and
/// [`RemoteConnection::build_forward_ports_command`].
#[derive(Clone, Debug)]
pub struct CommandTemplate {
    /// The program to run.
    pub program: String,
    /// Arguments for `program`.
    pub args: Vec<String>,
    /// Environment variables, as name/value pairs.
    pub env: HashMap<String, String>,
}
112
/// Callbacks a remote connection uses to interact with its owner.
///
/// A delegate is handed to [`connect`], [`RemoteClient::new`] and
/// [`RemoteConnection::start_proxy`].
pub trait RemoteClientDelegate: Send + Sync {
    /// Called with a `prompt` and a one-shot sender `tx` for the resulting
    /// [`EncryptedPassword`].
    // NOTE(review): presumably the implementation shows `prompt` to the user and
    // sends the answer through `tx` — confirm against the implementors.
    fn ask_password(
        &self,
        prompt: String,
        tx: oneshot::Sender<EncryptedPassword>,
        cx: &mut AsyncApp,
    );
    /// Returns a task resolving to an optional download URL for the given
    /// platform, release channel and (optional) version.
    // NOTE(review): presumably the URL of the remote server binary — confirm.
    fn get_download_url(
        &self,
        platform: RemotePlatform,
        release_channel: ReleaseChannel,
        version: Option<Version>,
        cx: &mut AsyncApp,
    ) -> Task<Result<Option<String>>>;
    /// Returns a task resolving to a local path for the given platform,
    /// release channel and (optional) version.
    // NOTE(review): going by the name, the path of a locally downloaded server
    // binary — confirm.
    fn download_server_binary_locally(
        &self,
        platform: RemotePlatform,
        release_channel: ReleaseChannel,
        version: Option<Version>,
        cx: &mut AsyncApp,
    ) -> Task<Result<PathBuf>>;
    /// Reports a status message. In this file it is called with progress text
    /// and with the error of a failed reconnect attempt.
    // NOTE(review): `None` presumably clears the status — confirm.
    fn set_status(&self, status: Option<&str>, cx: &mut AsyncApp);
}
136
/// Number of consecutively missed heartbeats at which a reconnect is started.
const MAX_MISSED_HEARTBEATS: usize = 5;
/// Delay of the keep-alive timer that triggers a heartbeat ping.
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
/// Timeout passed to the ping (and resync) requests.
const HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(5);
/// How long `RemoteClient::new` waits for the remote to report that it started:
/// 5 seconds when `debug_assertions` is enabled, 60 seconds otherwise.
const INITIAL_CONNECTION_TIMEOUT: Duration =
    Duration::from_secs(if cfg!(debug_assertions) { 5 } else { 60 });

/// Maximum number of reconnect attempts before the client gives up.
const MAX_RECONNECT_ATTEMPTS: usize = 3;
144
/// Internal connection state machine of [`RemoteClient`].
enum State {
    /// Initial state set by `RemoteClient::new` before the connection is ready.
    Connecting,
    /// The connection is established.
    Connected {
        /// The connection in use.
        remote_connection: Arc<dyn RemoteConnection>,
        /// Delegate kept so that a later reconnect can reuse it.
        delegate: Arc<dyn RemoteClientDelegate>,

        /// Task created by `RemoteClient::monitor`, which awaits the proxy I/O task.
        multiplex_task: Task<Result<()>>,
        /// Task created by `RemoteClient::heartbeat`.
        heartbeat_task: Task<Result<()>>,
    },
    /// One or more heartbeats were missed; otherwise carries the same data as
    /// `Connected`.
    HeartbeatMissed {
        /// Number of heartbeats missed so far.
        missed_heartbeats: usize,

        /// The connection in use.
        ssh_connection: Arc<dyn RemoteConnection>,
        /// Delegate kept so that a later reconnect can reuse it.
        delegate: Arc<dyn RemoteClientDelegate>,

        /// Task created by `RemoteClient::monitor`.
        multiplex_task: Task<Result<()>>,
        /// Task created by `RemoteClient::heartbeat`.
        heartbeat_task: Task<Result<()>>,
    },
    /// A reconnect attempt is in progress.
    Reconnecting,
    /// The last reconnect attempt failed; holds what the next attempt needs.
    ReconnectFailed {
        /// Connection to use for the next attempt.
        ssh_connection: Arc<dyn RemoteConnection>,
        /// Delegate to use for the next attempt.
        delegate: Arc<dyn RemoteClientDelegate>,

        /// Error that made the attempt fail.
        error: anyhow::Error,
        /// Number of reconnect attempts made so far.
        attempts: usize,
    },
    /// `MAX_RECONNECT_ATTEMPTS` was exceeded; no further attempts are made.
    ReconnectExhausted,
    /// The proxy exited with a code that maps to `ProxyLaunchError::ServerNotRunning`.
    ServerNotRunning,
}
174
175impl fmt::Display for State {
176 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
177 match self {
178 Self::Connecting => write!(f, "connecting"),
179 Self::Connected { .. } => write!(f, "connected"),
180 Self::Reconnecting => write!(f, "reconnecting"),
181 Self::ReconnectFailed { .. } => write!(f, "reconnect failed"),
182 Self::ReconnectExhausted => write!(f, "reconnect exhausted"),
183 Self::HeartbeatMissed { .. } => write!(f, "heartbeat missed"),
184 Self::ServerNotRunning { .. } => write!(f, "server not running"),
185 }
186 }
187}
188
189impl State {
190 fn remote_connection(&self) -> Option<Arc<dyn RemoteConnection>> {
191 match self {
192 Self::Connected {
193 remote_connection: ssh_connection,
194 ..
195 } => Some(ssh_connection.clone()),
196 Self::HeartbeatMissed { ssh_connection, .. } => Some(ssh_connection.clone()),
197 Self::ReconnectFailed { ssh_connection, .. } => Some(ssh_connection.clone()),
198 _ => None,
199 }
200 }
201
202 fn can_reconnect(&self) -> bool {
203 match self {
204 Self::Connected { .. }
205 | Self::HeartbeatMissed { .. }
206 | Self::ReconnectFailed { .. } => true,
207 State::Connecting
208 | State::Reconnecting
209 | State::ReconnectExhausted
210 | State::ServerNotRunning => false,
211 }
212 }
213
214 fn is_reconnect_failed(&self) -> bool {
215 matches!(self, Self::ReconnectFailed { .. })
216 }
217
218 fn is_reconnect_exhausted(&self) -> bool {
219 matches!(self, Self::ReconnectExhausted { .. })
220 }
221
222 fn is_server_not_running(&self) -> bool {
223 matches!(self, Self::ServerNotRunning)
224 }
225
226 fn is_reconnecting(&self) -> bool {
227 matches!(self, Self::Reconnecting { .. })
228 }
229
230 fn heartbeat_recovered(self) -> Self {
231 match self {
232 Self::HeartbeatMissed {
233 ssh_connection,
234 delegate,
235 multiplex_task,
236 heartbeat_task,
237 ..
238 } => Self::Connected {
239 remote_connection: ssh_connection,
240 delegate,
241 multiplex_task,
242 heartbeat_task,
243 },
244 _ => self,
245 }
246 }
247
248 fn heartbeat_missed(self) -> Self {
249 match self {
250 Self::Connected {
251 remote_connection: ssh_connection,
252 delegate,
253 multiplex_task,
254 heartbeat_task,
255 } => Self::HeartbeatMissed {
256 missed_heartbeats: 1,
257 ssh_connection,
258 delegate,
259 multiplex_task,
260 heartbeat_task,
261 },
262 Self::HeartbeatMissed {
263 missed_heartbeats,
264 ssh_connection,
265 delegate,
266 multiplex_task,
267 heartbeat_task,
268 } => Self::HeartbeatMissed {
269 missed_heartbeats: missed_heartbeats + 1,
270 ssh_connection,
271 delegate,
272 multiplex_task,
273 heartbeat_task,
274 },
275 _ => self,
276 }
277 }
278}
279
/// The state of the ssh connection.
///
/// This is the public, data-free summary of the internal `State`; see the
/// `From<&State>` conversion for how the two map onto each other.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ConnectionState {
    /// The initial connection is still being established.
    Connecting,
    /// The connection is established.
    Connected,
    /// One or more heartbeats were missed.
    HeartbeatMissed,
    /// A reconnect is in progress, or the last attempt failed and may be retried.
    Reconnecting,
    /// There is no connection: reconnecting was given up, the server is not
    /// running, or the client holds no state at all.
    Disconnected,
}
289
290impl From<&State> for ConnectionState {
291 fn from(value: &State) -> Self {
292 match value {
293 State::Connecting => Self::Connecting,
294 State::Connected { .. } => Self::Connected,
295 State::Reconnecting | State::ReconnectFailed { .. } => Self::Reconnecting,
296 State::HeartbeatMissed { .. } => Self::HeartbeatMissed,
297 State::ReconnectExhausted => Self::Disconnected,
298 State::ServerNotRunning => Self::Disconnected,
299 }
300 }
301}
302
/// Client side of a remote connection: owns the message channel client and
/// tracks the connection state, reconnecting when the connection is lost.
pub struct RemoteClient {
    /// Channel-based client used to exchange messages with the remote side.
    client: Arc<ChannelClient>,
    /// String form of the `ConnectionIdentifier`, passed to `start_proxy` on the
    /// initial connection and on every reconnect.
    unique_identifier: String,
    /// Options of the connection this client was created from.
    connection_options: RemoteConnectionOptions,
    /// Path style reported by the connection at creation time.
    path_style: PathStyle,
    /// Current state. It is `None` only while a transition has temporarily
    /// taken it, or after `shutdown_processes` has taken it for good.
    state: Option<State>,
}
310
/// Events emitted by a [`RemoteClient`] entity.
#[derive(Debug)]
pub enum RemoteClientEvent {
    /// Emitted when the state becomes `ReconnectExhausted` or `ServerNotRunning`.
    Disconnected,
}

// Lets `RemoteClient` entities emit `RemoteClientEvent`s.
impl EventEmitter<RemoteClientEvent> for RemoteClient {}
317
/// Identifies the socket on the remote server so that reconnects
/// can re-join the same project.
pub enum ConnectionIdentifier {
    /// Identifier numbered by a process-wide counter; see [`ConnectionIdentifier::setup`].
    Setup(u64),
    /// Identifier carrying a caller-supplied id.
    // NOTE(review): presumably the id of the workspace the connection belongs to — confirm.
    Workspace(i64),
}
324
/// Counter that supplies the ids for `ConnectionIdentifier::Setup`; starts at 1.
static NEXT_ID: AtomicU64 = AtomicU64::new(1);
326
327impl ConnectionIdentifier {
328 pub fn setup() -> Self {
329 Self::Setup(NEXT_ID.fetch_add(1, SeqCst))
330 }
331
332 // This string gets used in a socket name, and so must be relatively short.
333 // The total length of:
334 // /home/{username}/.local/share/zed/server_state/{name}/stdout.sock
335 // Must be less than about 100 characters
336 // https://unix.stackexchange.com/questions/367008/why-is-socket-path-length-limited-to-a-hundred-chars
337 // So our strings should be at most 20 characters or so.
338 fn to_string(&self, cx: &App) -> String {
339 let identifier_prefix = match ReleaseChannel::global(cx) {
340 ReleaseChannel::Stable => "".to_string(),
341 release_channel => format!("{}-", release_channel.dev_name()),
342 };
343 match self {
344 Self::Setup(setup_id) => format!("{identifier_prefix}setup-{setup_id}"),
345 Self::Workspace(workspace_id) => {
346 format!("{identifier_prefix}workspace-{workspace_id}",)
347 }
348 }
349 }
350}
351
352pub async fn connect(
353 connection_options: RemoteConnectionOptions,
354 delegate: Arc<dyn RemoteClientDelegate>,
355 cx: &mut AsyncApp,
356) -> Result<Arc<dyn RemoteConnection>> {
357 cx.update(|cx| {
358 cx.update_default_global(|pool: &mut ConnectionPool, cx| {
359 pool.connect(connection_options.clone(), delegate.clone(), cx)
360 })
361 })?
362 .await
363 .map_err(|e| e.cloned())
364}
365
366impl RemoteClient {
    /// Creates a `RemoteClient` entity on top of an existing `remote_connection`.
    ///
    /// The spawned task starts the proxy on the connection, waits up to
    /// `INITIAL_CONNECTION_TIMEOUT` for the remote to report that it started,
    /// pings it once, and then moves the client into `State::Connected`
    /// together with its monitor and heartbeat tasks.
    ///
    /// The returned task resolves to:
    /// - `Ok(Some(entity))` when the connection was established,
    /// - `Ok(None)` when `cancellation` resolves first,
    /// - `Err(_)` when the remote did not become ready, the initial ping
    ///   failed, or the app context could not be updated.
    pub fn new(
        unique_identifier: ConnectionIdentifier,
        remote_connection: Arc<dyn RemoteConnection>,
        cancellation: oneshot::Receiver<()>,
        delegate: Arc<dyn RemoteClientDelegate>,
        cx: &mut App,
    ) -> Task<Result<Option<Entity<Self>>>> {
        let unique_identifier = unique_identifier.to_string(cx);
        cx.spawn(async move |cx| {
            let success = Box::pin(async move {
                // Channels between the `ChannelClient` and the proxy started below.
                let (outgoing_tx, outgoing_rx) = mpsc::unbounded::<Envelope>();
                let (incoming_tx, incoming_rx) = mpsc::unbounded::<Envelope>();
                // Handed to the proxy; the receiving end is consumed by the heartbeat task.
                let (connection_activity_tx, connection_activity_rx) = mpsc::channel::<()>(1);

                let client = cx.update(|cx| {
                    ChannelClient::new(
                        incoming_rx,
                        outgoing_tx,
                        cx,
                        "client",
                        remote_connection.has_wsl_interop(),
                    )
                })?;

                let path_style = remote_connection.path_style();
                let this = cx.new(|_| Self {
                    client: client.clone(),
                    unique_identifier: unique_identifier.clone(),
                    connection_options: remote_connection.connection_options(),
                    path_style,
                    state: Some(State::Connecting),
                })?;

                // `false`: this is the initial connection, not a reconnect.
                let io_task = remote_connection.start_proxy(
                    unique_identifier,
                    false,
                    incoming_tx,
                    outgoing_rx,
                    connection_activity_tx,
                    delegate.clone(),
                    cx,
                );

                let ready = client
                    .wait_for_remote_started()
                    .with_timeout(INITIAL_CONNECTION_TIMEOUT, cx.background_executor())
                    .await;
                match ready {
                    Ok(Some(_)) => {}
                    // The wait finished without a value before the timeout elapsed.
                    Ok(None) => {
                        let mut error = "remote client exited before becoming ready".to_owned();
                        // If the proxy task has already finished, include its outcome.
                        if let Some(status) = io_task.now_or_never() {
                            match status {
                                Ok(exit_code) => {
                                    error.push_str(&format!(", exit_code={exit_code:?}"))
                                }
                                Err(e) => error.push_str(&format!(", error={e:?}")),
                            }
                        }
                        let error = anyhow::anyhow!("{error}");
                        log::error!("failed to establish connection: {}", error);
                        return Err(error);
                    }
                    // The timeout elapsed first.
                    Err(_) => {
                        let mut error =
                            "remote client did not become ready within the timeout".to_owned();
                        // If the proxy task has already finished, include its outcome.
                        if let Some(status) = io_task.now_or_never() {
                            match status {
                                Ok(exit_code) => {
                                    error.push_str(&format!(", exit_code={exit_code:?}"))
                                }
                                Err(e) => error.push_str(&format!(", error={e:?}")),
                            }
                        }
                        let error = anyhow::anyhow!("{error}");
                        log::error!("failed to establish connection: {}", error);
                        return Err(error);
                    }
                }
                let multiplex_task = Self::monitor(this.downgrade(), io_task, cx);
                // Verify that the remote actually answers before declaring the client connected.
                if let Err(error) = client.ping(HEARTBEAT_TIMEOUT).await {
                    log::error!("failed to establish connection: {}", error);
                    return Err(error);
                }

                let heartbeat_task = Self::heartbeat(this.downgrade(), connection_activity_rx, cx);

                this.update(cx, |this, _| {
                    this.state = Some(State::Connected {
                        remote_connection,
                        delegate,
                        multiplex_task,
                        heartbeat_task,
                    });
                })?;

                Ok(Some(this))
            });

            // Whichever finishes first wins: cancellation or the connection setup.
            select! {
                _ = cancellation.fuse() => {
                    Ok(None)
                }
                result = success.fuse() => result
            }
        })
    }
474
475 pub fn proto_client_from_channels(
476 incoming_rx: mpsc::UnboundedReceiver<Envelope>,
477 outgoing_tx: mpsc::UnboundedSender<Envelope>,
478 cx: &App,
479 name: &'static str,
480 has_wsl_interop: bool,
481 ) -> AnyProtoClient {
482 ChannelClient::new(incoming_rx, outgoing_tx, cx, name, has_wsl_interop).into()
483 }
484
    /// Takes the current state and, if it was `Connected`, returns a future that
    /// shuts the connection down.
    ///
    /// The future optionally sends `shutdown_request`, waits 50ms, and then
    /// drops the tasks, the connection and the delegate held by the state.
    ///
    /// Returns `None` when the client has no state or the state is not
    /// `Connected`. The state is taken in either case, so the client is left
    /// without a state afterwards.
    pub fn shutdown_processes<T: RequestMessage>(
        &mut self,
        shutdown_request: Option<T>,
        executor: BackgroundExecutor,
    ) -> Option<impl Future<Output = ()> + use<T>> {
        let state = self.state.take()?;
        log::info!("shutting down ssh processes");

        let State::Connected {
            multiplex_task,
            heartbeat_task,
            remote_connection: ssh_connection,
            delegate,
        } = state
        else {
            return None;
        };

        let client = self.client.clone();

        Some(async move {
            if let Some(shutdown_request) = shutdown_request {
                client.send(shutdown_request).log_err();
                // We wait 50ms instead of waiting for a response, because
                // waiting for a response would require us to wait on the main thread
                // which we want to avoid in an `on_app_quit` callback.
                executor.timer(Duration::from_millis(50)).await;
            }

            // Drop `multiplex_task` because it owns our ssh_proxy_process, which is a
            // child of master_process.
            drop(multiplex_task);
            // Now drop the rest of state, which kills master process.
            drop(heartbeat_task);
            drop(ssh_connection);
            drop(delegate);
        })
    }
523
    /// Starts a reconnect attempt.
    ///
    /// Reconnecting is only allowed from the `Connected`, `HeartbeatMissed` and
    /// `ReconnectFailed` states. The state is switched to `Reconnecting` and the
    /// work happens in a detached task. When that task ends in
    /// `ReconnectFailed`, another attempt is started, until
    /// `MAX_RECONNECT_ATTEMPTS` is exceeded and the state becomes
    /// `ReconnectExhausted`.
    ///
    /// # Errors
    ///
    /// Returns an error if the current state does not allow reconnecting. A
    /// failure of the attempt itself is not returned; it is recorded in the state.
    fn reconnect(&mut self, cx: &mut Context<Self>) -> Result<()> {
        let can_reconnect = self
            .state
            .as_ref()
            .map(|state| state.can_reconnect())
            .unwrap_or(false);
        if !can_reconnect {
            log::info!("aborting reconnect, because not in state that allows reconnecting");
            let error = if let Some(state) = self.state.as_ref() {
                format!("invalid state, cannot reconnect while in state {state}")
            } else {
                "no state set".to_string()
            };
            anyhow::bail!(error);
        }

        // `can_reconnect` was true, so a state is present and is one of the
        // three variants handled below.
        let state = self.state.take().unwrap();
        let (attempts, remote_connection, delegate) = match state {
            State::Connected {
                remote_connection: ssh_connection,
                delegate,
                multiplex_task,
                heartbeat_task,
            }
            | State::HeartbeatMissed {
                ssh_connection,
                delegate,
                multiplex_task,
                heartbeat_task,
                ..
            } => {
                // Coming from a live connection: stop its tasks and start counting from zero.
                drop(multiplex_task);
                drop(heartbeat_task);
                (0, ssh_connection, delegate)
            }
            State::ReconnectFailed {
                attempts,
                ssh_connection,
                delegate,
                ..
            } => (attempts, ssh_connection, delegate),
            State::Connecting
            | State::Reconnecting
            | State::ReconnectExhausted
            | State::ServerNotRunning => unreachable!(),
        };

        let attempts = attempts + 1;
        if attempts > MAX_RECONNECT_ATTEMPTS {
            log::error!(
                "Failed to reconnect to after {} attempts, giving up",
                MAX_RECONNECT_ATTEMPTS
            );
            self.set_state(State::ReconnectExhausted, cx);
            return Ok(());
        }

        self.set_state(State::Reconnecting, cx);

        log::info!("Trying to reconnect to ssh server... Attempt {}", attempts);

        let unique_identifier = self.unique_identifier.clone();
        let client = self.client.clone();
        // Performs the attempt and resolves to the state the client should enter next.
        let reconnect_task = cx.spawn(async move |this, cx| {
            // Reports the error through the delegate and returns `ReconnectFailed`
            // from the enclosing async block.
            macro_rules! failed {
                ($error:expr, $attempts:expr, $ssh_connection:expr, $delegate:expr) => {
                    delegate.set_status(Some(&format!("{error:#}", error = $error)), cx);
                    return State::ReconnectFailed {
                        error: anyhow!($error),
                        attempts: $attempts,
                        ssh_connection: $ssh_connection,
                        delegate: $delegate,
                    };
                };
            }

            // Kill the old connection before connecting again.
            if let Err(error) = remote_connection
                .kill()
                .await
                .context("Failed to kill ssh process")
            {
                failed!(error, attempts, remote_connection, delegate);
            };

            let connection_options = remote_connection.connection_options();

            let (outgoing_tx, outgoing_rx) = mpsc::unbounded::<Envelope>();
            let (incoming_tx, incoming_rx) = mpsc::unbounded::<Envelope>();
            let (connection_activity_tx, connection_activity_rx) = mpsc::channel::<()>(1);

            let (ssh_connection, io_task) = match async {
                let ssh_connection = cx
                    .update_global(|pool: &mut ConnectionPool, cx| {
                        pool.connect(connection_options, delegate.clone(), cx)
                    })?
                    .await
                    .map_err(|error| error.cloned())?;

                // `true`: tell the proxy that this is a reconnect.
                let io_task = ssh_connection.start_proxy(
                    unique_identifier,
                    true,
                    incoming_tx,
                    outgoing_rx,
                    connection_activity_tx,
                    delegate.clone(),
                    cx,
                );
                anyhow::Ok((ssh_connection, io_task))
            }
            .await
            {
                Ok((ssh_connection, io_task)) => (ssh_connection, io_task),
                Err(error) => {
                    // The old connection is kept so that the next attempt can use it.
                    failed!(error, attempts, remote_connection, delegate);
                }
            };

            let multiplex_task = Self::monitor(this.clone(), io_task, cx);
            // Point the existing client at the new channels.
            client.reconnect(incoming_rx, outgoing_tx, cx);

            if let Err(error) = client.resync(HEARTBEAT_TIMEOUT).await {
                failed!(error, attempts, ssh_connection, delegate);
            };

            State::Connected {
                remote_connection: ssh_connection,
                delegate,
                multiplex_task,
                heartbeat_task: Self::heartbeat(this.clone(), connection_activity_rx, cx),
            }
        });

        cx.spawn(async move |this, cx| {
            let new_state = reconnect_task.await;
            this.update(cx, |this, cx| {
                // Only apply the result if the client is still `Reconnecting`.
                this.try_set_state(cx, |old_state| {
                    if old_state.is_reconnecting() {
                        match &new_state {
                            State::Connecting
                            | State::Reconnecting
                            | State::HeartbeatMissed { .. }
                            | State::ServerNotRunning => {}
                            State::Connected { .. } => {
                                log::info!("Successfully reconnected");
                            }
                            State::ReconnectFailed {
                                error, attempts, ..
                            } => {
                                log::error!(
                                    "Reconnect attempt {} failed: {:?}. Starting new attempt...",
                                    attempts,
                                    error
                                );
                            }
                            State::ReconnectExhausted => {
                                log::error!("Reconnect attempt failed and all attempts exhausted");
                            }
                        }
                        Some(new_state)
                    } else {
                        None
                    }
                });

                // A failed attempt immediately triggers the next one.
                if this.state_is(State::is_reconnect_failed) {
                    this.reconnect(cx)
                } else if this.state_is(State::is_reconnect_exhausted) {
                    Ok(())
                } else {
                    log::debug!("State has transition from Reconnecting into new state while attempting reconnect.");
                    Ok(())
                }
            })
        })
        .detach_and_log_err(cx);

        Ok(())
    }
702
703 fn heartbeat(
704 this: WeakEntity<Self>,
705 mut connection_activity_rx: mpsc::Receiver<()>,
706 cx: &mut AsyncApp,
707 ) -> Task<Result<()>> {
708 let Ok(client) = this.read_with(cx, |this, _| this.client.clone()) else {
709 return Task::ready(Err(anyhow!("SshRemoteClient lost")));
710 };
711
712 cx.spawn(async move |cx| {
713 let mut missed_heartbeats = 0;
714
715 let keepalive_timer = cx.background_executor().timer(HEARTBEAT_INTERVAL).fuse();
716 futures::pin_mut!(keepalive_timer);
717
718 loop {
719 select_biased! {
720 result = connection_activity_rx.next().fuse() => {
721 if result.is_none() {
722 log::warn!("ssh heartbeat: connection activity channel has been dropped. stopping.");
723 return Ok(());
724 }
725
726 if missed_heartbeats != 0 {
727 missed_heartbeats = 0;
728 let _ =this.update(cx, |this, cx| {
729 this.handle_heartbeat_result(missed_heartbeats, cx)
730 })?;
731 }
732 }
733 _ = keepalive_timer => {
734 log::debug!("Sending heartbeat to server...");
735
736 let result = select_biased! {
737 _ = connection_activity_rx.next().fuse() => {
738 Ok(())
739 }
740 ping_result = client.ping(HEARTBEAT_TIMEOUT).fuse() => {
741 ping_result
742 }
743 };
744
745 if result.is_err() {
746 missed_heartbeats += 1;
747 log::warn!(
748 "No heartbeat from server after {:?}. Missed heartbeat {} out of {}.",
749 HEARTBEAT_TIMEOUT,
750 missed_heartbeats,
751 MAX_MISSED_HEARTBEATS
752 );
753 } else if missed_heartbeats != 0 {
754 missed_heartbeats = 0;
755 } else {
756 continue;
757 }
758
759 let result = this.update(cx, |this, cx| {
760 this.handle_heartbeat_result(missed_heartbeats, cx)
761 })?;
762 if result.is_break() {
763 return Ok(());
764 }
765 }
766 }
767
768 keepalive_timer.set(cx.background_executor().timer(HEARTBEAT_INTERVAL).fuse());
769 }
770 })
771 }
772
773 fn handle_heartbeat_result(
774 &mut self,
775 missed_heartbeats: usize,
776 cx: &mut Context<Self>,
777 ) -> ControlFlow<()> {
778 let state = self.state.take().unwrap();
779 let next_state = if missed_heartbeats > 0 {
780 state.heartbeat_missed()
781 } else {
782 state.heartbeat_recovered()
783 };
784
785 self.set_state(next_state, cx);
786
787 if missed_heartbeats >= MAX_MISSED_HEARTBEATS {
788 log::error!(
789 "Missed last {} heartbeats. Reconnecting...",
790 missed_heartbeats
791 );
792
793 self.reconnect(cx)
794 .context("failed to start reconnect process after missing heartbeats")
795 .log_err();
796 ControlFlow::Break(())
797 } else {
798 ControlFlow::Continue(())
799 }
800 }
801
802 fn monitor(
803 this: WeakEntity<Self>,
804 io_task: Task<Result<i32>>,
805 cx: &AsyncApp,
806 ) -> Task<Result<()>> {
807 cx.spawn(async move |cx| {
808 let result = io_task.await;
809
810 match result {
811 Ok(exit_code) => {
812 if let Some(error) = ProxyLaunchError::from_exit_code(exit_code) {
813 match error {
814 ProxyLaunchError::ServerNotRunning => {
815 log::error!("failed to reconnect because server is not running");
816 this.update(cx, |this, cx| {
817 this.set_state(State::ServerNotRunning, cx);
818 })?;
819 }
820 }
821 } else if exit_code > 0 {
822 log::error!("proxy process terminated unexpectedly");
823 this.update(cx, |this, cx| {
824 this.reconnect(cx).ok();
825 })?;
826 }
827 }
828 Err(error) => {
829 log::warn!("ssh io task died with error: {:?}. reconnecting...", error);
830 this.update(cx, |this, cx| {
831 this.reconnect(cx).ok();
832 })?;
833 }
834 }
835
836 Ok(())
837 })
838 }
839
840 fn state_is(&self, check: impl FnOnce(&State) -> bool) -> bool {
841 self.state.as_ref().is_some_and(check)
842 }
843
844 fn try_set_state(&mut self, cx: &mut Context<Self>, map: impl FnOnce(&State) -> Option<State>) {
845 let new_state = self.state.as_ref().and_then(map);
846 if let Some(new_state) = new_state {
847 self.state.replace(new_state);
848 cx.notify();
849 }
850 }
851
852 fn set_state(&mut self, state: State, cx: &mut Context<Self>) {
853 log::info!("setting state to '{}'", &state);
854
855 let is_reconnect_exhausted = state.is_reconnect_exhausted();
856 let is_server_not_running = state.is_server_not_running();
857 self.state.replace(state);
858
859 if is_reconnect_exhausted || is_server_not_running {
860 cx.emit(RemoteClientEvent::Disconnected);
861 }
862 cx.notify();
863 }
864
865 pub fn shell(&self) -> Option<String> {
866 Some(self.remote_connection()?.shell())
867 }
868
869 pub fn default_system_shell(&self) -> Option<String> {
870 Some(self.remote_connection()?.default_system_shell())
871 }
872
873 pub fn shares_network_interface(&self) -> bool {
874 self.remote_connection()
875 .map_or(false, |connection| connection.shares_network_interface())
876 }
877
878 pub fn build_command(
879 &self,
880 program: Option<String>,
881 args: &[String],
882 env: &HashMap<String, String>,
883 working_dir: Option<String>,
884 port_forward: Option<(u16, String, u16)>,
885 ) -> Result<CommandTemplate> {
886 let Some(connection) = self.remote_connection() else {
887 return Err(anyhow!("no ssh connection"));
888 };
889 connection.build_command(program, args, env, working_dir, port_forward)
890 }
891
892 pub fn build_forward_ports_command(
893 &self,
894 forwards: Vec<(u16, String, u16)>,
895 ) -> Result<CommandTemplate> {
896 let Some(connection) = self.remote_connection() else {
897 return Err(anyhow!("no ssh connection"));
898 };
899 connection.build_forward_ports_command(forwards)
900 }
901
902 pub fn upload_directory(
903 &self,
904 src_path: PathBuf,
905 dest_path: RemotePathBuf,
906 cx: &App,
907 ) -> Task<Result<()>> {
908 let Some(connection) = self.remote_connection() else {
909 return Task::ready(Err(anyhow!("no ssh connection")));
910 };
911 connection.upload_directory(src_path, dest_path, cx)
912 }
913
914 pub fn proto_client(&self) -> AnyProtoClient {
915 self.client.clone().into()
916 }
917
    /// Returns a copy of the options of the connection this client was created from.
    pub fn connection_options(&self) -> RemoteConnectionOptions {
        self.connection_options.clone()
    }
921
922 pub fn connection(&self) -> Option<Arc<dyn RemoteConnection>> {
923 if let State::Connected {
924 remote_connection, ..
925 } = self.state.as_ref()?
926 {
927 Some(remote_connection.clone())
928 } else {
929 None
930 }
931 }
932
933 pub fn connection_state(&self) -> ConnectionState {
934 self.state
935 .as_ref()
936 .map(ConnectionState::from)
937 .unwrap_or(ConnectionState::Disconnected)
938 }
939
940 pub fn is_disconnected(&self) -> bool {
941 self.connection_state() == ConnectionState::Disconnected
942 }
943
    /// Returns the path style recorded from the connection when the client was created.
    pub fn path_style(&self) -> PathStyle {
        self.path_style
    }
947
    /// Test helper: looks up this client's connection in the global
    /// `ConnectionPool` and calls `simulate_disconnect` on it.
    ///
    /// # Panics
    ///
    /// Panics if the pool has no `Connecting` entry for this client's options
    /// (the kind of entry `fake_server` registers), or if that entry's task fails.
    #[cfg(any(test, feature = "test-support"))]
    pub fn simulate_disconnect(&self, client_cx: &mut App) -> Task<()> {
        let opts = self.connection_options();
        client_cx.spawn(async move |cx| {
            let connection = cx
                .update_global(|c: &mut ConnectionPool, _| {
                    if let Some(ConnectionPoolEntry::Connecting(c)) = c.connections.get(&opts) {
                        c.clone()
                    } else {
                        panic!("missing test connection")
                    }
                })
                .unwrap()
                .await
                .unwrap();

            connection.simulate_disconnect(cx);
        })
    }
967
    /// Test helper: registers a fake connection in the client's global
    /// `ConnectionPool` and returns the options that select it together with
    /// the server-side proto client.
    ///
    /// Pass the returned options to [`Self::fake_client`] to obtain a client
    /// that uses this fake connection.
    #[cfg(any(test, feature = "test-support"))]
    pub fn fake_server(
        client_cx: &mut gpui::TestAppContext,
        server_cx: &mut gpui::TestAppContext,
    ) -> (RemoteConnectionOptions, AnyProtoClient) {
        use crate::transport::ssh::SshConnectionHost;

        // Derive the port from the pool size so that each fake server gets
        // options that differ from the ones registered before it.
        let port = client_cx
            .update(|cx| cx.default_global::<ConnectionPool>().connections.len() as u16 + 1);
        let opts = RemoteConnectionOptions::Ssh(SshConnectionOptions {
            host: SshConnectionHost::from("<fake>".to_string()),
            port: Some(port),
            ..Default::default()
        });
        // Placeholder channels: the opposite ends are dropped right away.
        let (outgoing_tx, _) = mpsc::unbounded::<Envelope>();
        let (_, incoming_rx) = mpsc::unbounded::<Envelope>();
        let server_client = server_cx
            .update(|cx| ChannelClient::new(incoming_rx, outgoing_tx, cx, "fake-server", false));
        let connection: Arc<dyn RemoteConnection> = Arc::new(fake::FakeRemoteConnection {
            connection_options: opts.clone(),
            server_cx: fake::SendableCx::new(server_cx),
            server_channel: server_client.clone(),
        });

        // Register the fake as a `Connecting` entry whose task yields the connection.
        client_cx.update(|cx| {
            cx.update_default_global(|c: &mut ConnectionPool, cx| {
                c.connections.insert(
                    opts.clone(),
                    ConnectionPoolEntry::Connecting(
                        cx.background_spawn({
                            let connection = connection.clone();
                            async move { Ok(connection.clone()) }
                        })
                        .shared(),
                    ),
                );
            })
        });

        (opts, server_client.into())
    }
1009
    /// Test helper: connects through the global `ConnectionPool` using `opts`
    /// and builds a `RemoteClient` on the resulting connection.
    ///
    /// # Panics
    ///
    /// Panics if connecting or creating the client fails.
    #[cfg(any(test, feature = "test-support"))]
    pub async fn fake_client(
        opts: RemoteConnectionOptions,
        client_cx: &mut gpui::TestAppContext,
    ) -> Entity<Self> {
        // `_tx` stays alive until this function returns, so the cancellation
        // receiver handed to `Self::new` is not closed early.
        let (_tx, rx) = oneshot::channel();
        let mut cx = client_cx.to_async();
        let connection = connect(opts, Arc::new(fake::Delegate), &mut cx)
            .await
            .unwrap();
        client_cx
            .update(|cx| {
                Self::new(
                    ConnectionIdentifier::setup(),
                    connection,
                    rx,
                    Arc::new(fake::Delegate),
                    cx,
                )
            })
            .await
            .unwrap()
            .unwrap()
    }
1034
1035 fn remote_connection(&self) -> Option<Arc<dyn RemoteConnection>> {
1036 self.state
1037 .as_ref()
1038 .and_then(|state| state.remote_connection())
1039 }
1040}
1041
/// An entry of the [`ConnectionPool`].
enum ConnectionPoolEntry {
    /// A connection attempt is in flight; the shared task can be awaited by
    /// every caller that asks for the same options.
    Connecting(Shared<Task<Result<Arc<dyn RemoteConnection>, Arc<anyhow::Error>>>>),
    /// A connection was established. Only a weak reference is kept, so the
    /// pool by itself does not keep the connection alive.
    Connected(Weak<dyn RemoteConnection>),
}
1046
/// Pool of connections keyed by their options, stored as a gpui global.
#[derive(Default)]
struct ConnectionPool {
    /// Pending and established connections, by connection options.
    connections: HashMap<RemoteConnectionOptions, ConnectionPoolEntry>,
}

// Allows the pool to be stored and looked up as a gpui global.
impl Global for ConnectionPool {}
1053
1054impl ConnectionPool {
1055 pub fn connect(
1056 &mut self,
1057 opts: RemoteConnectionOptions,
1058 delegate: Arc<dyn RemoteClientDelegate>,
1059 cx: &mut App,
1060 ) -> Shared<Task<Result<Arc<dyn RemoteConnection>, Arc<anyhow::Error>>>> {
1061 let connection = self.connections.get(&opts);
1062 match connection {
1063 Some(ConnectionPoolEntry::Connecting(task)) => {
1064 delegate.set_status(
1065 Some("Waiting for existing connection attempt"),
1066 &mut cx.to_async(),
1067 );
1068 return task.clone();
1069 }
1070 Some(ConnectionPoolEntry::Connected(ssh)) => {
1071 if let Some(ssh) = ssh.upgrade()
1072 && !ssh.has_been_killed()
1073 {
1074 return Task::ready(Ok(ssh)).shared();
1075 }
1076 self.connections.remove(&opts);
1077 }
1078 None => {}
1079 }
1080
1081 let task = cx
1082 .spawn({
1083 let opts = opts.clone();
1084 let delegate = delegate.clone();
1085 async move |cx| {
1086 let connection = match opts.clone() {
1087 RemoteConnectionOptions::Ssh(opts) => {
1088 SshRemoteConnection::new(opts, delegate, cx)
1089 .await
1090 .map(|connection| Arc::new(connection) as Arc<dyn RemoteConnection>)
1091 }
1092 RemoteConnectionOptions::Wsl(opts) => {
1093 WslRemoteConnection::new(opts, delegate, cx)
1094 .await
1095 .map(|connection| Arc::new(connection) as Arc<dyn RemoteConnection>)
1096 }
1097 RemoteConnectionOptions::Docker(opts) => {
1098 DockerExecConnection::new(opts, delegate, cx)
1099 .await
1100 .map(|connection| Arc::new(connection) as Arc<dyn RemoteConnection>)
1101 }
1102 };
1103
1104 cx.update_global(|pool: &mut Self, _| {
1105 debug_assert!(matches!(
1106 pool.connections.get(&opts),
1107 Some(ConnectionPoolEntry::Connecting(_))
1108 ));
1109 match connection {
1110 Ok(connection) => {
1111 pool.connections.insert(
1112 opts.clone(),
1113 ConnectionPoolEntry::Connected(Arc::downgrade(&connection)),
1114 );
1115 Ok(connection)
1116 }
1117 Err(error) => {
1118 pool.connections.remove(&opts);
1119 Err(Arc::new(error))
1120 }
1121 }
1122 })?
1123 }
1124 })
1125 .shared();
1126
1127 self.connections
1128 .insert(opts.clone(), ConnectionPoolEntry::Connecting(task.clone()));
1129 task
1130 }
1131}
1132
/// Options describing how to reach a remote, one variant per transport.
///
/// The type is hashable and comparable because it is the key of the
/// `ConnectionPool`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum RemoteConnectionOptions {
    /// Connect via SSH.
    Ssh(SshConnectionOptions),
    /// Connect to a WSL distribution.
    Wsl(WslConnectionOptions),
    /// Connect via Docker.
    Docker(DockerConnectionOptions),
}
1139
1140impl RemoteConnectionOptions {
1141 pub fn display_name(&self) -> String {
1142 match self {
1143 RemoteConnectionOptions::Ssh(opts) => opts.host.to_string(),
1144 RemoteConnectionOptions::Wsl(opts) => opts.distro_name.clone(),
1145 RemoteConnectionOptions::Docker(opts) => opts.name.clone(),
1146 }
1147 }
1148}
1149
1150impl From<SshConnectionOptions> for RemoteConnectionOptions {
1151 fn from(opts: SshConnectionOptions) -> Self {
1152 RemoteConnectionOptions::Ssh(opts)
1153 }
1154}
1155
1156impl From<WslConnectionOptions> for RemoteConnectionOptions {
1157 fn from(opts: WslConnectionOptions) -> Self {
1158 RemoteConnectionOptions::Wsl(opts)
1159 }
1160}
1161
#[cfg(target_os = "windows")]
/// Open a wsl path (\\wsl.localhost\<distro>\path)
///
/// Action in the `workspace` namespace; only compiled on Windows.
#[derive(Debug, Clone, PartialEq, Eq, gpui::Action)]
#[action(namespace = workspace, no_json, no_register)]
pub struct OpenWslPath {
    /// Connection options of the WSL distribution.
    pub distro: WslConnectionOptions,
    /// The paths to open.
    pub paths: Vec<PathBuf>,
}
1170
/// A transport-specific connection to a remote (SSH, WSL or Docker in this
/// file) on which a [`RemoteClient`] runs its proxy.
#[async_trait(?Send)]
pub trait RemoteConnection: Send + Sync {
    /// Starts the proxy that carries protocol messages over this connection.
    ///
    /// - `unique_identifier`: string form of the `ConnectionIdentifier`.
    /// - `reconnect`: `false` for an initial connection, `true` when
    ///   reconnecting.
    /// - `incoming_tx` / `outgoing_rx`: channel ends for envelopes received
    ///   from and sent to the remote.
    /// - `connection_activity_tx`: its receiving end is watched by the
    ///   heartbeat task.
    ///
    /// The returned task's `i32` is treated by `RemoteClient::monitor` as the
    /// proxy's exit code.
    fn start_proxy(
        &self,
        unique_identifier: String,
        reconnect: bool,
        incoming_tx: UnboundedSender<Envelope>,
        outgoing_rx: UnboundedReceiver<Envelope>,
        connection_activity_tx: Sender<()>,
        delegate: Arc<dyn RemoteClientDelegate>,
        cx: &mut AsyncApp,
    ) -> Task<Result<i32>>;
    /// Uploads the local directory `src_path` to `dest_path` on the remote.
    fn upload_directory(
        &self,
        src_path: PathBuf,
        dest_path: RemotePathBuf,
        cx: &App,
    ) -> Task<Result<()>>;
    /// Kills the connection. `RemoteClient::reconnect` calls this before
    /// connecting again.
    async fn kill(&self) -> Result<()>;
    /// Reports whether the connection has been killed. The `ConnectionPool`
    /// does not reuse a connection for which this returns `true`.
    fn has_been_killed(&self) -> bool;
    /// Reports whether the connection shares a network interface; `false`
    /// unless an implementation overrides it.
    // NOTE(review): presumably "shares the local machine's network interface" — confirm.
    fn shares_network_interface(&self) -> bool {
        false
    }
    /// Builds the command that runs `program` with `args` and `env` over this
    /// connection, optionally with a working directory and a port forward.
    fn build_command(
        &self,
        program: Option<String>,
        args: &[String],
        env: &HashMap<String, String>,
        working_dir: Option<String>,
        port_forward: Option<(u16, String, u16)>,
    ) -> Result<CommandTemplate>;
    /// Builds the command that sets up the given port forwards.
    fn build_forward_ports_command(
        &self,
        forwards: Vec<(u16, String, u16)>,
    ) -> Result<CommandTemplate>;
    /// Returns the options this connection was created with.
    fn connection_options(&self) -> RemoteConnectionOptions;
    /// Returns the path style to use for paths on the remote.
    fn path_style(&self) -> PathStyle;
    /// Returns the shell for this connection.
    fn shell(&self) -> String;
    /// Returns the default system shell for this connection.
    fn default_system_shell(&self) -> String;
    /// Reports whether WSL interop is available; the value is passed on to the
    /// `ChannelClient` created in `RemoteClient::new`.
    fn has_wsl_interop(&self) -> bool;

    /// Test hook for simulating a lost connection; does nothing by default.
    #[cfg(any(test, feature = "test-support"))]
    fn simulate_disconnect(&self, _: &AsyncApp) {}
}
1215
/// Pending requests, keyed by the id of the request message.
///
/// Each entry's sender delivers the response envelope together with a second
/// oneshot sender; the message loop awaits the paired receiver before it moves
/// on to the next incoming message.
type ResponseChannels = Mutex<HashMap<MessageId, oneshot::Sender<(Envelope, oneshot::Sender<()>)>>>;
1217
/// A value that is set at most once and can be awaited by any number of waiters.
struct Signal<T> {
    /// Sending half; taken (leaving `None`) the first time the signal is set.
    tx: Mutex<Option<oneshot::Sender<T>>>,
    /// Shared task resolving to the value, or to `None` if the sender was dropped
    /// without sending.
    rx: Shared<Task<Option<T>>>,
}
1222
1223impl<T: Send + Clone + 'static> Signal<T> {
1224 pub fn new(cx: &App) -> Self {
1225 let (tx, rx) = oneshot::channel();
1226
1227 let task = cx
1228 .background_executor()
1229 .spawn(async move { rx.await.ok() })
1230 .shared();
1231
1232 Self {
1233 tx: Mutex::new(Some(tx)),
1234 rx: task,
1235 }
1236 }
1237
1238 fn set(&self, value: T) {
1239 if let Some(tx) = self.tx.lock().take() {
1240 let _ = tx.send(value);
1241 }
1242 }
1243
1244 fn wait(&self) -> Shared<Task<Option<T>>> {
1245 self.rx.clone()
1246 }
1247}
1248
/// Protocol client that exchanges `Envelope`s over a pair of unbounded channels
/// and can be re-attached to a fresh channel pair after a reconnect.
struct ChannelClient {
    /// Source of ids for outgoing messages (incremented per message).
    next_message_id: AtomicU32,
    /// Sink for outgoing envelopes; replaced by `reconnect`.
    outgoing_tx: Mutex<mpsc::UnboundedSender<Envelope>>,
    /// Envelopes sent via the buffered path, retained until the peer acknowledges
    /// them (an incoming `ack_id` drops every entry with `id <= ack_id`).
    buffer: Mutex<VecDeque<Envelope>>,
    /// Requests awaiting a response.
    response_channels: ResponseChannels,
    /// Handlers for incoming (non-response) messages.
    message_handlers: Mutex<ProtoMessageHandlerSet>,
    /// Id of the most recently processed incoming message; sent back to the peer
    /// as `ack_id` on outgoing envelopes.
    max_received: AtomicU32,
    /// Label used as a prefix in log messages.
    name: &'static str,
    /// The running message-handling task; replaced by `reconnect`.
    task: Mutex<Task<Result<()>>>,
    /// Set once the peer's `RemoteStarted` message has been received.
    remote_started: Signal<()>,
    /// Value reported by `ProtoClient::has_wsl_interop`.
    has_wsl_interop: bool,
}
1261
1262impl ChannelClient {
1263 fn new(
1264 incoming_rx: mpsc::UnboundedReceiver<Envelope>,
1265 outgoing_tx: mpsc::UnboundedSender<Envelope>,
1266 cx: &App,
1267 name: &'static str,
1268 has_wsl_interop: bool,
1269 ) -> Arc<Self> {
1270 Arc::new_cyclic(|this| Self {
1271 outgoing_tx: Mutex::new(outgoing_tx),
1272 next_message_id: AtomicU32::new(0),
1273 max_received: AtomicU32::new(0),
1274 response_channels: ResponseChannels::default(),
1275 message_handlers: Default::default(),
1276 buffer: Mutex::new(VecDeque::new()),
1277 name,
1278 task: Mutex::new(Self::start_handling_messages(
1279 this.clone(),
1280 incoming_rx,
1281 &cx.to_async(),
1282 )),
1283 remote_started: Signal::new(cx),
1284 has_wsl_interop,
1285 })
1286 }
1287
1288 fn wait_for_remote_started(&self) -> Shared<Task<Option<()>>> {
1289 self.remote_started.wait()
1290 }
1291
1292 fn start_handling_messages(
1293 this: Weak<Self>,
1294 mut incoming_rx: mpsc::UnboundedReceiver<Envelope>,
1295 cx: &AsyncApp,
1296 ) -> Task<Result<()>> {
1297 cx.spawn(async move |cx| {
1298 if let Some(this) = this.upgrade() {
1299 let envelope = proto::RemoteStarted {}.into_envelope(0, None, None);
1300 this.outgoing_tx.lock().unbounded_send(envelope).ok();
1301 };
1302
1303 let peer_id = PeerId { owner_id: 0, id: 0 };
1304 while let Some(incoming) = incoming_rx.next().await {
1305 let Some(this) = this.upgrade() else {
1306 return anyhow::Ok(());
1307 };
1308 if let Some(ack_id) = incoming.ack_id {
1309 let mut buffer = this.buffer.lock();
1310 while buffer.front().is_some_and(|msg| msg.id <= ack_id) {
1311 buffer.pop_front();
1312 }
1313 }
1314 if let Some(proto::envelope::Payload::FlushBufferedMessages(_)) = &incoming.payload
1315 {
1316 log::debug!(
1317 "{}:ssh message received. name:FlushBufferedMessages",
1318 this.name
1319 );
1320 {
1321 let buffer = this.buffer.lock();
1322 for envelope in buffer.iter() {
1323 this.outgoing_tx
1324 .lock()
1325 .unbounded_send(envelope.clone())
1326 .ok();
1327 }
1328 }
1329 let mut envelope = proto::Ack {}.into_envelope(0, Some(incoming.id), None);
1330 envelope.id = this.next_message_id.fetch_add(1, SeqCst);
1331 this.outgoing_tx.lock().unbounded_send(envelope).ok();
1332 continue;
1333 }
1334
1335 if let Some(proto::envelope::Payload::RemoteStarted(_)) = &incoming.payload {
1336 this.remote_started.set(());
1337 let mut envelope = proto::Ack {}.into_envelope(0, Some(incoming.id), None);
1338 envelope.id = this.next_message_id.fetch_add(1, SeqCst);
1339 this.outgoing_tx.lock().unbounded_send(envelope).ok();
1340 continue;
1341 }
1342
1343 this.max_received.store(incoming.id, SeqCst);
1344
1345 if let Some(request_id) = incoming.responding_to {
1346 let request_id = MessageId(request_id);
1347 let sender = this.response_channels.lock().remove(&request_id);
1348 if let Some(sender) = sender {
1349 let (tx, rx) = oneshot::channel();
1350 if incoming.payload.is_some() {
1351 sender.send((incoming, tx)).ok();
1352 }
1353 rx.await.ok();
1354 }
1355 } else if let Some(envelope) =
1356 build_typed_envelope(peer_id, Instant::now(), incoming)
1357 {
1358 let type_name = envelope.payload_type_name();
1359 let message_id = envelope.message_id();
1360 if let Some(future) = ProtoMessageHandlerSet::handle_message(
1361 &this.message_handlers,
1362 envelope,
1363 this.clone().into(),
1364 cx.clone(),
1365 ) {
1366 log::debug!("{}:ssh message received. name:{type_name}", this.name);
1367 cx.foreground_executor()
1368 .spawn(async move {
1369 match future.await {
1370 Ok(_) => {
1371 log::debug!(
1372 "{}:ssh message handled. name:{type_name}",
1373 this.name
1374 );
1375 }
1376 Err(error) => {
1377 log::error!(
1378 "{}:error handling message. type:{}, error:{:#}",
1379 this.name,
1380 type_name,
1381 format!("{error:#}").lines().fold(
1382 String::new(),
1383 |mut message, line| {
1384 if !message.is_empty() {
1385 message.push(' ');
1386 }
1387 message.push_str(line);
1388 message
1389 }
1390 )
1391 );
1392 }
1393 }
1394 })
1395 .detach()
1396 } else {
1397 log::error!("{}:unhandled ssh message name:{type_name}", this.name);
1398 if let Err(e) = AnyProtoClient::from(this.clone()).send_response(
1399 message_id,
1400 anyhow::anyhow!("no handler registered for {type_name}").to_proto(),
1401 ) {
1402 log::error!(
1403 "{}:error sending error response for {type_name}:{e:#}",
1404 this.name
1405 );
1406 }
1407 }
1408 }
1409 }
1410 anyhow::Ok(())
1411 })
1412 }
1413
1414 fn reconnect(
1415 self: &Arc<Self>,
1416 incoming_rx: UnboundedReceiver<Envelope>,
1417 outgoing_tx: UnboundedSender<Envelope>,
1418 cx: &AsyncApp,
1419 ) {
1420 *self.outgoing_tx.lock() = outgoing_tx;
1421 *self.task.lock() = Self::start_handling_messages(Arc::downgrade(self), incoming_rx, cx);
1422 }
1423
1424 fn request<T: RequestMessage>(
1425 &self,
1426 payload: T,
1427 ) -> impl 'static + Future<Output = Result<T::Response>> {
1428 self.request_internal(payload, true)
1429 }
1430
1431 fn request_internal<T: RequestMessage>(
1432 &self,
1433 payload: T,
1434 use_buffer: bool,
1435 ) -> impl 'static + Future<Output = Result<T::Response>> {
1436 log::debug!("ssh request start. name:{}", T::NAME);
1437 let response =
1438 self.request_dynamic(payload.into_envelope(0, None, None), T::NAME, use_buffer);
1439 async move {
1440 let response = response.await?;
1441 log::debug!("ssh request finish. name:{}", T::NAME);
1442 T::Response::from_envelope(response).context("received a response of the wrong type")
1443 }
1444 }
1445
1446 async fn resync(&self, timeout: Duration) -> Result<()> {
1447 smol::future::or(
1448 async {
1449 self.request_internal(proto::FlushBufferedMessages {}, false)
1450 .await?;
1451
1452 for envelope in self.buffer.lock().iter() {
1453 self.outgoing_tx
1454 .lock()
1455 .unbounded_send(envelope.clone())
1456 .ok();
1457 }
1458 Ok(())
1459 },
1460 async {
1461 smol::Timer::after(timeout).await;
1462 anyhow::bail!("Timed out resyncing remote client")
1463 },
1464 )
1465 .await
1466 }
1467
1468 async fn ping(&self, timeout: Duration) -> Result<()> {
1469 smol::future::or(
1470 async {
1471 self.request(proto::Ping {}).await?;
1472 Ok(())
1473 },
1474 async {
1475 smol::Timer::after(timeout).await;
1476 anyhow::bail!("Timed out pinging remote client")
1477 },
1478 )
1479 .await
1480 }
1481
1482 fn send<T: EnvelopedMessage>(&self, payload: T) -> Result<()> {
1483 log::debug!("ssh send name:{}", T::NAME);
1484 self.send_dynamic(payload.into_envelope(0, None, None))
1485 }
1486
1487 fn request_dynamic(
1488 &self,
1489 mut envelope: proto::Envelope,
1490 type_name: &'static str,
1491 use_buffer: bool,
1492 ) -> impl 'static + Future<Output = Result<proto::Envelope>> {
1493 envelope.id = self.next_message_id.fetch_add(1, SeqCst);
1494 let (tx, rx) = oneshot::channel();
1495 let mut response_channels_lock = self.response_channels.lock();
1496 response_channels_lock.insert(MessageId(envelope.id), tx);
1497 drop(response_channels_lock);
1498
1499 let result = if use_buffer {
1500 self.send_buffered(envelope)
1501 } else {
1502 self.send_unbuffered(envelope)
1503 };
1504 async move {
1505 if let Err(error) = &result {
1506 log::error!("failed to send message: {error}");
1507 anyhow::bail!("failed to send message: {error}");
1508 }
1509
1510 let response = rx.await.context("connection lost")?.0;
1511 if let Some(proto::envelope::Payload::Error(error)) = &response.payload {
1512 return Err(RpcError::from_proto(error, type_name));
1513 }
1514 Ok(response)
1515 }
1516 }
1517
1518 pub fn send_dynamic(&self, mut envelope: proto::Envelope) -> Result<()> {
1519 envelope.id = self.next_message_id.fetch_add(1, SeqCst);
1520 self.send_buffered(envelope)
1521 }
1522
1523 fn send_buffered(&self, mut envelope: proto::Envelope) -> Result<()> {
1524 envelope.ack_id = Some(self.max_received.load(SeqCst));
1525 self.buffer.lock().push_back(envelope.clone());
1526 // ignore errors on send (happen while we're reconnecting)
1527 // assume that the global "disconnected" overlay is sufficient.
1528 self.outgoing_tx.lock().unbounded_send(envelope).ok();
1529 Ok(())
1530 }
1531
1532 fn send_unbuffered(&self, mut envelope: proto::Envelope) -> Result<()> {
1533 envelope.ack_id = Some(self.max_received.load(SeqCst));
1534 self.outgoing_tx.lock().unbounded_send(envelope).ok();
1535 Ok(())
1536 }
1537}
1538
1539impl ProtoClient for ChannelClient {
1540 fn request(
1541 &self,
1542 envelope: proto::Envelope,
1543 request_type: &'static str,
1544 ) -> BoxFuture<'static, Result<proto::Envelope>> {
1545 self.request_dynamic(envelope, request_type, true).boxed()
1546 }
1547
1548 fn send(&self, envelope: proto::Envelope, _message_type: &'static str) -> Result<()> {
1549 self.send_dynamic(envelope)
1550 }
1551
1552 fn send_response(&self, envelope: Envelope, _message_type: &'static str) -> anyhow::Result<()> {
1553 self.send_dynamic(envelope)
1554 }
1555
1556 fn message_handler_set(&self) -> &Mutex<ProtoMessageHandlerSet> {
1557 &self.message_handlers
1558 }
1559
1560 fn is_via_collab(&self) -> bool {
1561 false
1562 }
1563
1564 fn has_wsl_interop(&self) -> bool {
1565 self.has_wsl_interop
1566 }
1567}
1568
/// Test doubles: an in-process [`RemoteConnection`] and a delegate whose
/// interactive callbacks must never be reached.
#[cfg(any(test, feature = "test-support"))]
mod fake {
    use super::{ChannelClient, RemoteClientDelegate, RemoteConnection, RemotePlatform};
    use crate::remote_client::{CommandTemplate, RemoteConnectionOptions};
    use anyhow::Result;
    use askpass::EncryptedPassword;
    use async_trait::async_trait;
    use collections::HashMap;
    use futures::{
        FutureExt, SinkExt, StreamExt,
        channel::{
            mpsc::{self, Sender},
            oneshot,
        },
        select_biased,
    };
    use gpui::{App, AppContext as _, AsyncApp, Task, TestAppContext};
    use release_channel::ReleaseChannel;
    use rpc::proto::Envelope;
    use semver::Version;
    use std::{path::PathBuf, sync::Arc};
    use util::paths::{PathStyle, RemotePathBuf};

    /// A connection that, instead of launching a real proxy, pipes envelopes
    /// straight into an in-process server-side [`ChannelClient`].
    pub(super) struct FakeRemoteConnection {
        pub(super) connection_options: RemoteConnectionOptions,
        pub(super) server_channel: Arc<ChannelClient>,
        pub(super) server_cx: SendableCx,
    }

    /// Wrapper that lets the server's `AsyncApp` be stored inside a
    /// `Send + Sync` connection.
    pub(super) struct SendableCx(AsyncApp);

    impl SendableCx {
        // SAFETY: When run in test mode, GPUI is always single threaded.
        pub(super) fn new(cx: &TestAppContext) -> Self {
            Self(cx.to_async())
        }

        // SAFETY: Enforce that we're on the main thread by requiring a valid AsyncApp
        fn get(&self, _: &AsyncApp) -> AsyncApp {
            self.0.clone()
        }
    }

    // SAFETY: There is no way to access a SendableCx from a different thread, see [`SendableCx::new`] and [`SendableCx::get`]
    unsafe impl Send for SendableCx {}
    unsafe impl Sync for SendableCx {}

    #[async_trait(?Send)]
    impl RemoteConnection for FakeRemoteConnection {
        /// Re-attaches the server channel to a fresh channel pair, then shuttles
        /// envelopes in both directions until either side closes, at which point
        /// the task resolves with `1`.
        fn start_proxy(
            &self,
            _unique_identifier: String,
            _reconnect: bool,
            mut client_incoming_tx: mpsc::UnboundedSender<Envelope>,
            mut client_outgoing_rx: mpsc::UnboundedReceiver<Envelope>,
            mut connection_activity_tx: Sender<()>,
            _delegate: Arc<dyn RemoteClientDelegate>,
            cx: &mut AsyncApp,
        ) -> Task<Result<i32>> {
            let (mut to_server_tx, to_server_rx) = mpsc::unbounded::<Envelope>();
            let (from_server_tx, mut from_server_rx) = mpsc::unbounded::<Envelope>();

            let server_cx = self.server_cx.get(cx);
            self.server_channel
                .reconnect(to_server_rx, from_server_tx, &server_cx);

            cx.background_spawn(async move {
                loop {
                    // Biased: server-to-client traffic is polled first.
                    select_biased! {
                        outbound = from_server_rx.next().fuse() => {
                            match outbound {
                                Some(envelope) => {
                                    connection_activity_tx.try_send(()).ok();
                                    client_incoming_tx.send(envelope).await.ok();
                                }
                                None => return Ok(1),
                            }
                        }
                        inbound = client_outgoing_rx.next().fuse() => {
                            match inbound {
                                Some(envelope) => {
                                    to_server_tx.send(envelope).await.ok();
                                }
                                None => return Ok(1),
                            }
                        }
                    }
                }
            })
        }

        /// Not supported by the fake; reaching this is a test bug.
        fn upload_directory(
            &self,
            _src_path: PathBuf,
            _dest_path: RemotePathBuf,
            _cx: &App,
        ) -> Task<Result<()>> {
            unreachable!()
        }

        /// Nothing to tear down.
        async fn kill(&self) -> Result<()> {
            Ok(())
        }

        /// The fake never reports itself as killed.
        fn has_been_killed(&self) -> bool {
            false
        }

        /// Produces an `ssh` invocation whose arguments are the program
        /// (defaulting to `sh`) followed by `args`; working directory and port
        /// forwarding are ignored.
        fn build_command(
            &self,
            program: Option<String>,
            args: &[String],
            env: &HashMap<String, String>,
            _: Option<String>,
            _: Option<(u16, String, u16)>,
        ) -> Result<CommandTemplate> {
            let remote_program = program.unwrap_or_else(|| "sh".to_string());
            let full_args = std::iter::once(remote_program)
                .chain(args.iter().cloned())
                .collect();
            Ok(CommandTemplate {
                program: "ssh".into(),
                args: full_args,
                env: env.clone(),
            })
        }

        /// Produces `ssh -N` followed by one `local:host:remote` argument per
        /// forward, with an empty environment.
        fn build_forward_ports_command(
            &self,
            forwards: Vec<(u16, String, u16)>,
        ) -> Result<CommandTemplate> {
            let mut args = vec!["-N".to_owned()];
            args.extend(
                forwards
                    .into_iter()
                    .map(|(local_port, host, remote_port)| {
                        format!("{local_port}:{host}:{remote_port}")
                    }),
            );
            Ok(CommandTemplate {
                program: "ssh".into(),
                args,
                env: Default::default(),
            })
        }

        fn connection_options(&self) -> RemoteConnectionOptions {
            self.connection_options.clone()
        }

        fn path_style(&self) -> PathStyle {
            PathStyle::local()
        }

        fn shell(&self) -> String {
            "sh".to_owned()
        }

        fn default_system_shell(&self) -> String {
            "sh".to_owned()
        }

        fn has_wsl_interop(&self) -> bool {
            false
        }

        /// Swaps the server channel onto a channel pair whose opposite ends are
        /// dropped immediately, so nothing can be sent or received any more.
        fn simulate_disconnect(&self, cx: &AsyncApp) {
            let (_, dead_incoming_rx) = mpsc::unbounded::<Envelope>();
            let (dead_outgoing_tx, _) = mpsc::unbounded::<Envelope>();
            let server_cx = self.server_cx.get(cx);
            self.server_channel
                .reconnect(dead_incoming_rx, dead_outgoing_tx, &server_cx);
        }
    }

    /// Delegate for tests: status updates are ignored and every other callback
    /// is expected to be unreachable.
    pub(super) struct Delegate;

    impl RemoteClientDelegate for Delegate {
        fn ask_password(&self, _: String, _: oneshot::Sender<EncryptedPassword>, _: &mut AsyncApp) {
            unreachable!()
        }

        fn download_server_binary_locally(
            &self,
            _: RemotePlatform,
            _: ReleaseChannel,
            _: Option<Version>,
            _: &mut AsyncApp,
        ) -> Task<Result<PathBuf>> {
            unreachable!()
        }

        fn get_download_url(
            &self,
            _platform: RemotePlatform,
            _release_channel: ReleaseChannel,
            _version: Option<Version>,
            _cx: &mut AsyncApp,
        ) -> Task<Result<Option<String>>> {
            unreachable!()
        }

        fn set_status(&self, _: Option<&str>, _: &mut AsyncApp) {}
    }
}