1use crate::{
2 SshConnectionOptions,
3 protocol::MessageId,
4 proxy::ProxyLaunchError,
5 transport::{
6 docker::{DockerConnectionOptions, DockerExecConnection},
7 ssh::SshRemoteConnection,
8 wsl::{WslConnectionOptions, WslRemoteConnection},
9 },
10};
11use anyhow::{Context as _, Result, anyhow};
12use askpass::EncryptedPassword;
13use async_trait::async_trait;
14use collections::HashMap;
15use futures::{
16 Future, FutureExt as _, StreamExt as _,
17 channel::{
18 mpsc::{self, Sender, UnboundedReceiver, UnboundedSender},
19 oneshot,
20 },
21 future::{BoxFuture, Shared},
22 select, select_biased,
23};
24use gpui::{
25 App, AppContext as _, AsyncApp, BackgroundExecutor, BorrowAppContext, Context, Entity,
26 EventEmitter, FutureExt, Global, Task, WeakEntity,
27};
28use parking_lot::Mutex;
29
30use release_channel::ReleaseChannel;
31use rpc::{
32 AnyProtoClient, ErrorExt, ProtoClient, ProtoMessageHandlerSet, RpcError,
33 proto::{self, Envelope, EnvelopedMessage, PeerId, RequestMessage, build_typed_envelope},
34};
35use semver::Version;
36use std::{
37 collections::VecDeque,
38 fmt,
39 ops::ControlFlow,
40 path::PathBuf,
41 sync::{
42 Arc, Weak,
43 atomic::{AtomicU32, AtomicU64, Ordering::SeqCst},
44 },
45 time::{Duration, Instant},
46};
47use util::{
48 ResultExt,
49 paths::{PathStyle, RemotePathBuf},
50};
51
52#[derive(Copy, Clone, Debug, PartialEq, Eq)]
53pub enum RemoteOs {
54 Linux,
55 MacOs,
56 Windows,
57}
58
59impl RemoteOs {
60 pub fn as_str(&self) -> &'static str {
61 match self {
62 RemoteOs::Linux => "linux",
63 RemoteOs::MacOs => "macos",
64 RemoteOs::Windows => "windows",
65 }
66 }
67
68 pub fn is_windows(&self) -> bool {
69 matches!(self, RemoteOs::Windows)
70 }
71}
72
73impl std::fmt::Display for RemoteOs {
74 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
75 f.write_str(self.as_str())
76 }
77}
78
79#[derive(Copy, Clone, Debug, PartialEq, Eq)]
80pub enum RemoteArch {
81 X86_64,
82 Aarch64,
83}
84
85impl RemoteArch {
86 pub fn as_str(&self) -> &'static str {
87 match self {
88 RemoteArch::X86_64 => "x86_64",
89 RemoteArch::Aarch64 => "aarch64",
90 }
91 }
92}
93
94impl std::fmt::Display for RemoteArch {
95 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
96 f.write_str(self.as_str())
97 }
98}
99
100#[derive(Copy, Clone, Debug)]
101pub struct RemotePlatform {
102 pub os: RemoteOs,
103 pub arch: RemoteArch,
104}
105
106#[derive(Clone, Debug)]
107pub struct CommandTemplate {
108 pub program: String,
109 pub args: Vec<String>,
110 pub env: HashMap<String, String>,
111}
112
113pub trait RemoteClientDelegate: Send + Sync {
114 fn ask_password(
115 &self,
116 prompt: String,
117 tx: oneshot::Sender<EncryptedPassword>,
118 cx: &mut AsyncApp,
119 );
120 fn get_download_url(
121 &self,
122 platform: RemotePlatform,
123 release_channel: ReleaseChannel,
124 version: Option<Version>,
125 cx: &mut AsyncApp,
126 ) -> Task<Result<Option<String>>>;
127 fn download_server_binary_locally(
128 &self,
129 platform: RemotePlatform,
130 release_channel: ReleaseChannel,
131 version: Option<Version>,
132 cx: &mut AsyncApp,
133 ) -> Task<Result<PathBuf>>;
134 fn set_status(&self, status: Option<&str>, cx: &mut AsyncApp);
135}
136
137const MAX_MISSED_HEARTBEATS: usize = 5;
138const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
139const HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(5);
140const INITIAL_CONNECTION_TIMEOUT: Duration =
141 Duration::from_secs(if cfg!(debug_assertions) { 5 } else { 60 });
142
143const MAX_RECONNECT_ATTEMPTS: usize = 3;
144
145enum State {
146 Connecting,
147 Connected {
148 remote_connection: Arc<dyn RemoteConnection>,
149 delegate: Arc<dyn RemoteClientDelegate>,
150
151 multiplex_task: Task<Result<()>>,
152 heartbeat_task: Task<Result<()>>,
153 },
154 HeartbeatMissed {
155 missed_heartbeats: usize,
156
157 remote_connection: Arc<dyn RemoteConnection>,
158 delegate: Arc<dyn RemoteClientDelegate>,
159
160 multiplex_task: Task<Result<()>>,
161 heartbeat_task: Task<Result<()>>,
162 },
163 Reconnecting,
164 ReconnectFailed {
165 remote_connection: Arc<dyn RemoteConnection>,
166 delegate: Arc<dyn RemoteClientDelegate>,
167
168 error: anyhow::Error,
169 attempts: usize,
170 },
171 ReconnectExhausted,
172 ServerNotRunning,
173}
174
175impl fmt::Display for State {
176 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
177 match self {
178 Self::Connecting => write!(f, "connecting"),
179 Self::Connected { .. } => write!(f, "connected"),
180 Self::Reconnecting => write!(f, "reconnecting"),
181 Self::ReconnectFailed { .. } => write!(f, "reconnect failed"),
182 Self::ReconnectExhausted => write!(f, "reconnect exhausted"),
183 Self::HeartbeatMissed { .. } => write!(f, "heartbeat missed"),
184 Self::ServerNotRunning { .. } => write!(f, "server not running"),
185 }
186 }
187}
188
189impl State {
190 fn remote_connection(&self) -> Option<Arc<dyn RemoteConnection>> {
191 match self {
192 Self::Connected {
193 remote_connection, ..
194 } => Some(remote_connection.clone()),
195 Self::HeartbeatMissed {
196 remote_connection, ..
197 } => Some(remote_connection.clone()),
198 Self::ReconnectFailed {
199 remote_connection, ..
200 } => Some(remote_connection.clone()),
201 _ => None,
202 }
203 }
204
205 fn can_reconnect(&self) -> bool {
206 match self {
207 Self::Connected { .. }
208 | Self::HeartbeatMissed { .. }
209 | Self::ReconnectFailed { .. } => true,
210 State::Connecting
211 | State::Reconnecting
212 | State::ReconnectExhausted
213 | State::ServerNotRunning => false,
214 }
215 }
216
217 fn is_reconnect_failed(&self) -> bool {
218 matches!(self, Self::ReconnectFailed { .. })
219 }
220
221 fn is_reconnect_exhausted(&self) -> bool {
222 matches!(self, Self::ReconnectExhausted { .. })
223 }
224
225 fn is_server_not_running(&self) -> bool {
226 matches!(self, Self::ServerNotRunning)
227 }
228
229 fn is_reconnecting(&self) -> bool {
230 matches!(self, Self::Reconnecting { .. })
231 }
232
233 fn heartbeat_recovered(self) -> Self {
234 match self {
235 Self::HeartbeatMissed {
236 remote_connection,
237 delegate,
238 multiplex_task,
239 heartbeat_task,
240 ..
241 } => Self::Connected {
242 remote_connection: remote_connection,
243 delegate,
244 multiplex_task,
245 heartbeat_task,
246 },
247 _ => self,
248 }
249 }
250
251 fn heartbeat_missed(self) -> Self {
252 match self {
253 Self::Connected {
254 remote_connection,
255 delegate,
256 multiplex_task,
257 heartbeat_task,
258 } => Self::HeartbeatMissed {
259 missed_heartbeats: 1,
260 remote_connection,
261 delegate,
262 multiplex_task,
263 heartbeat_task,
264 },
265 Self::HeartbeatMissed {
266 missed_heartbeats,
267 remote_connection,
268 delegate,
269 multiplex_task,
270 heartbeat_task,
271 } => Self::HeartbeatMissed {
272 missed_heartbeats: missed_heartbeats + 1,
273 remote_connection,
274 delegate,
275 multiplex_task,
276 heartbeat_task,
277 },
278 _ => self,
279 }
280 }
281}
282
283/// The state of the ssh connection.
284#[derive(Clone, Copy, Debug, PartialEq, Eq)]
285pub enum ConnectionState {
286 Connecting,
287 Connected,
288 HeartbeatMissed,
289 Reconnecting,
290 Disconnected,
291}
292
293impl From<&State> for ConnectionState {
294 fn from(value: &State) -> Self {
295 match value {
296 State::Connecting => Self::Connecting,
297 State::Connected { .. } => Self::Connected,
298 State::Reconnecting | State::ReconnectFailed { .. } => Self::Reconnecting,
299 State::HeartbeatMissed { .. } => Self::HeartbeatMissed,
300 State::ReconnectExhausted => Self::Disconnected,
301 State::ServerNotRunning => Self::Disconnected,
302 }
303 }
304}
305
306pub struct RemoteClient {
307 client: Arc<ChannelClient>,
308 unique_identifier: String,
309 connection_options: RemoteConnectionOptions,
310 path_style: PathStyle,
311 state: Option<State>,
312}
313
314#[derive(Debug)]
315pub enum RemoteClientEvent {
316 Disconnected,
317}
318
319impl EventEmitter<RemoteClientEvent> for RemoteClient {}
320
321/// Identifies the socket on the remote server so that reconnects
322/// can re-join the same project.
323pub enum ConnectionIdentifier {
324 Setup(u64),
325 Workspace(i64),
326}
327
328static NEXT_ID: AtomicU64 = AtomicU64::new(1);
329
330impl ConnectionIdentifier {
331 pub fn setup() -> Self {
332 Self::Setup(NEXT_ID.fetch_add(1, SeqCst))
333 }
334
335 // This string gets used in a socket name, and so must be relatively short.
336 // The total length of:
337 // /home/{username}/.local/share/zed/server_state/{name}/stdout.sock
338 // Must be less than about 100 characters
339 // https://unix.stackexchange.com/questions/367008/why-is-socket-path-length-limited-to-a-hundred-chars
340 // So our strings should be at most 20 characters or so.
341 fn to_string(&self, cx: &App) -> String {
342 let identifier_prefix = match ReleaseChannel::global(cx) {
343 ReleaseChannel::Stable => "".to_string(),
344 release_channel => format!("{}-", release_channel.dev_name()),
345 };
346 match self {
347 Self::Setup(setup_id) => format!("{identifier_prefix}setup-{setup_id}"),
348 Self::Workspace(workspace_id) => {
349 format!("{identifier_prefix}workspace-{workspace_id}",)
350 }
351 }
352 }
353}
354
355pub async fn connect(
356 connection_options: RemoteConnectionOptions,
357 delegate: Arc<dyn RemoteClientDelegate>,
358 cx: &mut AsyncApp,
359) -> Result<Arc<dyn RemoteConnection>> {
360 cx.update(|cx| {
361 cx.update_default_global(|pool: &mut ConnectionPool, cx| {
362 pool.connect(connection_options.clone(), delegate.clone(), cx)
363 })
364 })?
365 .await
366 .map_err(|e| e.cloned())
367}
368
369impl RemoteClient {
370 pub fn new(
371 unique_identifier: ConnectionIdentifier,
372 remote_connection: Arc<dyn RemoteConnection>,
373 cancellation: oneshot::Receiver<()>,
374 delegate: Arc<dyn RemoteClientDelegate>,
375 cx: &mut App,
376 ) -> Task<Result<Option<Entity<Self>>>> {
377 let unique_identifier = unique_identifier.to_string(cx);
378 cx.spawn(async move |cx| {
379 let success = Box::pin(async move {
380 let (outgoing_tx, outgoing_rx) = mpsc::unbounded::<Envelope>();
381 let (incoming_tx, incoming_rx) = mpsc::unbounded::<Envelope>();
382 let (connection_activity_tx, connection_activity_rx) = mpsc::channel::<()>(1);
383
384 let client = cx.update(|cx| {
385 ChannelClient::new(
386 incoming_rx,
387 outgoing_tx,
388 cx,
389 "client",
390 remote_connection.has_wsl_interop(),
391 )
392 })?;
393
394 let path_style = remote_connection.path_style();
395 let this = cx.new(|_| Self {
396 client: client.clone(),
397 unique_identifier: unique_identifier.clone(),
398 connection_options: remote_connection.connection_options(),
399 path_style,
400 state: Some(State::Connecting),
401 })?;
402
403 let io_task = remote_connection.start_proxy(
404 unique_identifier,
405 false,
406 incoming_tx,
407 outgoing_rx,
408 connection_activity_tx,
409 delegate.clone(),
410 cx,
411 );
412
413 let ready = client
414 .wait_for_remote_started()
415 .with_timeout(INITIAL_CONNECTION_TIMEOUT, cx.background_executor())
416 .await;
417 match ready {
418 Ok(Some(_)) => {}
419 Ok(None) => {
420 let mut error = "remote client exited before becoming ready".to_owned();
421 if let Some(status) = io_task.now_or_never() {
422 match status {
423 Ok(exit_code) => {
424 error.push_str(&format!(", exit_code={exit_code:?}"))
425 }
426 Err(e) => error.push_str(&format!(", error={e:?}")),
427 }
428 }
429 let error = anyhow::anyhow!("{error}");
430 log::error!("failed to establish connection: {}", error);
431 return Err(error);
432 }
433 Err(_) => {
434 let mut error =
435 "remote client did not become ready within the timeout".to_owned();
436 if let Some(status) = io_task.now_or_never() {
437 match status {
438 Ok(exit_code) => {
439 error.push_str(&format!(", exit_code={exit_code:?}"))
440 }
441 Err(e) => error.push_str(&format!(", error={e:?}")),
442 }
443 }
444 let error = anyhow::anyhow!("{error}");
445 log::error!("failed to establish connection: {}", error);
446 return Err(error);
447 }
448 }
449 let multiplex_task = Self::monitor(this.downgrade(), io_task, cx);
450 if let Err(error) = client.ping(HEARTBEAT_TIMEOUT).await {
451 log::error!("failed to establish connection: {}", error);
452 return Err(error);
453 }
454
455 let heartbeat_task = Self::heartbeat(this.downgrade(), connection_activity_rx, cx);
456
457 this.update(cx, |this, _| {
458 this.state = Some(State::Connected {
459 remote_connection,
460 delegate,
461 multiplex_task,
462 heartbeat_task,
463 });
464 })?;
465
466 Ok(Some(this))
467 });
468
469 select! {
470 _ = cancellation.fuse() => {
471 Ok(None)
472 }
473 result = success.fuse() => result
474 }
475 })
476 }
477
478 pub fn proto_client_from_channels(
479 incoming_rx: mpsc::UnboundedReceiver<Envelope>,
480 outgoing_tx: mpsc::UnboundedSender<Envelope>,
481 cx: &App,
482 name: &'static str,
483 has_wsl_interop: bool,
484 ) -> AnyProtoClient {
485 ChannelClient::new(incoming_rx, outgoing_tx, cx, name, has_wsl_interop).into()
486 }
487
488 pub fn shutdown_processes<T: RequestMessage>(
489 &mut self,
490 shutdown_request: Option<T>,
491 executor: BackgroundExecutor,
492 ) -> Option<impl Future<Output = ()> + use<T>> {
493 let state = self.state.take()?;
494 log::info!("shutting down ssh processes");
495
496 let State::Connected {
497 multiplex_task,
498 heartbeat_task,
499 remote_connection,
500 delegate,
501 } = state
502 else {
503 return None;
504 };
505
506 let client = self.client.clone();
507
508 Some(async move {
509 if let Some(shutdown_request) = shutdown_request {
510 client.send(shutdown_request).log_err();
511 // We wait 50ms instead of waiting for a response, because
512 // waiting for a response would require us to wait on the main thread
513 // which we want to avoid in an `on_app_quit` callback.
514 executor.timer(Duration::from_millis(50)).await;
515 }
516
517 // Drop `multiplex_task` because it owns our remote_connection_proxy_process, which is a
518 // child of master_process.
519 drop(multiplex_task);
520 // Now drop the rest of state, which kills master process.
521 drop(heartbeat_task);
522 drop(remote_connection);
523 drop(delegate);
524 })
525 }
526
527 fn reconnect(&mut self, cx: &mut Context<Self>) -> Result<()> {
528 let can_reconnect = self
529 .state
530 .as_ref()
531 .map(|state| state.can_reconnect())
532 .unwrap_or(false);
533 if !can_reconnect {
534 log::info!("aborting reconnect, because not in state that allows reconnecting");
535 let error = if let Some(state) = self.state.as_ref() {
536 format!("invalid state, cannot reconnect while in state {state}")
537 } else {
538 "no state set".to_string()
539 };
540 anyhow::bail!(error);
541 }
542
543 let state = self.state.take().unwrap();
544 let (attempts, remote_connection, delegate) = match state {
545 State::Connected {
546 remote_connection,
547 delegate,
548 multiplex_task,
549 heartbeat_task,
550 }
551 | State::HeartbeatMissed {
552 remote_connection,
553 delegate,
554 multiplex_task,
555 heartbeat_task,
556 ..
557 } => {
558 drop(multiplex_task);
559 drop(heartbeat_task);
560 (0, remote_connection, delegate)
561 }
562 State::ReconnectFailed {
563 attempts,
564 remote_connection,
565 delegate,
566 ..
567 } => (attempts, remote_connection, delegate),
568 State::Connecting
569 | State::Reconnecting
570 | State::ReconnectExhausted
571 | State::ServerNotRunning => unreachable!(),
572 };
573
574 let attempts = attempts + 1;
575 if attempts > MAX_RECONNECT_ATTEMPTS {
576 log::error!(
577 "Failed to reconnect to after {} attempts, giving up",
578 MAX_RECONNECT_ATTEMPTS
579 );
580 self.set_state(State::ReconnectExhausted, cx);
581 return Ok(());
582 }
583
584 self.set_state(State::Reconnecting, cx);
585
586 log::info!(
587 "Trying to reconnect to remote server... Attempt {}",
588 attempts
589 );
590
591 let unique_identifier = self.unique_identifier.clone();
592 let client = self.client.clone();
593 let reconnect_task = cx.spawn(async move |this, cx| {
594 macro_rules! failed {
595 ($error:expr, $attempts:expr, $remote_connection:expr, $delegate:expr) => {
596 delegate.set_status(Some(&format!("{error:#}", error = $error)), cx);
597 return State::ReconnectFailed {
598 error: anyhow!($error),
599 attempts: $attempts,
600 remote_connection: $remote_connection,
601 delegate: $delegate,
602 };
603 };
604 }
605
606 if let Err(error) = remote_connection
607 .kill()
608 .await
609 .context("Failed to kill remote_connection process")
610 {
611 failed!(error, attempts, remote_connection, delegate);
612 };
613
614 let connection_options = remote_connection.connection_options();
615
616 let (outgoing_tx, outgoing_rx) = mpsc::unbounded::<Envelope>();
617 let (incoming_tx, incoming_rx) = mpsc::unbounded::<Envelope>();
618 let (connection_activity_tx, connection_activity_rx) = mpsc::channel::<()>(1);
619
620 let (remote_connection, io_task) = match async {
621 let remote_connection = cx
622 .update_global(|pool: &mut ConnectionPool, cx| {
623 pool.connect(connection_options, delegate.clone(), cx)
624 })?
625 .await
626 .map_err(|error| error.cloned())?;
627
628 let io_task = remote_connection.start_proxy(
629 unique_identifier,
630 true,
631 incoming_tx,
632 outgoing_rx,
633 connection_activity_tx,
634 delegate.clone(),
635 cx,
636 );
637 anyhow::Ok((remote_connection, io_task))
638 }
639 .await
640 {
641 Ok((remote_connection, io_task)) => (remote_connection, io_task),
642 Err(error) => {
643 failed!(error, attempts, remote_connection, delegate);
644 }
645 };
646
647 let multiplex_task = Self::monitor(this.clone(), io_task, cx);
648 client.reconnect(incoming_rx, outgoing_tx, cx);
649
650 if let Err(error) = client.resync(HEARTBEAT_TIMEOUT).await {
651 failed!(error, attempts, remote_connection, delegate);
652 };
653
654 State::Connected {
655 remote_connection: remote_connection,
656 delegate,
657 multiplex_task,
658 heartbeat_task: Self::heartbeat(this.clone(), connection_activity_rx, cx),
659 }
660 });
661
662 cx.spawn(async move |this, cx| {
663 let new_state = reconnect_task.await;
664 this.update(cx, |this, cx| {
665 this.try_set_state(cx, |old_state| {
666 if old_state.is_reconnecting() {
667 match &new_state {
668 State::Connecting
669 | State::Reconnecting
670 | State::HeartbeatMissed { .. }
671 | State::ServerNotRunning => {}
672 State::Connected { .. } => {
673 log::info!("Successfully reconnected");
674 }
675 State::ReconnectFailed {
676 error, attempts, ..
677 } => {
678 log::error!(
679 "Reconnect attempt {} failed: {:?}. Starting new attempt...",
680 attempts,
681 error
682 );
683 }
684 State::ReconnectExhausted => {
685 log::error!("Reconnect attempt failed and all attempts exhausted");
686 }
687 }
688 Some(new_state)
689 } else {
690 None
691 }
692 });
693
694 if this.state_is(State::is_reconnect_failed) {
695 this.reconnect(cx)
696 } else if this.state_is(State::is_reconnect_exhausted) {
697 Ok(())
698 } else {
699 log::debug!("State has transition from Reconnecting into new state while attempting reconnect.");
700 Ok(())
701 }
702 })
703 })
704 .detach_and_log_err(cx);
705
706 Ok(())
707 }
708
709 fn heartbeat(
710 this: WeakEntity<Self>,
711 mut connection_activity_rx: mpsc::Receiver<()>,
712 cx: &mut AsyncApp,
713 ) -> Task<Result<()>> {
714 let Ok(client) = this.read_with(cx, |this, _| this.client.clone()) else {
715 return Task::ready(Err(anyhow!("remote_connectionRemoteClient lost")));
716 };
717
718 cx.spawn(async move |cx| {
719 let mut missed_heartbeats = 0;
720
721 let keepalive_timer = cx.background_executor().timer(HEARTBEAT_INTERVAL).fuse();
722 futures::pin_mut!(keepalive_timer);
723
724 loop {
725 select_biased! {
726 result = connection_activity_rx.next().fuse() => {
727 if result.is_none() {
728 log::warn!("remote heartbeat: connection activity channel has been dropped. stopping.");
729 return Ok(());
730 }
731
732 if missed_heartbeats != 0 {
733 missed_heartbeats = 0;
734 let _ =this.update(cx, |this, cx| {
735 this.handle_heartbeat_result(missed_heartbeats, cx)
736 })?;
737 }
738 }
739 _ = keepalive_timer => {
740 log::debug!("Sending heartbeat to server...");
741
742 let result = select_biased! {
743 _ = connection_activity_rx.next().fuse() => {
744 Ok(())
745 }
746 ping_result = client.ping(HEARTBEAT_TIMEOUT).fuse() => {
747 ping_result
748 }
749 };
750
751 if result.is_err() {
752 missed_heartbeats += 1;
753 log::warn!(
754 "No heartbeat from server after {:?}. Missed heartbeat {} out of {}.",
755 HEARTBEAT_TIMEOUT,
756 missed_heartbeats,
757 MAX_MISSED_HEARTBEATS
758 );
759 } else if missed_heartbeats != 0 {
760 missed_heartbeats = 0;
761 } else {
762 continue;
763 }
764
765 let result = this.update(cx, |this, cx| {
766 this.handle_heartbeat_result(missed_heartbeats, cx)
767 })?;
768 if result.is_break() {
769 return Ok(());
770 }
771 }
772 }
773
774 keepalive_timer.set(cx.background_executor().timer(HEARTBEAT_INTERVAL).fuse());
775 }
776 })
777 }
778
779 fn handle_heartbeat_result(
780 &mut self,
781 missed_heartbeats: usize,
782 cx: &mut Context<Self>,
783 ) -> ControlFlow<()> {
784 let state = self.state.take().unwrap();
785 let next_state = if missed_heartbeats > 0 {
786 state.heartbeat_missed()
787 } else {
788 state.heartbeat_recovered()
789 };
790
791 self.set_state(next_state, cx);
792
793 if missed_heartbeats >= MAX_MISSED_HEARTBEATS {
794 log::error!(
795 "Missed last {} heartbeats. Reconnecting...",
796 missed_heartbeats
797 );
798
799 self.reconnect(cx)
800 .context("failed to start reconnect process after missing heartbeats")
801 .log_err();
802 ControlFlow::Break(())
803 } else {
804 ControlFlow::Continue(())
805 }
806 }
807
808 fn monitor(
809 this: WeakEntity<Self>,
810 io_task: Task<Result<i32>>,
811 cx: &AsyncApp,
812 ) -> Task<Result<()>> {
813 cx.spawn(async move |cx| {
814 let result = io_task.await;
815
816 match result {
817 Ok(exit_code) => {
818 if let Some(error) = ProxyLaunchError::from_exit_code(exit_code) {
819 match error {
820 ProxyLaunchError::ServerNotRunning => {
821 log::error!("failed to reconnect because server is not running");
822 this.update(cx, |this, cx| {
823 this.set_state(State::ServerNotRunning, cx);
824 })?;
825 }
826 }
827 } else if exit_code > 0 {
828 log::error!("proxy process terminated unexpectedly");
829 this.update(cx, |this, cx| {
830 this.reconnect(cx).ok();
831 })?;
832 }
833 }
834 Err(error) => {
835 log::warn!(
836 "remote io task died with error: {:?}. reconnecting...",
837 error
838 );
839 this.update(cx, |this, cx| {
840 this.reconnect(cx).ok();
841 })?;
842 }
843 }
844
845 Ok(())
846 })
847 }
848
849 fn state_is(&self, check: impl FnOnce(&State) -> bool) -> bool {
850 self.state.as_ref().is_some_and(check)
851 }
852
853 fn try_set_state(&mut self, cx: &mut Context<Self>, map: impl FnOnce(&State) -> Option<State>) {
854 let new_state = self.state.as_ref().and_then(map);
855 if let Some(new_state) = new_state {
856 self.state.replace(new_state);
857 cx.notify();
858 }
859 }
860
861 fn set_state(&mut self, state: State, cx: &mut Context<Self>) {
862 log::info!("setting state to '{}'", &state);
863
864 let is_reconnect_exhausted = state.is_reconnect_exhausted();
865 let is_server_not_running = state.is_server_not_running();
866 self.state.replace(state);
867
868 if is_reconnect_exhausted || is_server_not_running {
869 cx.emit(RemoteClientEvent::Disconnected);
870 }
871 cx.notify();
872 }
873
874 pub fn shell(&self) -> Option<String> {
875 Some(self.remote_connection()?.shell())
876 }
877
878 pub fn default_system_shell(&self) -> Option<String> {
879 Some(self.remote_connection()?.default_system_shell())
880 }
881
882 pub fn shares_network_interface(&self) -> bool {
883 self.remote_connection()
884 .map_or(false, |connection| connection.shares_network_interface())
885 }
886
887 pub fn build_command(
888 &self,
889 program: Option<String>,
890 args: &[String],
891 env: &HashMap<String, String>,
892 working_dir: Option<String>,
893 port_forward: Option<(u16, String, u16)>,
894 ) -> Result<CommandTemplate> {
895 let Some(connection) = self.remote_connection() else {
896 return Err(anyhow!("no remote connection"));
897 };
898 connection.build_command(program, args, env, working_dir, port_forward)
899 }
900
901 pub fn build_forward_ports_command(
902 &self,
903 forwards: Vec<(u16, String, u16)>,
904 ) -> Result<CommandTemplate> {
905 let Some(connection) = self.remote_connection() else {
906 return Err(anyhow!("no remote connection"));
907 };
908 connection.build_forward_ports_command(forwards)
909 }
910
911 pub fn upload_directory(
912 &self,
913 src_path: PathBuf,
914 dest_path: RemotePathBuf,
915 cx: &App,
916 ) -> Task<Result<()>> {
917 let Some(connection) = self.remote_connection() else {
918 return Task::ready(Err(anyhow!("no remote connection")));
919 };
920 connection.upload_directory(src_path, dest_path, cx)
921 }
922
923 pub fn proto_client(&self) -> AnyProtoClient {
924 self.client.clone().into()
925 }
926
927 pub fn connection_options(&self) -> RemoteConnectionOptions {
928 self.connection_options.clone()
929 }
930
931 pub fn connection(&self) -> Option<Arc<dyn RemoteConnection>> {
932 if let State::Connected {
933 remote_connection, ..
934 } = self.state.as_ref()?
935 {
936 Some(remote_connection.clone())
937 } else {
938 None
939 }
940 }
941
942 pub fn connection_state(&self) -> ConnectionState {
943 self.state
944 .as_ref()
945 .map(ConnectionState::from)
946 .unwrap_or(ConnectionState::Disconnected)
947 }
948
949 pub fn is_disconnected(&self) -> bool {
950 self.connection_state() == ConnectionState::Disconnected
951 }
952
953 pub fn path_style(&self) -> PathStyle {
954 self.path_style
955 }
956
957 #[cfg(any(test, feature = "test-support"))]
958 pub fn simulate_disconnect(&self, client_cx: &mut App) -> Task<()> {
959 let opts = self.connection_options();
960 client_cx.spawn(async move |cx| {
961 let connection = cx
962 .update_global(|c: &mut ConnectionPool, _| {
963 if let Some(ConnectionPoolEntry::Connecting(c)) = c.connections.get(&opts) {
964 c.clone()
965 } else {
966 panic!("missing test connection")
967 }
968 })
969 .unwrap()
970 .await
971 .unwrap();
972
973 connection.simulate_disconnect(cx);
974 })
975 }
976
977 #[cfg(any(test, feature = "test-support"))]
978 pub fn fake_server(
979 client_cx: &mut gpui::TestAppContext,
980 server_cx: &mut gpui::TestAppContext,
981 ) -> (RemoteConnectionOptions, AnyProtoClient) {
982 use crate::transport::ssh::SshConnectionHost;
983
984 let port = client_cx
985 .update(|cx| cx.default_global::<ConnectionPool>().connections.len() as u16 + 1);
986 let opts = RemoteConnectionOptions::Ssh(SshConnectionOptions {
987 host: SshConnectionHost::from("<fake>".to_string()),
988 port: Some(port),
989 ..Default::default()
990 });
991 let (outgoing_tx, _) = mpsc::unbounded::<Envelope>();
992 let (_, incoming_rx) = mpsc::unbounded::<Envelope>();
993 let server_client = server_cx
994 .update(|cx| ChannelClient::new(incoming_rx, outgoing_tx, cx, "fake-server", false));
995 let connection: Arc<dyn RemoteConnection> = Arc::new(fake::FakeRemoteConnection {
996 connection_options: opts.clone(),
997 server_cx: fake::SendableCx::new(server_cx),
998 server_channel: server_client.clone(),
999 });
1000
1001 client_cx.update(|cx| {
1002 cx.update_default_global(|c: &mut ConnectionPool, cx| {
1003 c.connections.insert(
1004 opts.clone(),
1005 ConnectionPoolEntry::Connecting(
1006 cx.background_spawn({
1007 let connection = connection.clone();
1008 async move { Ok(connection.clone()) }
1009 })
1010 .shared(),
1011 ),
1012 );
1013 })
1014 });
1015
1016 (opts, server_client.into())
1017 }
1018
1019 #[cfg(any(test, feature = "test-support"))]
1020 pub async fn fake_client(
1021 opts: RemoteConnectionOptions,
1022 client_cx: &mut gpui::TestAppContext,
1023 ) -> Entity<Self> {
1024 let (_tx, rx) = oneshot::channel();
1025 let mut cx = client_cx.to_async();
1026 let connection = connect(opts, Arc::new(fake::Delegate), &mut cx)
1027 .await
1028 .unwrap();
1029 client_cx
1030 .update(|cx| {
1031 Self::new(
1032 ConnectionIdentifier::setup(),
1033 connection,
1034 rx,
1035 Arc::new(fake::Delegate),
1036 cx,
1037 )
1038 })
1039 .await
1040 .unwrap()
1041 .unwrap()
1042 }
1043
1044 fn remote_connection(&self) -> Option<Arc<dyn RemoteConnection>> {
1045 self.state
1046 .as_ref()
1047 .and_then(|state| state.remote_connection())
1048 }
1049}
1050
1051enum ConnectionPoolEntry {
1052 Connecting(Shared<Task<Result<Arc<dyn RemoteConnection>, Arc<anyhow::Error>>>>),
1053 Connected(Weak<dyn RemoteConnection>),
1054}
1055
1056#[derive(Default)]
1057struct ConnectionPool {
1058 connections: HashMap<RemoteConnectionOptions, ConnectionPoolEntry>,
1059}
1060
1061impl Global for ConnectionPool {}
1062
1063impl ConnectionPool {
1064 pub fn connect(
1065 &mut self,
1066 opts: RemoteConnectionOptions,
1067 delegate: Arc<dyn RemoteClientDelegate>,
1068 cx: &mut App,
1069 ) -> Shared<Task<Result<Arc<dyn RemoteConnection>, Arc<anyhow::Error>>>> {
1070 let connection = self.connections.get(&opts);
1071 match connection {
1072 Some(ConnectionPoolEntry::Connecting(task)) => {
1073 delegate.set_status(
1074 Some("Waiting for existing connection attempt"),
1075 &mut cx.to_async(),
1076 );
1077 return task.clone();
1078 }
1079 Some(ConnectionPoolEntry::Connected(remote)) => {
1080 if let Some(remote) = remote.upgrade()
1081 && !remote.has_been_killed()
1082 {
1083 return Task::ready(Ok(remote)).shared();
1084 }
1085 self.connections.remove(&opts);
1086 }
1087 None => {}
1088 }
1089
1090 let task = cx
1091 .spawn({
1092 let opts = opts.clone();
1093 let delegate = delegate.clone();
1094 async move |cx| {
1095 let connection = match opts.clone() {
1096 RemoteConnectionOptions::Ssh(opts) => {
1097 SshRemoteConnection::new(opts, delegate, cx)
1098 .await
1099 .map(|connection| Arc::new(connection) as Arc<dyn RemoteConnection>)
1100 }
1101 RemoteConnectionOptions::Wsl(opts) => {
1102 WslRemoteConnection::new(opts, delegate, cx)
1103 .await
1104 .map(|connection| Arc::new(connection) as Arc<dyn RemoteConnection>)
1105 }
1106 RemoteConnectionOptions::Docker(opts) => {
1107 DockerExecConnection::new(opts, delegate, cx)
1108 .await
1109 .map(|connection| Arc::new(connection) as Arc<dyn RemoteConnection>)
1110 }
1111 };
1112
1113 cx.update_global(|pool: &mut Self, _| {
1114 debug_assert!(matches!(
1115 pool.connections.get(&opts),
1116 Some(ConnectionPoolEntry::Connecting(_))
1117 ));
1118 match connection {
1119 Ok(connection) => {
1120 pool.connections.insert(
1121 opts.clone(),
1122 ConnectionPoolEntry::Connected(Arc::downgrade(&connection)),
1123 );
1124 Ok(connection)
1125 }
1126 Err(error) => {
1127 pool.connections.remove(&opts);
1128 Err(Arc::new(error))
1129 }
1130 }
1131 })?
1132 }
1133 })
1134 .shared();
1135
1136 self.connections
1137 .insert(opts.clone(), ConnectionPoolEntry::Connecting(task.clone()));
1138 task
1139 }
1140}
1141
1142#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1143pub enum RemoteConnectionOptions {
1144 Ssh(SshConnectionOptions),
1145 Wsl(WslConnectionOptions),
1146 Docker(DockerConnectionOptions),
1147}
1148
1149impl RemoteConnectionOptions {
1150 pub fn display_name(&self) -> String {
1151 match self {
1152 RemoteConnectionOptions::Ssh(opts) => opts.host.to_string(),
1153 RemoteConnectionOptions::Wsl(opts) => opts.distro_name.clone(),
1154 RemoteConnectionOptions::Docker(opts) => opts.name.clone(),
1155 }
1156 }
1157}
1158
1159impl From<SshConnectionOptions> for RemoteConnectionOptions {
1160 fn from(opts: SshConnectionOptions) -> Self {
1161 RemoteConnectionOptions::Ssh(opts)
1162 }
1163}
1164
1165impl From<WslConnectionOptions> for RemoteConnectionOptions {
1166 fn from(opts: WslConnectionOptions) -> Self {
1167 RemoteConnectionOptions::Wsl(opts)
1168 }
1169}
1170
1171#[cfg(target_os = "windows")]
1172/// Open a wsl path (\\wsl.localhost\<distro>\path)
1173#[derive(Debug, Clone, PartialEq, Eq, gpui::Action)]
1174#[action(namespace = workspace, no_json, no_register)]
1175pub struct OpenWslPath {
1176 pub distro: WslConnectionOptions,
1177 pub paths: Vec<PathBuf>,
1178}
1179
1180#[async_trait(?Send)]
1181pub trait RemoteConnection: Send + Sync {
1182 fn start_proxy(
1183 &self,
1184 unique_identifier: String,
1185 reconnect: bool,
1186 incoming_tx: UnboundedSender<Envelope>,
1187 outgoing_rx: UnboundedReceiver<Envelope>,
1188 connection_activity_tx: Sender<()>,
1189 delegate: Arc<dyn RemoteClientDelegate>,
1190 cx: &mut AsyncApp,
1191 ) -> Task<Result<i32>>;
1192 fn upload_directory(
1193 &self,
1194 src_path: PathBuf,
1195 dest_path: RemotePathBuf,
1196 cx: &App,
1197 ) -> Task<Result<()>>;
1198 async fn kill(&self) -> Result<()>;
1199 fn has_been_killed(&self) -> bool;
1200 fn shares_network_interface(&self) -> bool {
1201 false
1202 }
1203 fn build_command(
1204 &self,
1205 program: Option<String>,
1206 args: &[String],
1207 env: &HashMap<String, String>,
1208 working_dir: Option<String>,
1209 port_forward: Option<(u16, String, u16)>,
1210 ) -> Result<CommandTemplate>;
1211 fn build_forward_ports_command(
1212 &self,
1213 forwards: Vec<(u16, String, u16)>,
1214 ) -> Result<CommandTemplate>;
1215 fn connection_options(&self) -> RemoteConnectionOptions;
1216 fn path_style(&self) -> PathStyle;
1217 fn shell(&self) -> String;
1218 fn default_system_shell(&self) -> String;
1219 fn has_wsl_interop(&self) -> bool;
1220
1221 #[cfg(any(test, feature = "test-support"))]
1222 fn simulate_disconnect(&self, _: &AsyncApp) {}
1223}
1224
1225type ResponseChannels = Mutex<HashMap<MessageId, oneshot::Sender<(Envelope, oneshot::Sender<()>)>>>;
1226
1227struct Signal<T> {
1228 tx: Mutex<Option<oneshot::Sender<T>>>,
1229 rx: Shared<Task<Option<T>>>,
1230}
1231
1232impl<T: Send + Clone + 'static> Signal<T> {
1233 pub fn new(cx: &App) -> Self {
1234 let (tx, rx) = oneshot::channel();
1235
1236 let task = cx
1237 .background_executor()
1238 .spawn(async move { rx.await.ok() })
1239 .shared();
1240
1241 Self {
1242 tx: Mutex::new(Some(tx)),
1243 rx: task,
1244 }
1245 }
1246
1247 fn set(&self, value: T) {
1248 if let Some(tx) = self.tx.lock().take() {
1249 let _ = tx.send(value);
1250 }
1251 }
1252
1253 fn wait(&self) -> Shared<Task<Option<T>>> {
1254 self.rx.clone()
1255 }
1256}
1257
1258struct ChannelClient {
1259 next_message_id: AtomicU32,
1260 outgoing_tx: Mutex<mpsc::UnboundedSender<Envelope>>,
1261 buffer: Mutex<VecDeque<Envelope>>,
1262 response_channels: ResponseChannels,
1263 message_handlers: Mutex<ProtoMessageHandlerSet>,
1264 max_received: AtomicU32,
1265 name: &'static str,
1266 task: Mutex<Task<Result<()>>>,
1267 remote_started: Signal<()>,
1268 has_wsl_interop: bool,
1269}
1270
1271impl ChannelClient {
1272 fn new(
1273 incoming_rx: mpsc::UnboundedReceiver<Envelope>,
1274 outgoing_tx: mpsc::UnboundedSender<Envelope>,
1275 cx: &App,
1276 name: &'static str,
1277 has_wsl_interop: bool,
1278 ) -> Arc<Self> {
1279 Arc::new_cyclic(|this| Self {
1280 outgoing_tx: Mutex::new(outgoing_tx),
1281 next_message_id: AtomicU32::new(0),
1282 max_received: AtomicU32::new(0),
1283 response_channels: ResponseChannels::default(),
1284 message_handlers: Default::default(),
1285 buffer: Mutex::new(VecDeque::new()),
1286 name,
1287 task: Mutex::new(Self::start_handling_messages(
1288 this.clone(),
1289 incoming_rx,
1290 &cx.to_async(),
1291 )),
1292 remote_started: Signal::new(cx),
1293 has_wsl_interop,
1294 })
1295 }
1296
1297 fn wait_for_remote_started(&self) -> Shared<Task<Option<()>>> {
1298 self.remote_started.wait()
1299 }
1300
1301 fn start_handling_messages(
1302 this: Weak<Self>,
1303 mut incoming_rx: mpsc::UnboundedReceiver<Envelope>,
1304 cx: &AsyncApp,
1305 ) -> Task<Result<()>> {
1306 cx.spawn(async move |cx| {
1307 if let Some(this) = this.upgrade() {
1308 let envelope = proto::RemoteStarted {}.into_envelope(0, None, None);
1309 this.outgoing_tx.lock().unbounded_send(envelope).ok();
1310 };
1311
1312 let peer_id = PeerId { owner_id: 0, id: 0 };
1313 while let Some(incoming) = incoming_rx.next().await {
1314 let Some(this) = this.upgrade() else {
1315 return anyhow::Ok(());
1316 };
1317 if let Some(ack_id) = incoming.ack_id {
1318 let mut buffer = this.buffer.lock();
1319 while buffer.front().is_some_and(|msg| msg.id <= ack_id) {
1320 buffer.pop_front();
1321 }
1322 }
1323 if let Some(proto::envelope::Payload::FlushBufferedMessages(_)) = &incoming.payload
1324 {
1325 log::debug!(
1326 "{}:remote message received. name:FlushBufferedMessages",
1327 this.name
1328 );
1329 {
1330 let buffer = this.buffer.lock();
1331 for envelope in buffer.iter() {
1332 this.outgoing_tx
1333 .lock()
1334 .unbounded_send(envelope.clone())
1335 .ok();
1336 }
1337 }
1338 let mut envelope = proto::Ack {}.into_envelope(0, Some(incoming.id), None);
1339 envelope.id = this.next_message_id.fetch_add(1, SeqCst);
1340 this.outgoing_tx.lock().unbounded_send(envelope).ok();
1341 continue;
1342 }
1343
1344 if let Some(proto::envelope::Payload::RemoteStarted(_)) = &incoming.payload {
1345 this.remote_started.set(());
1346 let mut envelope = proto::Ack {}.into_envelope(0, Some(incoming.id), None);
1347 envelope.id = this.next_message_id.fetch_add(1, SeqCst);
1348 this.outgoing_tx.lock().unbounded_send(envelope).ok();
1349 continue;
1350 }
1351
1352 this.max_received.store(incoming.id, SeqCst);
1353
1354 if let Some(request_id) = incoming.responding_to {
1355 let request_id = MessageId(request_id);
1356 let sender = this.response_channels.lock().remove(&request_id);
1357 if let Some(sender) = sender {
1358 let (tx, rx) = oneshot::channel();
1359 if incoming.payload.is_some() {
1360 sender.send((incoming, tx)).ok();
1361 }
1362 rx.await.ok();
1363 }
1364 } else if let Some(envelope) =
1365 build_typed_envelope(peer_id, Instant::now(), incoming)
1366 {
1367 let type_name = envelope.payload_type_name();
1368 let message_id = envelope.message_id();
1369 if let Some(future) = ProtoMessageHandlerSet::handle_message(
1370 &this.message_handlers,
1371 envelope,
1372 this.clone().into(),
1373 cx.clone(),
1374 ) {
1375 log::debug!("{}:remote message received. name:{type_name}", this.name);
1376 cx.foreground_executor()
1377 .spawn(async move {
1378 match future.await {
1379 Ok(_) => {
1380 log::debug!(
1381 "{}:remote message handled. name:{type_name}",
1382 this.name
1383 );
1384 }
1385 Err(error) => {
1386 log::error!(
1387 "{}:error handling message. type:{}, error:{:#}",
1388 this.name,
1389 type_name,
1390 format!("{error:#}").lines().fold(
1391 String::new(),
1392 |mut message, line| {
1393 if !message.is_empty() {
1394 message.push(' ');
1395 }
1396 message.push_str(line);
1397 message
1398 }
1399 )
1400 );
1401 }
1402 }
1403 })
1404 .detach()
1405 } else {
1406 log::error!("{}:unhandled remote message name:{type_name}", this.name);
1407 if let Err(e) = AnyProtoClient::from(this.clone()).send_response(
1408 message_id,
1409 anyhow::anyhow!("no handler registered for {type_name}").to_proto(),
1410 ) {
1411 log::error!(
1412 "{}:error sending error response for {type_name}:{e:#}",
1413 this.name
1414 );
1415 }
1416 }
1417 }
1418 }
1419 anyhow::Ok(())
1420 })
1421 }
1422
1423 fn reconnect(
1424 self: &Arc<Self>,
1425 incoming_rx: UnboundedReceiver<Envelope>,
1426 outgoing_tx: UnboundedSender<Envelope>,
1427 cx: &AsyncApp,
1428 ) {
1429 *self.outgoing_tx.lock() = outgoing_tx;
1430 *self.task.lock() = Self::start_handling_messages(Arc::downgrade(self), incoming_rx, cx);
1431 }
1432
1433 fn request<T: RequestMessage>(
1434 &self,
1435 payload: T,
1436 ) -> impl 'static + Future<Output = Result<T::Response>> {
1437 self.request_internal(payload, true)
1438 }
1439
1440 fn request_internal<T: RequestMessage>(
1441 &self,
1442 payload: T,
1443 use_buffer: bool,
1444 ) -> impl 'static + Future<Output = Result<T::Response>> {
1445 log::debug!("remote request start. name:{}", T::NAME);
1446 let response =
1447 self.request_dynamic(payload.into_envelope(0, None, None), T::NAME, use_buffer);
1448 async move {
1449 let response = response.await?;
1450 log::debug!("remote request finish. name:{}", T::NAME);
1451 T::Response::from_envelope(response).context("received a response of the wrong type")
1452 }
1453 }
1454
1455 async fn resync(&self, timeout: Duration) -> Result<()> {
1456 smol::future::or(
1457 async {
1458 self.request_internal(proto::FlushBufferedMessages {}, false)
1459 .await?;
1460
1461 for envelope in self.buffer.lock().iter() {
1462 self.outgoing_tx
1463 .lock()
1464 .unbounded_send(envelope.clone())
1465 .ok();
1466 }
1467 Ok(())
1468 },
1469 async {
1470 smol::Timer::after(timeout).await;
1471 anyhow::bail!("Timed out resyncing remote client")
1472 },
1473 )
1474 .await
1475 }
1476
1477 async fn ping(&self, timeout: Duration) -> Result<()> {
1478 smol::future::or(
1479 async {
1480 self.request(proto::Ping {}).await?;
1481 Ok(())
1482 },
1483 async {
1484 smol::Timer::after(timeout).await;
1485 anyhow::bail!("Timed out pinging remote client")
1486 },
1487 )
1488 .await
1489 }
1490
1491 fn send<T: EnvelopedMessage>(&self, payload: T) -> Result<()> {
1492 log::debug!("remote send name:{}", T::NAME);
1493 self.send_dynamic(payload.into_envelope(0, None, None))
1494 }
1495
1496 fn request_dynamic(
1497 &self,
1498 mut envelope: proto::Envelope,
1499 type_name: &'static str,
1500 use_buffer: bool,
1501 ) -> impl 'static + Future<Output = Result<proto::Envelope>> {
1502 envelope.id = self.next_message_id.fetch_add(1, SeqCst);
1503 let (tx, rx) = oneshot::channel();
1504 let mut response_channels_lock = self.response_channels.lock();
1505 response_channels_lock.insert(MessageId(envelope.id), tx);
1506 drop(response_channels_lock);
1507
1508 let result = if use_buffer {
1509 self.send_buffered(envelope)
1510 } else {
1511 self.send_unbuffered(envelope)
1512 };
1513 async move {
1514 if let Err(error) = &result {
1515 log::error!("failed to send message: {error}");
1516 anyhow::bail!("failed to send message: {error}");
1517 }
1518
1519 let response = rx.await.context("connection lost")?.0;
1520 if let Some(proto::envelope::Payload::Error(error)) = &response.payload {
1521 return Err(RpcError::from_proto(error, type_name));
1522 }
1523 Ok(response)
1524 }
1525 }
1526
1527 pub fn send_dynamic(&self, mut envelope: proto::Envelope) -> Result<()> {
1528 envelope.id = self.next_message_id.fetch_add(1, SeqCst);
1529 self.send_buffered(envelope)
1530 }
1531
1532 fn send_buffered(&self, mut envelope: proto::Envelope) -> Result<()> {
1533 envelope.ack_id = Some(self.max_received.load(SeqCst));
1534 self.buffer.lock().push_back(envelope.clone());
1535 // ignore errors on send (happen while we're reconnecting)
1536 // assume that the global "disconnected" overlay is sufficient.
1537 self.outgoing_tx.lock().unbounded_send(envelope).ok();
1538 Ok(())
1539 }
1540
1541 fn send_unbuffered(&self, mut envelope: proto::Envelope) -> Result<()> {
1542 envelope.ack_id = Some(self.max_received.load(SeqCst));
1543 self.outgoing_tx.lock().unbounded_send(envelope).ok();
1544 Ok(())
1545 }
1546}
1547
1548impl ProtoClient for ChannelClient {
1549 fn request(
1550 &self,
1551 envelope: proto::Envelope,
1552 request_type: &'static str,
1553 ) -> BoxFuture<'static, Result<proto::Envelope>> {
1554 self.request_dynamic(envelope, request_type, true).boxed()
1555 }
1556
1557 fn send(&self, envelope: proto::Envelope, _message_type: &'static str) -> Result<()> {
1558 self.send_dynamic(envelope)
1559 }
1560
1561 fn send_response(&self, envelope: Envelope, _message_type: &'static str) -> anyhow::Result<()> {
1562 self.send_dynamic(envelope)
1563 }
1564
1565 fn message_handler_set(&self) -> &Mutex<ProtoMessageHandlerSet> {
1566 &self.message_handlers
1567 }
1568
1569 fn is_via_collab(&self) -> bool {
1570 false
1571 }
1572
1573 fn has_wsl_interop(&self) -> bool {
1574 self.has_wsl_interop
1575 }
1576}
1577
1578#[cfg(any(test, feature = "test-support"))]
1579mod fake {
1580 use super::{ChannelClient, RemoteClientDelegate, RemoteConnection, RemotePlatform};
1581 use crate::remote_client::{CommandTemplate, RemoteConnectionOptions};
1582 use anyhow::Result;
1583 use askpass::EncryptedPassword;
1584 use async_trait::async_trait;
1585 use collections::HashMap;
1586 use futures::{
1587 FutureExt, SinkExt, StreamExt,
1588 channel::{
1589 mpsc::{self, Sender},
1590 oneshot,
1591 },
1592 select_biased,
1593 };
1594 use gpui::{App, AppContext as _, AsyncApp, Task, TestAppContext};
1595 use release_channel::ReleaseChannel;
1596 use rpc::proto::Envelope;
1597 use semver::Version;
1598 use std::{path::PathBuf, sync::Arc};
1599 use util::paths::{PathStyle, RemotePathBuf};
1600
1601 pub(super) struct FakeRemoteConnection {
1602 pub(super) connection_options: RemoteConnectionOptions,
1603 pub(super) server_channel: Arc<ChannelClient>,
1604 pub(super) server_cx: SendableCx,
1605 }
1606
1607 pub(super) struct SendableCx(AsyncApp);
1608 impl SendableCx {
1609 // SAFETY: When run in test mode, GPUI is always single threaded.
1610 pub(super) fn new(cx: &TestAppContext) -> Self {
1611 Self(cx.to_async())
1612 }
1613
1614 // SAFETY: Enforce that we're on the main thread by requiring a valid AsyncApp
1615 fn get(&self, _: &AsyncApp) -> AsyncApp {
1616 self.0.clone()
1617 }
1618 }
1619
1620 // SAFETY: There is no way to access a SendableCx from a different thread, see [`SendableCx::new`] and [`SendableCx::get`]
1621 unsafe impl Send for SendableCx {}
1622 unsafe impl Sync for SendableCx {}
1623
1624 #[async_trait(?Send)]
1625 impl RemoteConnection for FakeRemoteConnection {
1626 async fn kill(&self) -> Result<()> {
1627 Ok(())
1628 }
1629
1630 fn has_been_killed(&self) -> bool {
1631 false
1632 }
1633
1634 fn build_command(
1635 &self,
1636 program: Option<String>,
1637 args: &[String],
1638 env: &HashMap<String, String>,
1639 _: Option<String>,
1640 _: Option<(u16, String, u16)>,
1641 ) -> Result<CommandTemplate> {
1642 let ssh_program = program.unwrap_or_else(|| "sh".to_string());
1643 let mut ssh_args = Vec::new();
1644 ssh_args.push(ssh_program);
1645 ssh_args.extend(args.iter().cloned());
1646 Ok(CommandTemplate {
1647 program: "ssh".into(),
1648 args: ssh_args,
1649 env: env.clone(),
1650 })
1651 }
1652
1653 fn build_forward_ports_command(
1654 &self,
1655 forwards: Vec<(u16, String, u16)>,
1656 ) -> anyhow::Result<CommandTemplate> {
1657 Ok(CommandTemplate {
1658 program: "ssh".into(),
1659 args: std::iter::once("-N".to_owned())
1660 .chain(forwards.into_iter().map(|(local_port, host, remote_port)| {
1661 format!("{local_port}:{host}:{remote_port}")
1662 }))
1663 .collect(),
1664 env: Default::default(),
1665 })
1666 }
1667
1668 fn upload_directory(
1669 &self,
1670 _src_path: PathBuf,
1671 _dest_path: RemotePathBuf,
1672 _cx: &App,
1673 ) -> Task<Result<()>> {
1674 unreachable!()
1675 }
1676
1677 fn connection_options(&self) -> RemoteConnectionOptions {
1678 self.connection_options.clone()
1679 }
1680
1681 fn simulate_disconnect(&self, cx: &AsyncApp) {
1682 let (outgoing_tx, _) = mpsc::unbounded::<Envelope>();
1683 let (_, incoming_rx) = mpsc::unbounded::<Envelope>();
1684 self.server_channel
1685 .reconnect(incoming_rx, outgoing_tx, &self.server_cx.get(cx));
1686 }
1687
1688 fn start_proxy(
1689 &self,
1690 _unique_identifier: String,
1691 _reconnect: bool,
1692 mut client_incoming_tx: mpsc::UnboundedSender<Envelope>,
1693 mut client_outgoing_rx: mpsc::UnboundedReceiver<Envelope>,
1694 mut connection_activity_tx: Sender<()>,
1695 _delegate: Arc<dyn RemoteClientDelegate>,
1696 cx: &mut AsyncApp,
1697 ) -> Task<Result<i32>> {
1698 let (mut server_incoming_tx, server_incoming_rx) = mpsc::unbounded::<Envelope>();
1699 let (server_outgoing_tx, mut server_outgoing_rx) = mpsc::unbounded::<Envelope>();
1700
1701 self.server_channel.reconnect(
1702 server_incoming_rx,
1703 server_outgoing_tx,
1704 &self.server_cx.get(cx),
1705 );
1706
1707 cx.background_spawn(async move {
1708 loop {
1709 select_biased! {
1710 server_to_client = server_outgoing_rx.next().fuse() => {
1711 let Some(server_to_client) = server_to_client else {
1712 return Ok(1)
1713 };
1714 connection_activity_tx.try_send(()).ok();
1715 client_incoming_tx.send(server_to_client).await.ok();
1716 }
1717 client_to_server = client_outgoing_rx.next().fuse() => {
1718 let Some(client_to_server) = client_to_server else {
1719 return Ok(1)
1720 };
1721 server_incoming_tx.send(client_to_server).await.ok();
1722 }
1723 }
1724 }
1725 })
1726 }
1727
1728 fn path_style(&self) -> PathStyle {
1729 PathStyle::local()
1730 }
1731
1732 fn shell(&self) -> String {
1733 "sh".to_owned()
1734 }
1735
1736 fn default_system_shell(&self) -> String {
1737 "sh".to_owned()
1738 }
1739
1740 fn has_wsl_interop(&self) -> bool {
1741 false
1742 }
1743 }
1744
1745 pub(super) struct Delegate;
1746
1747 impl RemoteClientDelegate for Delegate {
1748 fn ask_password(&self, _: String, _: oneshot::Sender<EncryptedPassword>, _: &mut AsyncApp) {
1749 unreachable!()
1750 }
1751
1752 fn download_server_binary_locally(
1753 &self,
1754 _: RemotePlatform,
1755 _: ReleaseChannel,
1756 _: Option<Version>,
1757 _: &mut AsyncApp,
1758 ) -> Task<Result<PathBuf>> {
1759 unreachable!()
1760 }
1761
1762 fn get_download_url(
1763 &self,
1764 _platform: RemotePlatform,
1765 _release_channel: ReleaseChannel,
1766 _version: Option<Version>,
1767 _cx: &mut AsyncApp,
1768 ) -> Task<Result<Option<String>>> {
1769 unreachable!()
1770 }
1771
1772 fn set_status(&self, _: Option<&str>, _: &mut AsyncApp) {}
1773 }
1774}