1use crate::{
2 SshConnectionOptions,
3 protocol::MessageId,
4 proxy::ProxyLaunchError,
5 transport::{
6 ssh::SshRemoteConnection,
7 wsl::{WslConnectionOptions, WslRemoteConnection},
8 },
9};
10use anyhow::{Context as _, Result, anyhow};
11use askpass::EncryptedPassword;
12use async_trait::async_trait;
13use collections::HashMap;
14use futures::{
15 Future, FutureExt as _, StreamExt as _,
16 channel::{
17 mpsc::{self, Sender, UnboundedReceiver, UnboundedSender},
18 oneshot,
19 },
20 future::{BoxFuture, Shared},
21 select, select_biased,
22};
23use gpui::{
24 App, AppContext as _, AsyncApp, BackgroundExecutor, BorrowAppContext, Context, Entity,
25 EventEmitter, Global, SemanticVersion, Task, WeakEntity,
26};
27use parking_lot::Mutex;
28
29use release_channel::ReleaseChannel;
30use rpc::{
31 AnyProtoClient, ErrorExt, ProtoClient, ProtoMessageHandlerSet, RpcError,
32 proto::{self, Envelope, EnvelopedMessage, PeerId, RequestMessage, build_typed_envelope},
33};
34use std::{
35 collections::VecDeque,
36 fmt,
37 ops::ControlFlow,
38 path::PathBuf,
39 sync::{
40 Arc, Weak,
41 atomic::{AtomicU32, AtomicU64, Ordering::SeqCst},
42 },
43 time::{Duration, Instant},
44};
45use util::{
46 ResultExt,
47 paths::{PathStyle, RemotePathBuf},
48};
49
50#[derive(Copy, Clone, Debug)]
51pub struct RemotePlatform {
52 pub os: &'static str,
53 pub arch: &'static str,
54}
55
56#[derive(Clone, Debug)]
57pub struct CommandTemplate {
58 pub program: String,
59 pub args: Vec<String>,
60 pub env: HashMap<String, String>,
61}
62
63pub trait RemoteClientDelegate: Send + Sync {
64 fn ask_password(
65 &self,
66 prompt: String,
67 tx: oneshot::Sender<EncryptedPassword>,
68 cx: &mut AsyncApp,
69 );
70 fn get_download_params(
71 &self,
72 platform: RemotePlatform,
73 release_channel: ReleaseChannel,
74 version: Option<SemanticVersion>,
75 cx: &mut AsyncApp,
76 ) -> Task<Result<Option<(String, String)>>>;
77 fn download_server_binary_locally(
78 &self,
79 platform: RemotePlatform,
80 release_channel: ReleaseChannel,
81 version: Option<SemanticVersion>,
82 cx: &mut AsyncApp,
83 ) -> Task<Result<PathBuf>>;
84 fn set_status(&self, status: Option<&str>, cx: &mut AsyncApp);
85}
86
87const MAX_MISSED_HEARTBEATS: usize = 5;
88const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
89const HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(5);
90
91const MAX_RECONNECT_ATTEMPTS: usize = 3;
92
93enum State {
94 Connecting,
95 Connected {
96 ssh_connection: Arc<dyn RemoteConnection>,
97 delegate: Arc<dyn RemoteClientDelegate>,
98
99 multiplex_task: Task<Result<()>>,
100 heartbeat_task: Task<Result<()>>,
101 },
102 HeartbeatMissed {
103 missed_heartbeats: usize,
104
105 ssh_connection: Arc<dyn RemoteConnection>,
106 delegate: Arc<dyn RemoteClientDelegate>,
107
108 multiplex_task: Task<Result<()>>,
109 heartbeat_task: Task<Result<()>>,
110 },
111 Reconnecting,
112 ReconnectFailed {
113 ssh_connection: Arc<dyn RemoteConnection>,
114 delegate: Arc<dyn RemoteClientDelegate>,
115
116 error: anyhow::Error,
117 attempts: usize,
118 },
119 ReconnectExhausted,
120 ServerNotRunning,
121}
122
123impl fmt::Display for State {
124 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
125 match self {
126 Self::Connecting => write!(f, "connecting"),
127 Self::Connected { .. } => write!(f, "connected"),
128 Self::Reconnecting => write!(f, "reconnecting"),
129 Self::ReconnectFailed { .. } => write!(f, "reconnect failed"),
130 Self::ReconnectExhausted => write!(f, "reconnect exhausted"),
131 Self::HeartbeatMissed { .. } => write!(f, "heartbeat missed"),
132 Self::ServerNotRunning { .. } => write!(f, "server not running"),
133 }
134 }
135}
136
137impl State {
138 fn remote_connection(&self) -> Option<Arc<dyn RemoteConnection>> {
139 match self {
140 Self::Connected { ssh_connection, .. } => Some(ssh_connection.clone()),
141 Self::HeartbeatMissed { ssh_connection, .. } => Some(ssh_connection.clone()),
142 Self::ReconnectFailed { ssh_connection, .. } => Some(ssh_connection.clone()),
143 _ => None,
144 }
145 }
146
147 fn can_reconnect(&self) -> bool {
148 match self {
149 Self::Connected { .. }
150 | Self::HeartbeatMissed { .. }
151 | Self::ReconnectFailed { .. } => true,
152 State::Connecting
153 | State::Reconnecting
154 | State::ReconnectExhausted
155 | State::ServerNotRunning => false,
156 }
157 }
158
159 fn is_reconnect_failed(&self) -> bool {
160 matches!(self, Self::ReconnectFailed { .. })
161 }
162
163 fn is_reconnect_exhausted(&self) -> bool {
164 matches!(self, Self::ReconnectExhausted { .. })
165 }
166
167 fn is_server_not_running(&self) -> bool {
168 matches!(self, Self::ServerNotRunning)
169 }
170
171 fn is_reconnecting(&self) -> bool {
172 matches!(self, Self::Reconnecting { .. })
173 }
174
175 fn heartbeat_recovered(self) -> Self {
176 match self {
177 Self::HeartbeatMissed {
178 ssh_connection,
179 delegate,
180 multiplex_task,
181 heartbeat_task,
182 ..
183 } => Self::Connected {
184 ssh_connection,
185 delegate,
186 multiplex_task,
187 heartbeat_task,
188 },
189 _ => self,
190 }
191 }
192
193 fn heartbeat_missed(self) -> Self {
194 match self {
195 Self::Connected {
196 ssh_connection,
197 delegate,
198 multiplex_task,
199 heartbeat_task,
200 } => Self::HeartbeatMissed {
201 missed_heartbeats: 1,
202 ssh_connection,
203 delegate,
204 multiplex_task,
205 heartbeat_task,
206 },
207 Self::HeartbeatMissed {
208 missed_heartbeats,
209 ssh_connection,
210 delegate,
211 multiplex_task,
212 heartbeat_task,
213 } => Self::HeartbeatMissed {
214 missed_heartbeats: missed_heartbeats + 1,
215 ssh_connection,
216 delegate,
217 multiplex_task,
218 heartbeat_task,
219 },
220 _ => self,
221 }
222 }
223}
224
225/// The state of the ssh connection.
226#[derive(Clone, Copy, Debug, PartialEq, Eq)]
227pub enum ConnectionState {
228 Connecting,
229 Connected,
230 HeartbeatMissed,
231 Reconnecting,
232 Disconnected,
233}
234
235impl From<&State> for ConnectionState {
236 fn from(value: &State) -> Self {
237 match value {
238 State::Connecting => Self::Connecting,
239 State::Connected { .. } => Self::Connected,
240 State::Reconnecting | State::ReconnectFailed { .. } => Self::Reconnecting,
241 State::HeartbeatMissed { .. } => Self::HeartbeatMissed,
242 State::ReconnectExhausted => Self::Disconnected,
243 State::ServerNotRunning => Self::Disconnected,
244 }
245 }
246}
247
248pub struct RemoteClient {
249 client: Arc<ChannelClient>,
250 unique_identifier: String,
251 connection_options: RemoteConnectionOptions,
252 path_style: PathStyle,
253 state: Option<State>,
254}
255
256#[derive(Debug)]
257pub enum RemoteClientEvent {
258 Disconnected,
259}
260
261impl EventEmitter<RemoteClientEvent> for RemoteClient {}
262
263// Identifies the socket on the remote server so that reconnects
264// can re-join the same project.
265pub enum ConnectionIdentifier {
266 Setup(u64),
267 Workspace(i64),
268}
269
270static NEXT_ID: AtomicU64 = AtomicU64::new(1);
271
272impl ConnectionIdentifier {
273 pub fn setup() -> Self {
274 Self::Setup(NEXT_ID.fetch_add(1, SeqCst))
275 }
276
277 // This string gets used in a socket name, and so must be relatively short.
278 // The total length of:
279 // /home/{username}/.local/share/zed/server_state/{name}/stdout.sock
280 // Must be less than about 100 characters
281 // https://unix.stackexchange.com/questions/367008/why-is-socket-path-length-limited-to-a-hundred-chars
282 // So our strings should be at most 20 characters or so.
283 fn to_string(&self, cx: &App) -> String {
284 let identifier_prefix = match ReleaseChannel::global(cx) {
285 ReleaseChannel::Stable => "".to_string(),
286 release_channel => format!("{}-", release_channel.dev_name()),
287 };
288 match self {
289 Self::Setup(setup_id) => format!("{identifier_prefix}setup-{setup_id}"),
290 Self::Workspace(workspace_id) => {
291 format!("{identifier_prefix}workspace-{workspace_id}",)
292 }
293 }
294 }
295}
296
297impl RemoteClient {
298 pub fn ssh(
299 unique_identifier: ConnectionIdentifier,
300 connection_options: SshConnectionOptions,
301 cancellation: oneshot::Receiver<()>,
302 delegate: Arc<dyn RemoteClientDelegate>,
303 cx: &mut App,
304 ) -> Task<Result<Option<Entity<Self>>>> {
305 Self::new(
306 unique_identifier,
307 RemoteConnectionOptions::Ssh(connection_options),
308 cancellation,
309 delegate,
310 cx,
311 )
312 }
313
314 pub fn new(
315 unique_identifier: ConnectionIdentifier,
316 connection_options: RemoteConnectionOptions,
317 cancellation: oneshot::Receiver<()>,
318 delegate: Arc<dyn RemoteClientDelegate>,
319 cx: &mut App,
320 ) -> Task<Result<Option<Entity<Self>>>> {
321 let unique_identifier = unique_identifier.to_string(cx);
322 cx.spawn(async move |cx| {
323 let success = Box::pin(async move {
324 let (outgoing_tx, outgoing_rx) = mpsc::unbounded::<Envelope>();
325 let (incoming_tx, incoming_rx) = mpsc::unbounded::<Envelope>();
326 let (connection_activity_tx, connection_activity_rx) = mpsc::channel::<()>(1);
327
328 let client =
329 cx.update(|cx| ChannelClient::new(incoming_rx, outgoing_tx, cx, "client"))?;
330
331 let ssh_connection = cx
332 .update(|cx| {
333 cx.update_default_global(|pool: &mut ConnectionPool, cx| {
334 pool.connect(connection_options.clone(), &delegate, cx)
335 })
336 })?
337 .await
338 .map_err(|e| e.cloned())?;
339
340 let path_style = ssh_connection.path_style();
341 let this = cx.new(|_| Self {
342 client: client.clone(),
343 unique_identifier: unique_identifier.clone(),
344 connection_options,
345 path_style,
346 state: Some(State::Connecting),
347 })?;
348
349 let io_task = ssh_connection.start_proxy(
350 unique_identifier,
351 false,
352 incoming_tx,
353 outgoing_rx,
354 connection_activity_tx,
355 delegate.clone(),
356 cx,
357 );
358
359 let multiplex_task = Self::monitor(this.downgrade(), io_task, cx);
360
361 if let Err(error) = client.ping(HEARTBEAT_TIMEOUT).await {
362 log::error!("failed to establish connection: {}", error);
363 return Err(error);
364 }
365
366 let heartbeat_task = Self::heartbeat(this.downgrade(), connection_activity_rx, cx);
367
368 this.update(cx, |this, _| {
369 this.state = Some(State::Connected {
370 ssh_connection,
371 delegate,
372 multiplex_task,
373 heartbeat_task,
374 });
375 })?;
376
377 Ok(Some(this))
378 });
379
380 select! {
381 _ = cancellation.fuse() => {
382 Ok(None)
383 }
384 result = success.fuse() => result
385 }
386 })
387 }
388
389 pub fn proto_client_from_channels(
390 incoming_rx: mpsc::UnboundedReceiver<Envelope>,
391 outgoing_tx: mpsc::UnboundedSender<Envelope>,
392 cx: &App,
393 name: &'static str,
394 ) -> AnyProtoClient {
395 ChannelClient::new(incoming_rx, outgoing_tx, cx, name).into()
396 }
397
398 pub fn shutdown_processes<T: RequestMessage>(
399 &mut self,
400 shutdown_request: Option<T>,
401 executor: BackgroundExecutor,
402 ) -> Option<impl Future<Output = ()> + use<T>> {
403 let state = self.state.take()?;
404 log::info!("shutting down ssh processes");
405
406 let State::Connected {
407 multiplex_task,
408 heartbeat_task,
409 ssh_connection,
410 delegate,
411 } = state
412 else {
413 return None;
414 };
415
416 let client = self.client.clone();
417
418 Some(async move {
419 if let Some(shutdown_request) = shutdown_request {
420 client.send(shutdown_request).log_err();
421 // We wait 50ms instead of waiting for a response, because
422 // waiting for a response would require us to wait on the main thread
423 // which we want to avoid in an `on_app_quit` callback.
424 executor.timer(Duration::from_millis(50)).await;
425 }
426
427 // Drop `multiplex_task` because it owns our ssh_proxy_process, which is a
428 // child of master_process.
429 drop(multiplex_task);
430 // Now drop the rest of state, which kills master process.
431 drop(heartbeat_task);
432 drop(ssh_connection);
433 drop(delegate);
434 })
435 }
436
437 fn reconnect(&mut self, cx: &mut Context<Self>) -> Result<()> {
438 let can_reconnect = self
439 .state
440 .as_ref()
441 .map(|state| state.can_reconnect())
442 .unwrap_or(false);
443 if !can_reconnect {
444 log::info!("aborting reconnect, because not in state that allows reconnecting");
445 let error = if let Some(state) = self.state.as_ref() {
446 format!("invalid state, cannot reconnect while in state {state}")
447 } else {
448 "no state set".to_string()
449 };
450 anyhow::bail!(error);
451 }
452
453 let state = self.state.take().unwrap();
454 let (attempts, remote_connection, delegate) = match state {
455 State::Connected {
456 ssh_connection,
457 delegate,
458 multiplex_task,
459 heartbeat_task,
460 }
461 | State::HeartbeatMissed {
462 ssh_connection,
463 delegate,
464 multiplex_task,
465 heartbeat_task,
466 ..
467 } => {
468 drop(multiplex_task);
469 drop(heartbeat_task);
470 (0, ssh_connection, delegate)
471 }
472 State::ReconnectFailed {
473 attempts,
474 ssh_connection,
475 delegate,
476 ..
477 } => (attempts, ssh_connection, delegate),
478 State::Connecting
479 | State::Reconnecting
480 | State::ReconnectExhausted
481 | State::ServerNotRunning => unreachable!(),
482 };
483
484 let attempts = attempts + 1;
485 if attempts > MAX_RECONNECT_ATTEMPTS {
486 log::error!(
487 "Failed to reconnect to after {} attempts, giving up",
488 MAX_RECONNECT_ATTEMPTS
489 );
490 self.set_state(State::ReconnectExhausted, cx);
491 return Ok(());
492 }
493
494 self.set_state(State::Reconnecting, cx);
495
496 log::info!("Trying to reconnect to ssh server... Attempt {}", attempts);
497
498 let unique_identifier = self.unique_identifier.clone();
499 let client = self.client.clone();
500 let reconnect_task = cx.spawn(async move |this, cx| {
501 macro_rules! failed {
502 ($error:expr, $attempts:expr, $ssh_connection:expr, $delegate:expr) => {
503 return State::ReconnectFailed {
504 error: anyhow!($error),
505 attempts: $attempts,
506 ssh_connection: $ssh_connection,
507 delegate: $delegate,
508 };
509 };
510 }
511
512 if let Err(error) = remote_connection
513 .kill()
514 .await
515 .context("Failed to kill ssh process")
516 {
517 failed!(error, attempts, remote_connection, delegate);
518 };
519
520 let connection_options = remote_connection.connection_options();
521
522 let (outgoing_tx, outgoing_rx) = mpsc::unbounded::<Envelope>();
523 let (incoming_tx, incoming_rx) = mpsc::unbounded::<Envelope>();
524 let (connection_activity_tx, connection_activity_rx) = mpsc::channel::<()>(1);
525
526 let (ssh_connection, io_task) = match async {
527 let ssh_connection = cx
528 .update_global(|pool: &mut ConnectionPool, cx| {
529 pool.connect(connection_options, &delegate, cx)
530 })?
531 .await
532 .map_err(|error| error.cloned())?;
533
534 let io_task = ssh_connection.start_proxy(
535 unique_identifier,
536 true,
537 incoming_tx,
538 outgoing_rx,
539 connection_activity_tx,
540 delegate.clone(),
541 cx,
542 );
543 anyhow::Ok((ssh_connection, io_task))
544 }
545 .await
546 {
547 Ok((ssh_connection, io_task)) => (ssh_connection, io_task),
548 Err(error) => {
549 failed!(error, attempts, remote_connection, delegate);
550 }
551 };
552
553 let multiplex_task = Self::monitor(this.clone(), io_task, cx);
554 client.reconnect(incoming_rx, outgoing_tx, cx);
555
556 if let Err(error) = client.resync(HEARTBEAT_TIMEOUT).await {
557 failed!(error, attempts, ssh_connection, delegate);
558 };
559
560 State::Connected {
561 ssh_connection,
562 delegate,
563 multiplex_task,
564 heartbeat_task: Self::heartbeat(this.clone(), connection_activity_rx, cx),
565 }
566 });
567
568 cx.spawn(async move |this, cx| {
569 let new_state = reconnect_task.await;
570 this.update(cx, |this, cx| {
571 this.try_set_state(cx, |old_state| {
572 if old_state.is_reconnecting() {
573 match &new_state {
574 State::Connecting
575 | State::Reconnecting
576 | State::HeartbeatMissed { .. }
577 | State::ServerNotRunning => {}
578 State::Connected { .. } => {
579 log::info!("Successfully reconnected");
580 }
581 State::ReconnectFailed {
582 error, attempts, ..
583 } => {
584 log::error!(
585 "Reconnect attempt {} failed: {:?}. Starting new attempt...",
586 attempts,
587 error
588 );
589 }
590 State::ReconnectExhausted => {
591 log::error!("Reconnect attempt failed and all attempts exhausted");
592 }
593 }
594 Some(new_state)
595 } else {
596 None
597 }
598 });
599
600 if this.state_is(State::is_reconnect_failed) {
601 this.reconnect(cx)
602 } else if this.state_is(State::is_reconnect_exhausted) {
603 Ok(())
604 } else {
605 log::debug!("State has transition from Reconnecting into new state while attempting reconnect.");
606 Ok(())
607 }
608 })
609 })
610 .detach_and_log_err(cx);
611
612 Ok(())
613 }
614
615 fn heartbeat(
616 this: WeakEntity<Self>,
617 mut connection_activity_rx: mpsc::Receiver<()>,
618 cx: &mut AsyncApp,
619 ) -> Task<Result<()>> {
620 let Ok(client) = this.read_with(cx, |this, _| this.client.clone()) else {
621 return Task::ready(Err(anyhow!("SshRemoteClient lost")));
622 };
623
624 cx.spawn(async move |cx| {
625 let mut missed_heartbeats = 0;
626
627 let keepalive_timer = cx.background_executor().timer(HEARTBEAT_INTERVAL).fuse();
628 futures::pin_mut!(keepalive_timer);
629
630 loop {
631 select_biased! {
632 result = connection_activity_rx.next().fuse() => {
633 if result.is_none() {
634 log::warn!("ssh heartbeat: connection activity channel has been dropped. stopping.");
635 return Ok(());
636 }
637
638 if missed_heartbeats != 0 {
639 missed_heartbeats = 0;
640 let _ =this.update(cx, |this, cx| {
641 this.handle_heartbeat_result(missed_heartbeats, cx)
642 })?;
643 }
644 }
645 _ = keepalive_timer => {
646 log::debug!("Sending heartbeat to server...");
647
648 let result = select_biased! {
649 _ = connection_activity_rx.next().fuse() => {
650 Ok(())
651 }
652 ping_result = client.ping(HEARTBEAT_TIMEOUT).fuse() => {
653 ping_result
654 }
655 };
656
657 if result.is_err() {
658 missed_heartbeats += 1;
659 log::warn!(
660 "No heartbeat from server after {:?}. Missed heartbeat {} out of {}.",
661 HEARTBEAT_TIMEOUT,
662 missed_heartbeats,
663 MAX_MISSED_HEARTBEATS
664 );
665 } else if missed_heartbeats != 0 {
666 missed_heartbeats = 0;
667 } else {
668 continue;
669 }
670
671 let result = this.update(cx, |this, cx| {
672 this.handle_heartbeat_result(missed_heartbeats, cx)
673 })?;
674 if result.is_break() {
675 return Ok(());
676 }
677 }
678 }
679
680 keepalive_timer.set(cx.background_executor().timer(HEARTBEAT_INTERVAL).fuse());
681 }
682 })
683 }
684
685 fn handle_heartbeat_result(
686 &mut self,
687 missed_heartbeats: usize,
688 cx: &mut Context<Self>,
689 ) -> ControlFlow<()> {
690 let state = self.state.take().unwrap();
691 let next_state = if missed_heartbeats > 0 {
692 state.heartbeat_missed()
693 } else {
694 state.heartbeat_recovered()
695 };
696
697 self.set_state(next_state, cx);
698
699 if missed_heartbeats >= MAX_MISSED_HEARTBEATS {
700 log::error!(
701 "Missed last {} heartbeats. Reconnecting...",
702 missed_heartbeats
703 );
704
705 self.reconnect(cx)
706 .context("failed to start reconnect process after missing heartbeats")
707 .log_err();
708 ControlFlow::Break(())
709 } else {
710 ControlFlow::Continue(())
711 }
712 }
713
714 fn monitor(
715 this: WeakEntity<Self>,
716 io_task: Task<Result<i32>>,
717 cx: &AsyncApp,
718 ) -> Task<Result<()>> {
719 cx.spawn(async move |cx| {
720 let result = io_task.await;
721
722 match result {
723 Ok(exit_code) => {
724 if let Some(error) = ProxyLaunchError::from_exit_code(exit_code) {
725 match error {
726 ProxyLaunchError::ServerNotRunning => {
727 log::error!("failed to reconnect because server is not running");
728 this.update(cx, |this, cx| {
729 this.set_state(State::ServerNotRunning, cx);
730 })?;
731 }
732 }
733 } else if exit_code > 0 {
734 log::error!("proxy process terminated unexpectedly");
735 this.update(cx, |this, cx| {
736 this.reconnect(cx).ok();
737 })?;
738 }
739 }
740 Err(error) => {
741 log::warn!("ssh io task died with error: {:?}. reconnecting...", error);
742 this.update(cx, |this, cx| {
743 this.reconnect(cx).ok();
744 })?;
745 }
746 }
747
748 Ok(())
749 })
750 }
751
752 fn state_is(&self, check: impl FnOnce(&State) -> bool) -> bool {
753 self.state.as_ref().is_some_and(check)
754 }
755
756 fn try_set_state(&mut self, cx: &mut Context<Self>, map: impl FnOnce(&State) -> Option<State>) {
757 let new_state = self.state.as_ref().and_then(map);
758 if let Some(new_state) = new_state {
759 self.state.replace(new_state);
760 cx.notify();
761 }
762 }
763
764 fn set_state(&mut self, state: State, cx: &mut Context<Self>) {
765 log::info!("setting state to '{}'", &state);
766
767 let is_reconnect_exhausted = state.is_reconnect_exhausted();
768 let is_server_not_running = state.is_server_not_running();
769 self.state.replace(state);
770
771 if is_reconnect_exhausted || is_server_not_running {
772 cx.emit(RemoteClientEvent::Disconnected);
773 }
774 cx.notify();
775 }
776
777 pub fn shell(&self) -> Option<String> {
778 Some(self.remote_connection()?.shell())
779 }
780
781 pub fn default_system_shell(&self) -> Option<String> {
782 Some(self.remote_connection()?.default_system_shell())
783 }
784
785 pub fn shares_network_interface(&self) -> bool {
786 self.remote_connection()
787 .map_or(false, |connection| connection.shares_network_interface())
788 }
789
790 pub fn build_command(
791 &self,
792 program: Option<String>,
793 args: &[String],
794 env: &HashMap<String, String>,
795 working_dir: Option<String>,
796 port_forward: Option<(u16, String, u16)>,
797 ) -> Result<CommandTemplate> {
798 let Some(connection) = self.remote_connection() else {
799 return Err(anyhow!("no ssh connection"));
800 };
801 connection.build_command(program, args, env, working_dir, port_forward)
802 }
803
804 pub fn build_forward_port_command(
805 &self,
806 local_port: u16,
807 host: String,
808 remote_port: u16,
809 ) -> Result<CommandTemplate> {
810 let Some(connection) = self.remote_connection() else {
811 return Err(anyhow!("no ssh connection"));
812 };
813 connection.build_forward_port_command(local_port, host, remote_port)
814 }
815
816 pub fn upload_directory(
817 &self,
818 src_path: PathBuf,
819 dest_path: RemotePathBuf,
820 cx: &App,
821 ) -> Task<Result<()>> {
822 let Some(connection) = self.remote_connection() else {
823 return Task::ready(Err(anyhow!("no ssh connection")));
824 };
825 connection.upload_directory(src_path, dest_path, cx)
826 }
827
828 pub fn proto_client(&self) -> AnyProtoClient {
829 self.client.clone().into()
830 }
831
832 pub fn connection_options(&self) -> RemoteConnectionOptions {
833 self.connection_options.clone()
834 }
835
836 pub fn connection_state(&self) -> ConnectionState {
837 self.state
838 .as_ref()
839 .map(ConnectionState::from)
840 .unwrap_or(ConnectionState::Disconnected)
841 }
842
843 pub fn is_disconnected(&self) -> bool {
844 self.connection_state() == ConnectionState::Disconnected
845 }
846
847 pub fn path_style(&self) -> PathStyle {
848 self.path_style
849 }
850
851 #[cfg(any(test, feature = "test-support"))]
852 pub fn simulate_disconnect(&self, client_cx: &mut App) -> Task<()> {
853 let opts = self.connection_options();
854 client_cx.spawn(async move |cx| {
855 let connection = cx
856 .update_global(|c: &mut ConnectionPool, _| {
857 if let Some(ConnectionPoolEntry::Connecting(c)) = c.connections.get(&opts) {
858 c.clone()
859 } else {
860 panic!("missing test connection")
861 }
862 })
863 .unwrap()
864 .await
865 .unwrap();
866
867 connection.simulate_disconnect(cx);
868 })
869 }
870
871 #[cfg(any(test, feature = "test-support"))]
872 pub fn fake_server(
873 client_cx: &mut gpui::TestAppContext,
874 server_cx: &mut gpui::TestAppContext,
875 ) -> (RemoteConnectionOptions, AnyProtoClient) {
876 let port = client_cx
877 .update(|cx| cx.default_global::<ConnectionPool>().connections.len() as u16 + 1);
878 let opts = RemoteConnectionOptions::Ssh(SshConnectionOptions {
879 host: "<fake>".to_string(),
880 port: Some(port),
881 ..Default::default()
882 });
883 let (outgoing_tx, _) = mpsc::unbounded::<Envelope>();
884 let (_, incoming_rx) = mpsc::unbounded::<Envelope>();
885 let server_client =
886 server_cx.update(|cx| ChannelClient::new(incoming_rx, outgoing_tx, cx, "fake-server"));
887 let connection: Arc<dyn RemoteConnection> = Arc::new(fake::FakeRemoteConnection {
888 connection_options: opts.clone(),
889 server_cx: fake::SendableCx::new(server_cx),
890 server_channel: server_client.clone(),
891 });
892
893 client_cx.update(|cx| {
894 cx.update_default_global(|c: &mut ConnectionPool, cx| {
895 c.connections.insert(
896 opts.clone(),
897 ConnectionPoolEntry::Connecting(
898 cx.background_spawn({
899 let connection = connection.clone();
900 async move { Ok(connection.clone()) }
901 })
902 .shared(),
903 ),
904 );
905 })
906 });
907
908 (opts, server_client.into())
909 }
910
911 #[cfg(any(test, feature = "test-support"))]
912 pub async fn fake_client(
913 opts: RemoteConnectionOptions,
914 client_cx: &mut gpui::TestAppContext,
915 ) -> Entity<Self> {
916 let (_tx, rx) = oneshot::channel();
917 client_cx
918 .update(|cx| {
919 Self::new(
920 ConnectionIdentifier::setup(),
921 opts,
922 rx,
923 Arc::new(fake::Delegate),
924 cx,
925 )
926 })
927 .await
928 .unwrap()
929 .unwrap()
930 }
931
932 fn remote_connection(&self) -> Option<Arc<dyn RemoteConnection>> {
933 self.state
934 .as_ref()
935 .and_then(|state| state.remote_connection())
936 }
937}
938
939enum ConnectionPoolEntry {
940 Connecting(Shared<Task<Result<Arc<dyn RemoteConnection>, Arc<anyhow::Error>>>>),
941 Connected(Weak<dyn RemoteConnection>),
942}
943
944#[derive(Default)]
945struct ConnectionPool {
946 connections: HashMap<RemoteConnectionOptions, ConnectionPoolEntry>,
947}
948
949impl Global for ConnectionPool {}
950
951impl ConnectionPool {
952 pub fn connect(
953 &mut self,
954 opts: RemoteConnectionOptions,
955 delegate: &Arc<dyn RemoteClientDelegate>,
956 cx: &mut App,
957 ) -> Shared<Task<Result<Arc<dyn RemoteConnection>, Arc<anyhow::Error>>>> {
958 let connection = self.connections.get(&opts);
959 match connection {
960 Some(ConnectionPoolEntry::Connecting(task)) => {
961 let delegate = delegate.clone();
962 cx.spawn(async move |cx| {
963 delegate.set_status(Some("Waiting for existing connection attempt"), cx);
964 })
965 .detach();
966 return task.clone();
967 }
968 Some(ConnectionPoolEntry::Connected(ssh)) => {
969 if let Some(ssh) = ssh.upgrade()
970 && !ssh.has_been_killed()
971 {
972 return Task::ready(Ok(ssh)).shared();
973 }
974 self.connections.remove(&opts);
975 }
976 None => {}
977 }
978
979 let task = cx
980 .spawn({
981 let opts = opts.clone();
982 let delegate = delegate.clone();
983 async move |cx| {
984 let connection = match opts.clone() {
985 RemoteConnectionOptions::Ssh(opts) => {
986 SshRemoteConnection::new(opts, delegate, cx)
987 .await
988 .map(|connection| Arc::new(connection) as Arc<dyn RemoteConnection>)
989 }
990 RemoteConnectionOptions::Wsl(opts) => {
991 WslRemoteConnection::new(opts, delegate, cx)
992 .await
993 .map(|connection| Arc::new(connection) as Arc<dyn RemoteConnection>)
994 }
995 };
996
997 cx.update_global(|pool: &mut Self, _| {
998 debug_assert!(matches!(
999 pool.connections.get(&opts),
1000 Some(ConnectionPoolEntry::Connecting(_))
1001 ));
1002 match connection {
1003 Ok(connection) => {
1004 pool.connections.insert(
1005 opts.clone(),
1006 ConnectionPoolEntry::Connected(Arc::downgrade(&connection)),
1007 );
1008 Ok(connection)
1009 }
1010 Err(error) => {
1011 pool.connections.remove(&opts);
1012 Err(Arc::new(error))
1013 }
1014 }
1015 })?
1016 }
1017 })
1018 .shared();
1019
1020 self.connections
1021 .insert(opts.clone(), ConnectionPoolEntry::Connecting(task.clone()));
1022 task
1023 }
1024}
1025
1026#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1027pub enum RemoteConnectionOptions {
1028 Ssh(SshConnectionOptions),
1029 Wsl(WslConnectionOptions),
1030}
1031
1032impl RemoteConnectionOptions {
1033 pub fn display_name(&self) -> String {
1034 match self {
1035 RemoteConnectionOptions::Ssh(opts) => opts.host.clone(),
1036 RemoteConnectionOptions::Wsl(opts) => opts.distro_name.clone(),
1037 }
1038 }
1039}
1040
1041impl From<SshConnectionOptions> for RemoteConnectionOptions {
1042 fn from(opts: SshConnectionOptions) -> Self {
1043 RemoteConnectionOptions::Ssh(opts)
1044 }
1045}
1046
1047impl From<WslConnectionOptions> for RemoteConnectionOptions {
1048 fn from(opts: WslConnectionOptions) -> Self {
1049 RemoteConnectionOptions::Wsl(opts)
1050 }
1051}
1052
1053#[async_trait(?Send)]
1054pub(crate) trait RemoteConnection: Send + Sync {
1055 fn start_proxy(
1056 &self,
1057 unique_identifier: String,
1058 reconnect: bool,
1059 incoming_tx: UnboundedSender<Envelope>,
1060 outgoing_rx: UnboundedReceiver<Envelope>,
1061 connection_activity_tx: Sender<()>,
1062 delegate: Arc<dyn RemoteClientDelegate>,
1063 cx: &mut AsyncApp,
1064 ) -> Task<Result<i32>>;
1065 fn upload_directory(
1066 &self,
1067 src_path: PathBuf,
1068 dest_path: RemotePathBuf,
1069 cx: &App,
1070 ) -> Task<Result<()>>;
1071 async fn kill(&self) -> Result<()>;
1072 fn has_been_killed(&self) -> bool;
1073 fn shares_network_interface(&self) -> bool {
1074 false
1075 }
1076 fn build_command(
1077 &self,
1078 program: Option<String>,
1079 args: &[String],
1080 env: &HashMap<String, String>,
1081 working_dir: Option<String>,
1082 port_forward: Option<(u16, String, u16)>,
1083 ) -> Result<CommandTemplate>;
1084 fn build_forward_port_command(
1085 &self,
1086 local_port: u16,
1087 remote: String,
1088 remote_port: u16,
1089 ) -> Result<CommandTemplate>;
1090 fn connection_options(&self) -> RemoteConnectionOptions;
1091 fn path_style(&self) -> PathStyle;
1092 fn shell(&self) -> String;
1093 fn default_system_shell(&self) -> String;
1094
1095 #[cfg(any(test, feature = "test-support"))]
1096 fn simulate_disconnect(&self, _: &AsyncApp) {}
1097}
1098
1099type ResponseChannels = Mutex<HashMap<MessageId, oneshot::Sender<(Envelope, oneshot::Sender<()>)>>>;
1100
1101struct ChannelClient {
1102 next_message_id: AtomicU32,
1103 outgoing_tx: Mutex<mpsc::UnboundedSender<Envelope>>,
1104 buffer: Mutex<VecDeque<Envelope>>,
1105 response_channels: ResponseChannels,
1106 message_handlers: Mutex<ProtoMessageHandlerSet>,
1107 max_received: AtomicU32,
1108 name: &'static str,
1109 task: Mutex<Task<Result<()>>>,
1110}
1111
1112impl ChannelClient {
1113 fn new(
1114 incoming_rx: mpsc::UnboundedReceiver<Envelope>,
1115 outgoing_tx: mpsc::UnboundedSender<Envelope>,
1116 cx: &App,
1117 name: &'static str,
1118 ) -> Arc<Self> {
1119 Arc::new_cyclic(|this| Self {
1120 outgoing_tx: Mutex::new(outgoing_tx),
1121 next_message_id: AtomicU32::new(0),
1122 max_received: AtomicU32::new(0),
1123 response_channels: ResponseChannels::default(),
1124 message_handlers: Default::default(),
1125 buffer: Mutex::new(VecDeque::new()),
1126 name,
1127 task: Mutex::new(Self::start_handling_messages(
1128 this.clone(),
1129 incoming_rx,
1130 &cx.to_async(),
1131 )),
1132 })
1133 }
1134
1135 fn start_handling_messages(
1136 this: Weak<Self>,
1137 mut incoming_rx: mpsc::UnboundedReceiver<Envelope>,
1138 cx: &AsyncApp,
1139 ) -> Task<Result<()>> {
1140 cx.spawn(async move |cx| {
1141 let peer_id = PeerId { owner_id: 0, id: 0 };
1142 while let Some(incoming) = incoming_rx.next().await {
1143 let Some(this) = this.upgrade() else {
1144 return anyhow::Ok(());
1145 };
1146 if let Some(ack_id) = incoming.ack_id {
1147 let mut buffer = this.buffer.lock();
1148 while buffer.front().is_some_and(|msg| msg.id <= ack_id) {
1149 buffer.pop_front();
1150 }
1151 }
1152 if let Some(proto::envelope::Payload::FlushBufferedMessages(_)) = &incoming.payload
1153 {
1154 log::debug!(
1155 "{}:ssh message received. name:FlushBufferedMessages",
1156 this.name
1157 );
1158 {
1159 let buffer = this.buffer.lock();
1160 for envelope in buffer.iter() {
1161 this.outgoing_tx
1162 .lock()
1163 .unbounded_send(envelope.clone())
1164 .ok();
1165 }
1166 }
1167 let mut envelope = proto::Ack {}.into_envelope(0, Some(incoming.id), None);
1168 envelope.id = this.next_message_id.fetch_add(1, SeqCst);
1169 this.outgoing_tx.lock().unbounded_send(envelope).ok();
1170 continue;
1171 }
1172
1173 this.max_received.store(incoming.id, SeqCst);
1174
1175 if let Some(request_id) = incoming.responding_to {
1176 let request_id = MessageId(request_id);
1177 let sender = this.response_channels.lock().remove(&request_id);
1178 if let Some(sender) = sender {
1179 let (tx, rx) = oneshot::channel();
1180 if incoming.payload.is_some() {
1181 sender.send((incoming, tx)).ok();
1182 }
1183 rx.await.ok();
1184 }
1185 } else if let Some(envelope) =
1186 build_typed_envelope(peer_id, Instant::now(), incoming)
1187 {
1188 let type_name = envelope.payload_type_name();
1189 let message_id = envelope.message_id();
1190 if let Some(future) = ProtoMessageHandlerSet::handle_message(
1191 &this.message_handlers,
1192 envelope,
1193 this.clone().into(),
1194 cx.clone(),
1195 ) {
1196 log::debug!("{}:ssh message received. name:{type_name}", this.name);
1197 cx.foreground_executor()
1198 .spawn(async move {
1199 match future.await {
1200 Ok(_) => {
1201 log::debug!(
1202 "{}:ssh message handled. name:{type_name}",
1203 this.name
1204 );
1205 }
1206 Err(error) => {
1207 log::error!(
1208 "{}:error handling message. type:{}, error:{:#}",
1209 this.name,
1210 type_name,
1211 format!("{error:#}").lines().fold(
1212 String::new(),
1213 |mut message, line| {
1214 if !message.is_empty() {
1215 message.push(' ');
1216 }
1217 message.push_str(line);
1218 message
1219 }
1220 )
1221 );
1222 }
1223 }
1224 })
1225 .detach()
1226 } else {
1227 log::error!("{}:unhandled ssh message name:{type_name}", this.name);
1228 if let Err(e) = AnyProtoClient::from(this.clone()).send_response(
1229 message_id,
1230 anyhow::anyhow!("no handler registered for {type_name}").to_proto(),
1231 ) {
1232 log::error!(
1233 "{}:error sending error response for {type_name}:{e:#}",
1234 this.name
1235 );
1236 }
1237 }
1238 }
1239 }
1240 anyhow::Ok(())
1241 })
1242 }
1243
1244 fn reconnect(
1245 self: &Arc<Self>,
1246 incoming_rx: UnboundedReceiver<Envelope>,
1247 outgoing_tx: UnboundedSender<Envelope>,
1248 cx: &AsyncApp,
1249 ) {
1250 *self.outgoing_tx.lock() = outgoing_tx;
1251 *self.task.lock() = Self::start_handling_messages(Arc::downgrade(self), incoming_rx, cx);
1252 }
1253
1254 fn request<T: RequestMessage>(
1255 &self,
1256 payload: T,
1257 ) -> impl 'static + Future<Output = Result<T::Response>> {
1258 self.request_internal(payload, true)
1259 }
1260
1261 fn request_internal<T: RequestMessage>(
1262 &self,
1263 payload: T,
1264 use_buffer: bool,
1265 ) -> impl 'static + Future<Output = Result<T::Response>> {
1266 log::debug!("ssh request start. name:{}", T::NAME);
1267 let response =
1268 self.request_dynamic(payload.into_envelope(0, None, None), T::NAME, use_buffer);
1269 async move {
1270 let response = response.await?;
1271 log::debug!("ssh request finish. name:{}", T::NAME);
1272 T::Response::from_envelope(response).context("received a response of the wrong type")
1273 }
1274 }
1275
1276 async fn resync(&self, timeout: Duration) -> Result<()> {
1277 smol::future::or(
1278 async {
1279 self.request_internal(proto::FlushBufferedMessages {}, false)
1280 .await?;
1281
1282 for envelope in self.buffer.lock().iter() {
1283 self.outgoing_tx
1284 .lock()
1285 .unbounded_send(envelope.clone())
1286 .ok();
1287 }
1288 Ok(())
1289 },
1290 async {
1291 smol::Timer::after(timeout).await;
1292 anyhow::bail!("Timed out resyncing remote client")
1293 },
1294 )
1295 .await
1296 }
1297
1298 async fn ping(&self, timeout: Duration) -> Result<()> {
1299 smol::future::or(
1300 async {
1301 self.request(proto::Ping {}).await?;
1302 Ok(())
1303 },
1304 async {
1305 smol::Timer::after(timeout).await;
1306 anyhow::bail!("Timed out pinging remote client")
1307 },
1308 )
1309 .await
1310 }
1311
1312 fn send<T: EnvelopedMessage>(&self, payload: T) -> Result<()> {
1313 log::debug!("ssh send name:{}", T::NAME);
1314 self.send_dynamic(payload.into_envelope(0, None, None))
1315 }
1316
1317 fn request_dynamic(
1318 &self,
1319 mut envelope: proto::Envelope,
1320 type_name: &'static str,
1321 use_buffer: bool,
1322 ) -> impl 'static + Future<Output = Result<proto::Envelope>> {
1323 envelope.id = self.next_message_id.fetch_add(1, SeqCst);
1324 let (tx, rx) = oneshot::channel();
1325 let mut response_channels_lock = self.response_channels.lock();
1326 response_channels_lock.insert(MessageId(envelope.id), tx);
1327 drop(response_channels_lock);
1328
1329 let result = if use_buffer {
1330 self.send_buffered(envelope)
1331 } else {
1332 self.send_unbuffered(envelope)
1333 };
1334 async move {
1335 if let Err(error) = &result {
1336 log::error!("failed to send message: {error}");
1337 anyhow::bail!("failed to send message: {error}");
1338 }
1339
1340 let response = rx.await.context("connection lost")?.0;
1341 if let Some(proto::envelope::Payload::Error(error)) = &response.payload {
1342 return Err(RpcError::from_proto(error, type_name));
1343 }
1344 Ok(response)
1345 }
1346 }
1347
1348 pub fn send_dynamic(&self, mut envelope: proto::Envelope) -> Result<()> {
1349 envelope.id = self.next_message_id.fetch_add(1, SeqCst);
1350 self.send_buffered(envelope)
1351 }
1352
1353 fn send_buffered(&self, mut envelope: proto::Envelope) -> Result<()> {
1354 envelope.ack_id = Some(self.max_received.load(SeqCst));
1355 self.buffer.lock().push_back(envelope.clone());
1356 // ignore errors on send (happen while we're reconnecting)
1357 // assume that the global "disconnected" overlay is sufficient.
1358 self.outgoing_tx.lock().unbounded_send(envelope).ok();
1359 Ok(())
1360 }
1361
1362 fn send_unbuffered(&self, mut envelope: proto::Envelope) -> Result<()> {
1363 envelope.ack_id = Some(self.max_received.load(SeqCst));
1364 self.outgoing_tx.lock().unbounded_send(envelope).ok();
1365 Ok(())
1366 }
1367}
1368
1369impl ProtoClient for ChannelClient {
1370 fn request(
1371 &self,
1372 envelope: proto::Envelope,
1373 request_type: &'static str,
1374 ) -> BoxFuture<'static, Result<proto::Envelope>> {
1375 self.request_dynamic(envelope, request_type, true).boxed()
1376 }
1377
1378 fn send(&self, envelope: proto::Envelope, _message_type: &'static str) -> Result<()> {
1379 self.send_dynamic(envelope)
1380 }
1381
1382 fn send_response(&self, envelope: Envelope, _message_type: &'static str) -> anyhow::Result<()> {
1383 self.send_dynamic(envelope)
1384 }
1385
1386 fn message_handler_set(&self) -> &Mutex<ProtoMessageHandlerSet> {
1387 &self.message_handlers
1388 }
1389
1390 fn is_via_collab(&self) -> bool {
1391 false
1392 }
1393}
1394
1395#[cfg(any(test, feature = "test-support"))]
1396mod fake {
1397 use super::{ChannelClient, RemoteClientDelegate, RemoteConnection, RemotePlatform};
1398 use crate::remote_client::{CommandTemplate, RemoteConnectionOptions};
1399 use anyhow::Result;
1400 use askpass::EncryptedPassword;
1401 use async_trait::async_trait;
1402 use collections::HashMap;
1403 use futures::{
1404 FutureExt, SinkExt, StreamExt,
1405 channel::{
1406 mpsc::{self, Sender},
1407 oneshot,
1408 },
1409 select_biased,
1410 };
1411 use gpui::{App, AppContext as _, AsyncApp, SemanticVersion, Task, TestAppContext};
1412 use release_channel::ReleaseChannel;
1413 use rpc::proto::Envelope;
1414 use std::{path::PathBuf, sync::Arc};
1415 use util::paths::{PathStyle, RemotePathBuf};
1416
1417 pub(super) struct FakeRemoteConnection {
1418 pub(super) connection_options: RemoteConnectionOptions,
1419 pub(super) server_channel: Arc<ChannelClient>,
1420 pub(super) server_cx: SendableCx,
1421 }
1422
1423 pub(super) struct SendableCx(AsyncApp);
1424 impl SendableCx {
1425 // SAFETY: When run in test mode, GPUI is always single threaded.
1426 pub(super) fn new(cx: &TestAppContext) -> Self {
1427 Self(cx.to_async())
1428 }
1429
1430 // SAFETY: Enforce that we're on the main thread by requiring a valid AsyncApp
1431 fn get(&self, _: &AsyncApp) -> AsyncApp {
1432 self.0.clone()
1433 }
1434 }
1435
1436 // SAFETY: There is no way to access a SendableCx from a different thread, see [`SendableCx::new`] and [`SendableCx::get`]
1437 unsafe impl Send for SendableCx {}
1438 unsafe impl Sync for SendableCx {}
1439
1440 #[async_trait(?Send)]
1441 impl RemoteConnection for FakeRemoteConnection {
1442 async fn kill(&self) -> Result<()> {
1443 Ok(())
1444 }
1445
1446 fn has_been_killed(&self) -> bool {
1447 false
1448 }
1449
1450 fn build_command(
1451 &self,
1452 program: Option<String>,
1453 args: &[String],
1454 env: &HashMap<String, String>,
1455 _: Option<String>,
1456 _: Option<(u16, String, u16)>,
1457 ) -> Result<CommandTemplate> {
1458 let ssh_program = program.unwrap_or_else(|| "sh".to_string());
1459 let mut ssh_args = Vec::new();
1460 ssh_args.push(ssh_program);
1461 ssh_args.extend(args.iter().cloned());
1462 Ok(CommandTemplate {
1463 program: "ssh".into(),
1464 args: ssh_args,
1465 env: env.clone(),
1466 })
1467 }
1468
1469 fn build_forward_port_command(
1470 &self,
1471 local_port: u16,
1472 host: String,
1473 remote_port: u16,
1474 ) -> anyhow::Result<CommandTemplate> {
1475 Ok(CommandTemplate {
1476 program: "ssh".into(),
1477 args: vec![
1478 "-N".into(),
1479 "-L".into(),
1480 format!("{local_port}:{host}:{remote_port}"),
1481 ],
1482 env: Default::default(),
1483 })
1484 }
1485
1486 fn upload_directory(
1487 &self,
1488 _src_path: PathBuf,
1489 _dest_path: RemotePathBuf,
1490 _cx: &App,
1491 ) -> Task<Result<()>> {
1492 unreachable!()
1493 }
1494
1495 fn connection_options(&self) -> RemoteConnectionOptions {
1496 self.connection_options.clone()
1497 }
1498
1499 fn simulate_disconnect(&self, cx: &AsyncApp) {
1500 let (outgoing_tx, _) = mpsc::unbounded::<Envelope>();
1501 let (_, incoming_rx) = mpsc::unbounded::<Envelope>();
1502 self.server_channel
1503 .reconnect(incoming_rx, outgoing_tx, &self.server_cx.get(cx));
1504 }
1505
1506 fn start_proxy(
1507 &self,
1508 _unique_identifier: String,
1509 _reconnect: bool,
1510 mut client_incoming_tx: mpsc::UnboundedSender<Envelope>,
1511 mut client_outgoing_rx: mpsc::UnboundedReceiver<Envelope>,
1512 mut connection_activity_tx: Sender<()>,
1513 _delegate: Arc<dyn RemoteClientDelegate>,
1514 cx: &mut AsyncApp,
1515 ) -> Task<Result<i32>> {
1516 let (mut server_incoming_tx, server_incoming_rx) = mpsc::unbounded::<Envelope>();
1517 let (server_outgoing_tx, mut server_outgoing_rx) = mpsc::unbounded::<Envelope>();
1518
1519 self.server_channel.reconnect(
1520 server_incoming_rx,
1521 server_outgoing_tx,
1522 &self.server_cx.get(cx),
1523 );
1524
1525 cx.background_spawn(async move {
1526 loop {
1527 select_biased! {
1528 server_to_client = server_outgoing_rx.next().fuse() => {
1529 let Some(server_to_client) = server_to_client else {
1530 return Ok(1)
1531 };
1532 connection_activity_tx.try_send(()).ok();
1533 client_incoming_tx.send(server_to_client).await.ok();
1534 }
1535 client_to_server = client_outgoing_rx.next().fuse() => {
1536 let Some(client_to_server) = client_to_server else {
1537 return Ok(1)
1538 };
1539 server_incoming_tx.send(client_to_server).await.ok();
1540 }
1541 }
1542 }
1543 })
1544 }
1545
1546 fn path_style(&self) -> PathStyle {
1547 PathStyle::local()
1548 }
1549
1550 fn shell(&self) -> String {
1551 "sh".to_owned()
1552 }
1553
1554 fn default_system_shell(&self) -> String {
1555 "sh".to_owned()
1556 }
1557 }
1558
1559 pub(super) struct Delegate;
1560
1561 impl RemoteClientDelegate for Delegate {
1562 fn ask_password(&self, _: String, _: oneshot::Sender<EncryptedPassword>, _: &mut AsyncApp) {
1563 unreachable!()
1564 }
1565
1566 fn download_server_binary_locally(
1567 &self,
1568 _: RemotePlatform,
1569 _: ReleaseChannel,
1570 _: Option<SemanticVersion>,
1571 _: &mut AsyncApp,
1572 ) -> Task<Result<PathBuf>> {
1573 unreachable!()
1574 }
1575
1576 fn get_download_params(
1577 &self,
1578 _platform: RemotePlatform,
1579 _release_channel: ReleaseChannel,
1580 _version: Option<SemanticVersion>,
1581 _cx: &mut AsyncApp,
1582 ) -> Task<Result<Option<(String, String)>>> {
1583 unreachable!()
1584 }
1585
1586 fn set_status(&self, _: Option<&str>, _: &mut AsyncApp) {}
1587 }
1588}