1#[cfg(any(test, feature = "test-support"))]
2use crate::transport::mock::ConnectGuard;
3use crate::{
4 SshConnectionOptions,
5 protocol::MessageId,
6 proxy::ProxyLaunchError,
7 transport::{
8 docker::{DockerConnectionOptions, DockerExecConnection},
9 ssh::SshRemoteConnection,
10 wsl::{WslConnectionOptions, WslRemoteConnection},
11 },
12};
13use anyhow::{Context as _, Result, anyhow};
14use askpass::EncryptedPassword;
15use async_trait::async_trait;
16use collections::HashMap;
17use futures::{
18 Future, FutureExt as _, StreamExt as _,
19 channel::{
20 mpsc::{self, Sender, UnboundedReceiver, UnboundedSender},
21 oneshot,
22 },
23 future::{BoxFuture, Shared},
24 select, select_biased,
25};
26use gpui::{
27 App, AppContext as _, AsyncApp, BackgroundExecutor, BorrowAppContext, Context, Entity,
28 EventEmitter, FutureExt, Global, Task, WeakEntity,
29};
30use parking_lot::Mutex;
31
32use release_channel::ReleaseChannel;
33use rpc::{
34 AnyProtoClient, ErrorExt, ProtoClient, ProtoMessageHandlerSet, RpcError,
35 proto::{self, Envelope, EnvelopedMessage, PeerId, RequestMessage, build_typed_envelope},
36};
37use semver::Version;
38use std::{
39 collections::VecDeque,
40 fmt,
41 ops::ControlFlow,
42 path::PathBuf,
43 sync::{
44 Arc, Weak,
45 atomic::{AtomicU32, AtomicU64, Ordering::SeqCst},
46 },
47 time::{Duration, Instant},
48};
49use util::{
50 ResultExt,
51 paths::{PathStyle, RemotePathBuf},
52};
53
54#[derive(Copy, Clone, Debug, PartialEq, Eq)]
55pub enum RemoteOs {
56 Linux,
57 MacOs,
58 Windows,
59}
60
61impl RemoteOs {
62 pub fn as_str(&self) -> &'static str {
63 match self {
64 RemoteOs::Linux => "linux",
65 RemoteOs::MacOs => "macos",
66 RemoteOs::Windows => "windows",
67 }
68 }
69
70 pub fn is_windows(&self) -> bool {
71 matches!(self, RemoteOs::Windows)
72 }
73}
74
75impl std::fmt::Display for RemoteOs {
76 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
77 f.write_str(self.as_str())
78 }
79}
80
81#[derive(Copy, Clone, Debug, PartialEq, Eq)]
82pub enum RemoteArch {
83 X86_64,
84 Aarch64,
85}
86
87impl RemoteArch {
88 pub fn as_str(&self) -> &'static str {
89 match self {
90 RemoteArch::X86_64 => "x86_64",
91 RemoteArch::Aarch64 => "aarch64",
92 }
93 }
94}
95
96impl std::fmt::Display for RemoteArch {
97 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
98 f.write_str(self.as_str())
99 }
100}
101
102#[derive(Copy, Clone, Debug)]
103pub struct RemotePlatform {
104 pub os: RemoteOs,
105 pub arch: RemoteArch,
106}
107
108#[derive(Clone, Debug)]
109pub struct CommandTemplate {
110 pub program: String,
111 pub args: Vec<String>,
112 pub env: HashMap<String, String>,
113}
114
115pub trait RemoteClientDelegate: Send + Sync {
116 fn ask_password(
117 &self,
118 prompt: String,
119 tx: oneshot::Sender<EncryptedPassword>,
120 cx: &mut AsyncApp,
121 );
122 fn get_download_url(
123 &self,
124 platform: RemotePlatform,
125 release_channel: ReleaseChannel,
126 version: Option<Version>,
127 cx: &mut AsyncApp,
128 ) -> Task<Result<Option<String>>>;
129 fn download_server_binary_locally(
130 &self,
131 platform: RemotePlatform,
132 release_channel: ReleaseChannel,
133 version: Option<Version>,
134 cx: &mut AsyncApp,
135 ) -> Task<Result<PathBuf>>;
136 fn set_status(&self, status: Option<&str>, cx: &mut AsyncApp);
137}
138
139const MAX_MISSED_HEARTBEATS: usize = 5;
140const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
141const HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(5);
142const INITIAL_CONNECTION_TIMEOUT: Duration =
143 Duration::from_secs(if cfg!(debug_assertions) { 5 } else { 60 });
144
145pub const MAX_RECONNECT_ATTEMPTS: usize = 3;
146
147enum State {
148 Connecting,
149 Connected {
150 remote_connection: Arc<dyn RemoteConnection>,
151 delegate: Arc<dyn RemoteClientDelegate>,
152
153 multiplex_task: Task<Result<()>>,
154 heartbeat_task: Task<Result<()>>,
155 },
156 HeartbeatMissed {
157 missed_heartbeats: usize,
158
159 remote_connection: Arc<dyn RemoteConnection>,
160 delegate: Arc<dyn RemoteClientDelegate>,
161
162 multiplex_task: Task<Result<()>>,
163 heartbeat_task: Task<Result<()>>,
164 },
165 Reconnecting,
166 ReconnectFailed {
167 remote_connection: Arc<dyn RemoteConnection>,
168 delegate: Arc<dyn RemoteClientDelegate>,
169
170 error: anyhow::Error,
171 attempts: usize,
172 },
173 ReconnectExhausted,
174 ServerNotRunning,
175}
176
177impl fmt::Display for State {
178 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
179 match self {
180 Self::Connecting => write!(f, "connecting"),
181 Self::Connected { .. } => write!(f, "connected"),
182 Self::Reconnecting => write!(f, "reconnecting"),
183 Self::ReconnectFailed { .. } => write!(f, "reconnect failed"),
184 Self::ReconnectExhausted => write!(f, "reconnect exhausted"),
185 Self::HeartbeatMissed { .. } => write!(f, "heartbeat missed"),
186 Self::ServerNotRunning { .. } => write!(f, "server not running"),
187 }
188 }
189}
190
191impl State {
192 fn remote_connection(&self) -> Option<Arc<dyn RemoteConnection>> {
193 match self {
194 Self::Connected {
195 remote_connection, ..
196 } => Some(remote_connection.clone()),
197 Self::HeartbeatMissed {
198 remote_connection, ..
199 } => Some(remote_connection.clone()),
200 Self::ReconnectFailed {
201 remote_connection, ..
202 } => Some(remote_connection.clone()),
203 _ => None,
204 }
205 }
206
207 fn can_reconnect(&self) -> bool {
208 match self {
209 Self::Connected { .. }
210 | Self::HeartbeatMissed { .. }
211 | Self::ReconnectFailed { .. } => true,
212 State::Connecting
213 | State::Reconnecting
214 | State::ReconnectExhausted
215 | State::ServerNotRunning => false,
216 }
217 }
218
219 fn is_reconnect_failed(&self) -> bool {
220 matches!(self, Self::ReconnectFailed { .. })
221 }
222
223 fn is_reconnect_exhausted(&self) -> bool {
224 matches!(self, Self::ReconnectExhausted { .. })
225 }
226
227 fn is_server_not_running(&self) -> bool {
228 matches!(self, Self::ServerNotRunning)
229 }
230
231 fn is_reconnecting(&self) -> bool {
232 matches!(self, Self::Reconnecting { .. })
233 }
234
235 fn heartbeat_recovered(self) -> Self {
236 match self {
237 Self::HeartbeatMissed {
238 remote_connection,
239 delegate,
240 multiplex_task,
241 heartbeat_task,
242 ..
243 } => Self::Connected {
244 remote_connection,
245 delegate,
246 multiplex_task,
247 heartbeat_task,
248 },
249 _ => self,
250 }
251 }
252
253 fn heartbeat_missed(self) -> Self {
254 match self {
255 Self::Connected {
256 remote_connection,
257 delegate,
258 multiplex_task,
259 heartbeat_task,
260 } => Self::HeartbeatMissed {
261 missed_heartbeats: 1,
262 remote_connection,
263 delegate,
264 multiplex_task,
265 heartbeat_task,
266 },
267 Self::HeartbeatMissed {
268 missed_heartbeats,
269 remote_connection,
270 delegate,
271 multiplex_task,
272 heartbeat_task,
273 } => Self::HeartbeatMissed {
274 missed_heartbeats: missed_heartbeats + 1,
275 remote_connection,
276 delegate,
277 multiplex_task,
278 heartbeat_task,
279 },
280 _ => self,
281 }
282 }
283}
284
285/// The state of the ssh connection.
286#[derive(Clone, Copy, Debug, PartialEq, Eq)]
287pub enum ConnectionState {
288 Connecting,
289 Connected,
290 HeartbeatMissed,
291 Reconnecting,
292 Disconnected,
293}
294
295impl From<&State> for ConnectionState {
296 fn from(value: &State) -> Self {
297 match value {
298 State::Connecting => Self::Connecting,
299 State::Connected { .. } => Self::Connected,
300 State::Reconnecting | State::ReconnectFailed { .. } => Self::Reconnecting,
301 State::HeartbeatMissed { .. } => Self::HeartbeatMissed,
302 State::ReconnectExhausted => Self::Disconnected,
303 State::ServerNotRunning => Self::Disconnected,
304 }
305 }
306}
307
308pub struct RemoteClient {
309 client: Arc<ChannelClient>,
310 unique_identifier: String,
311 connection_options: RemoteConnectionOptions,
312 path_style: PathStyle,
313 state: Option<State>,
314}
315
316#[derive(Debug)]
317pub enum RemoteClientEvent {
318 Disconnected,
319}
320
321impl EventEmitter<RemoteClientEvent> for RemoteClient {}
322
323/// Identifies the socket on the remote server so that reconnects
324/// can re-join the same project.
325pub enum ConnectionIdentifier {
326 Setup(u64),
327 Workspace(i64),
328}
329
330static NEXT_ID: AtomicU64 = AtomicU64::new(1);
331
332impl ConnectionIdentifier {
333 pub fn setup() -> Self {
334 Self::Setup(NEXT_ID.fetch_add(1, SeqCst))
335 }
336
337 // This string gets used in a socket name, and so must be relatively short.
338 // The total length of:
339 // /home/{username}/.local/share/zed/server_state/{name}/stdout.sock
340 // Must be less than about 100 characters
341 // https://unix.stackexchange.com/questions/367008/why-is-socket-path-length-limited-to-a-hundred-chars
342 // So our strings should be at most 20 characters or so.
343 fn to_string(&self, cx: &App) -> String {
344 let identifier_prefix = match ReleaseChannel::global(cx) {
345 ReleaseChannel::Stable => "".to_string(),
346 release_channel => format!("{}-", release_channel.dev_name()),
347 };
348 match self {
349 Self::Setup(setup_id) => format!("{identifier_prefix}setup-{setup_id}"),
350 Self::Workspace(workspace_id) => {
351 format!("{identifier_prefix}workspace-{workspace_id}",)
352 }
353 }
354 }
355}
356
357pub async fn connect(
358 connection_options: RemoteConnectionOptions,
359 delegate: Arc<dyn RemoteClientDelegate>,
360 cx: &mut AsyncApp,
361) -> Result<Arc<dyn RemoteConnection>> {
362 cx.update(|cx| {
363 cx.update_default_global(|pool: &mut ConnectionPool, cx| {
364 pool.connect(connection_options.clone(), delegate.clone(), cx)
365 })
366 })
367 .await
368 .map_err(|e| e.cloned())
369}
370
371impl RemoteClient {
372 pub fn new(
373 unique_identifier: ConnectionIdentifier,
374 remote_connection: Arc<dyn RemoteConnection>,
375 cancellation: oneshot::Receiver<()>,
376 delegate: Arc<dyn RemoteClientDelegate>,
377 cx: &mut App,
378 ) -> Task<Result<Option<Entity<Self>>>> {
379 let unique_identifier = unique_identifier.to_string(cx);
380 cx.spawn(async move |cx| {
381 let success = Box::pin(async move {
382 let (outgoing_tx, outgoing_rx) = mpsc::unbounded::<Envelope>();
383 let (incoming_tx, incoming_rx) = mpsc::unbounded::<Envelope>();
384 let (connection_activity_tx, connection_activity_rx) = mpsc::channel::<()>(1);
385
386 let client = cx.update(|cx| {
387 ChannelClient::new(
388 incoming_rx,
389 outgoing_tx,
390 cx,
391 "client",
392 remote_connection.has_wsl_interop(),
393 )
394 });
395
396 let path_style = remote_connection.path_style();
397 let this = cx.new(|_| Self {
398 client: client.clone(),
399 unique_identifier: unique_identifier.clone(),
400 connection_options: remote_connection.connection_options(),
401 path_style,
402 state: Some(State::Connecting),
403 });
404
405 let io_task = remote_connection.start_proxy(
406 unique_identifier,
407 false,
408 incoming_tx,
409 outgoing_rx,
410 connection_activity_tx,
411 delegate.clone(),
412 cx,
413 );
414
415 let ready = client
416 .wait_for_remote_started()
417 .with_timeout(INITIAL_CONNECTION_TIMEOUT, cx.background_executor())
418 .await;
419 match ready {
420 Ok(Some(_)) => {}
421 Ok(None) => {
422 let mut error = "remote client exited before becoming ready".to_owned();
423 if let Some(status) = io_task.now_or_never() {
424 match status {
425 Ok(exit_code) => {
426 error.push_str(&format!(", exit_code={exit_code:?}"))
427 }
428 Err(e) => error.push_str(&format!(", error={e:?}")),
429 }
430 }
431 let error = anyhow::anyhow!("{error}");
432 log::error!("failed to establish connection: {}", error);
433 return Err(error);
434 }
435 Err(_) => {
436 let mut error =
437 "remote client did not become ready within the timeout".to_owned();
438 if let Some(status) = io_task.now_or_never() {
439 match status {
440 Ok(exit_code) => {
441 error.push_str(&format!(", exit_code={exit_code:?}"))
442 }
443 Err(e) => error.push_str(&format!(", error={e:?}")),
444 }
445 }
446 let error = anyhow::anyhow!("{error}");
447 log::error!("failed to establish connection: {}", error);
448 return Err(error);
449 }
450 }
451 let multiplex_task = Self::monitor(this.downgrade(), io_task, cx);
452 if let Err(error) = client.ping(HEARTBEAT_TIMEOUT).await {
453 log::error!("failed to establish connection: {}", error);
454 return Err(error);
455 }
456
457 let heartbeat_task = Self::heartbeat(this.downgrade(), connection_activity_rx, cx);
458
459 this.update(cx, |this, _| {
460 this.state = Some(State::Connected {
461 remote_connection,
462 delegate,
463 multiplex_task,
464 heartbeat_task,
465 });
466 });
467
468 Ok(Some(this))
469 });
470
471 select! {
472 _ = cancellation.fuse() => {
473 Ok(None)
474 }
475 result = success.fuse() => result
476 }
477 })
478 }
479
480 pub fn proto_client_from_channels(
481 incoming_rx: mpsc::UnboundedReceiver<Envelope>,
482 outgoing_tx: mpsc::UnboundedSender<Envelope>,
483 cx: &App,
484 name: &'static str,
485 has_wsl_interop: bool,
486 ) -> AnyProtoClient {
487 ChannelClient::new(incoming_rx, outgoing_tx, cx, name, has_wsl_interop).into()
488 }
489
490 pub fn shutdown_processes<T: RequestMessage>(
491 &mut self,
492 shutdown_request: Option<T>,
493 executor: BackgroundExecutor,
494 ) -> Option<impl Future<Output = ()> + use<T>> {
495 let state = self.state.take()?;
496 log::info!("shutting down remote processes");
497
498 let State::Connected {
499 multiplex_task,
500 heartbeat_task,
501 remote_connection,
502 delegate,
503 } = state
504 else {
505 return None;
506 };
507
508 let client = self.client.clone();
509
510 Some(async move {
511 if let Some(shutdown_request) = shutdown_request {
512 client.send(shutdown_request).log_err();
513 // We wait 50ms instead of waiting for a response, because
514 // waiting for a response would require us to wait on the main thread
515 // which we want to avoid in an `on_app_quit` callback.
516 executor.timer(Duration::from_millis(50)).await;
517 }
518
519 // Drop `multiplex_task` because it owns our remote_connection_proxy_process, which is a
520 // child of master_process.
521 drop(multiplex_task);
522 // Now drop the rest of state, which kills master process.
523 drop(heartbeat_task);
524 drop(remote_connection);
525 drop(delegate);
526 })
527 }
528
529 fn reconnect(&mut self, cx: &mut Context<Self>) -> Result<()> {
530 let can_reconnect = self
531 .state
532 .as_ref()
533 .map(|state| state.can_reconnect())
534 .unwrap_or(false);
535 if !can_reconnect {
536 let state = if let Some(state) = self.state.as_ref() {
537 state.to_string()
538 } else {
539 "no state set".to_string()
540 };
541 log::info!(
542 "aborting reconnect, because not in state that allows reconnecting: {state}"
543 );
544 anyhow::bail!(
545 "aborting reconnect, because not in state that allows reconnecting: {state}"
546 );
547 }
548
549 let state = self.state.take().unwrap();
550 let (attempts, remote_connection, delegate) = match state {
551 State::Connected {
552 remote_connection,
553 delegate,
554 multiplex_task,
555 heartbeat_task,
556 }
557 | State::HeartbeatMissed {
558 remote_connection,
559 delegate,
560 multiplex_task,
561 heartbeat_task,
562 ..
563 } => {
564 drop(multiplex_task);
565 drop(heartbeat_task);
566 (0, remote_connection, delegate)
567 }
568 State::ReconnectFailed {
569 attempts,
570 remote_connection,
571 delegate,
572 ..
573 } => (attempts, remote_connection, delegate),
574 State::Connecting
575 | State::Reconnecting
576 | State::ReconnectExhausted
577 | State::ServerNotRunning => unreachable!(),
578 };
579
580 let attempts = attempts + 1;
581 if attempts > MAX_RECONNECT_ATTEMPTS {
582 log::error!(
583 "Failed to reconnect to after {} attempts, giving up",
584 MAX_RECONNECT_ATTEMPTS
585 );
586 self.set_state(State::ReconnectExhausted, cx);
587 return Ok(());
588 }
589
590 self.set_state(State::Reconnecting, cx);
591
592 log::info!(
593 "Trying to reconnect to remote server... Attempt {}",
594 attempts
595 );
596
597 let unique_identifier = self.unique_identifier.clone();
598 let client = self.client.clone();
599 let reconnect_task = cx.spawn(async move |this, cx| {
600 macro_rules! failed {
601 ($error:expr, $attempts:expr, $remote_connection:expr, $delegate:expr) => {
602 delegate.set_status(Some(&format!("{error:#}", error = $error)), cx);
603 return State::ReconnectFailed {
604 error: anyhow!($error),
605 attempts: $attempts,
606 remote_connection: $remote_connection,
607 delegate: $delegate,
608 };
609 };
610 }
611
612 if let Err(error) = remote_connection
613 .kill()
614 .await
615 .context("Failed to kill remote_connection process")
616 {
617 failed!(error, attempts, remote_connection, delegate);
618 };
619
620 let connection_options = remote_connection.connection_options();
621
622 let (outgoing_tx, outgoing_rx) = mpsc::unbounded::<Envelope>();
623 let (incoming_tx, incoming_rx) = mpsc::unbounded::<Envelope>();
624 let (connection_activity_tx, connection_activity_rx) = mpsc::channel::<()>(1);
625
626 let (remote_connection, io_task) = match async {
627 let remote_connection = cx
628 .update_global(|pool: &mut ConnectionPool, cx| {
629 pool.connect(connection_options, delegate.clone(), cx)
630 })
631 .await
632 .map_err(|error| error.cloned())?;
633
634 let io_task = remote_connection.start_proxy(
635 unique_identifier,
636 true,
637 incoming_tx,
638 outgoing_rx,
639 connection_activity_tx,
640 delegate.clone(),
641 cx,
642 );
643 anyhow::Ok((remote_connection, io_task))
644 }
645 .await
646 {
647 Ok((remote_connection, io_task)) => (remote_connection, io_task),
648 Err(error) => {
649 failed!(error, attempts, remote_connection, delegate);
650 }
651 };
652
653 let multiplex_task = Self::monitor(this.clone(), io_task, cx);
654 client.reconnect(incoming_rx, outgoing_tx, cx);
655
656 if let Err(error) = client.resync(HEARTBEAT_TIMEOUT).await {
657 failed!(error, attempts, remote_connection, delegate);
658 };
659
660 State::Connected {
661 remote_connection,
662 delegate,
663 multiplex_task,
664 heartbeat_task: Self::heartbeat(this.clone(), connection_activity_rx, cx),
665 }
666 });
667
668 cx.spawn(async move |this, cx| {
669 let new_state = reconnect_task.await;
670 this.update(cx, |this, cx| {
671 this.try_set_state(cx, |old_state| {
672 if old_state.is_reconnecting() {
673 match &new_state {
674 State::Connecting
675 | State::Reconnecting
676 | State::HeartbeatMissed { .. }
677 | State::ServerNotRunning => {}
678 State::Connected { .. } => {
679 log::info!("Successfully reconnected");
680 }
681 State::ReconnectFailed {
682 error, attempts, ..
683 } => {
684 log::error!(
685 "Reconnect attempt {} failed: {:?}. Starting new attempt...",
686 attempts,
687 error
688 );
689 }
690 State::ReconnectExhausted => {
691 log::error!("Reconnect attempt failed and all attempts exhausted");
692 }
693 }
694 Some(new_state)
695 } else {
696 None
697 }
698 });
699
700 if this.state_is(State::is_reconnect_failed) {
701 this.reconnect(cx)
702 } else if this.state_is(State::is_reconnect_exhausted) {
703 Ok(())
704 } else {
705 log::debug!("State has transition from Reconnecting into new state while attempting reconnect.");
706 Ok(())
707 }
708 })
709 })
710 .detach_and_log_err(cx);
711
712 Ok(())
713 }
714
715 fn heartbeat(
716 this: WeakEntity<Self>,
717 mut connection_activity_rx: mpsc::Receiver<()>,
718 cx: &mut AsyncApp,
719 ) -> Task<Result<()>> {
720 let Ok(client) = this.read_with(cx, |this, _| this.client.clone()) else {
721 return Task::ready(Err(anyhow!("remote_connectionRemoteClient lost")));
722 };
723
724 cx.spawn(async move |cx| {
725 let mut missed_heartbeats = 0;
726
727 let keepalive_timer = cx.background_executor().timer(HEARTBEAT_INTERVAL).fuse();
728 futures::pin_mut!(keepalive_timer);
729
730 loop {
731 select_biased! {
732 result = connection_activity_rx.next().fuse() => {
733 if result.is_none() {
734 log::warn!("remote heartbeat: connection activity channel has been dropped. stopping.");
735 return Ok(());
736 }
737
738 if missed_heartbeats != 0 {
739 missed_heartbeats = 0;
740 let _ =this.update(cx, |this, cx| {
741 this.handle_heartbeat_result(missed_heartbeats, cx)
742 })?;
743 }
744 }
745 _ = keepalive_timer => {
746 log::debug!("Sending heartbeat to server...");
747
748 let result = select_biased! {
749 _ = connection_activity_rx.next().fuse() => {
750 Ok(())
751 }
752 ping_result = client.ping(HEARTBEAT_TIMEOUT).fuse() => {
753 ping_result
754 }
755 };
756
757 if result.is_err() {
758 missed_heartbeats += 1;
759 log::warn!(
760 "No heartbeat from server after {:?}. Missed heartbeat {} out of {}.",
761 HEARTBEAT_TIMEOUT,
762 missed_heartbeats,
763 MAX_MISSED_HEARTBEATS
764 );
765 } else if missed_heartbeats != 0 {
766 missed_heartbeats = 0;
767 } else {
768 continue;
769 }
770
771 let result = this.update(cx, |this, cx| {
772 this.handle_heartbeat_result(missed_heartbeats, cx)
773 })?;
774 if result.is_break() {
775 return Ok(());
776 }
777 }
778 }
779
780 keepalive_timer.set(cx.background_executor().timer(HEARTBEAT_INTERVAL).fuse());
781 }
782 })
783 }
784
785 fn handle_heartbeat_result(
786 &mut self,
787 missed_heartbeats: usize,
788 cx: &mut Context<Self>,
789 ) -> ControlFlow<()> {
790 let state = self.state.take().unwrap();
791 let next_state = if missed_heartbeats > 0 {
792 state.heartbeat_missed()
793 } else {
794 state.heartbeat_recovered()
795 };
796
797 self.set_state(next_state, cx);
798
799 if missed_heartbeats >= MAX_MISSED_HEARTBEATS {
800 log::error!(
801 "Missed last {} heartbeats. Reconnecting...",
802 missed_heartbeats
803 );
804
805 self.reconnect(cx)
806 .context("failed to start reconnect process after missing heartbeats")
807 .log_err();
808 ControlFlow::Break(())
809 } else {
810 ControlFlow::Continue(())
811 }
812 }
813
814 fn monitor(
815 this: WeakEntity<Self>,
816 io_task: Task<Result<i32>>,
817 cx: &AsyncApp,
818 ) -> Task<Result<()>> {
819 cx.spawn(async move |cx| {
820 let result = io_task.await;
821
822 match result {
823 Ok(exit_code) => {
824 if let Some(error) = ProxyLaunchError::from_exit_code(exit_code) {
825 match error {
826 ProxyLaunchError::ServerNotRunning => {
827 log::error!("failed to reconnect because server is not running");
828 this.update(cx, |this, cx| {
829 this.set_state(State::ServerNotRunning, cx);
830 })?;
831 }
832 }
833 } else if exit_code > 0 {
834 log::error!("proxy process terminated unexpectedly");
835 this.update(cx, |this, cx| {
836 this.reconnect(cx).ok();
837 })?;
838 }
839 }
840 Err(error) => {
841 log::warn!(
842 "remote io task died with error: {:?}. reconnecting...",
843 error
844 );
845 this.update(cx, |this, cx| {
846 this.reconnect(cx).ok();
847 })?;
848 }
849 }
850
851 Ok(())
852 })
853 }
854
855 fn state_is(&self, check: impl FnOnce(&State) -> bool) -> bool {
856 self.state.as_ref().is_some_and(check)
857 }
858
859 fn try_set_state(&mut self, cx: &mut Context<Self>, map: impl FnOnce(&State) -> Option<State>) {
860 let new_state = self.state.as_ref().and_then(map);
861 if let Some(new_state) = new_state {
862 self.state.replace(new_state);
863 cx.notify();
864 }
865 }
866
867 fn set_state(&mut self, state: State, cx: &mut Context<Self>) {
868 log::info!("setting state to '{}'", &state);
869
870 let is_reconnect_exhausted = state.is_reconnect_exhausted();
871 let is_server_not_running = state.is_server_not_running();
872 self.state.replace(state);
873
874 if is_reconnect_exhausted || is_server_not_running {
875 cx.emit(RemoteClientEvent::Disconnected);
876 }
877 cx.notify();
878 }
879
880 pub fn shell(&self) -> Option<String> {
881 Some(self.remote_connection()?.shell())
882 }
883
884 pub fn default_system_shell(&self) -> Option<String> {
885 Some(self.remote_connection()?.default_system_shell())
886 }
887
888 pub fn shares_network_interface(&self) -> bool {
889 self.remote_connection()
890 .map_or(false, |connection| connection.shares_network_interface())
891 }
892
893 pub fn build_command(
894 &self,
895 program: Option<String>,
896 args: &[String],
897 env: &HashMap<String, String>,
898 working_dir: Option<String>,
899 port_forward: Option<(u16, String, u16)>,
900 ) -> Result<CommandTemplate> {
901 let Some(connection) = self.remote_connection() else {
902 return Err(anyhow!("no remote connection"));
903 };
904 connection.build_command(program, args, env, working_dir, port_forward)
905 }
906
907 pub fn build_forward_ports_command(
908 &self,
909 forwards: Vec<(u16, String, u16)>,
910 ) -> Result<CommandTemplate> {
911 let Some(connection) = self.remote_connection() else {
912 return Err(anyhow!("no remote connection"));
913 };
914 connection.build_forward_ports_command(forwards)
915 }
916
917 pub fn upload_directory(
918 &self,
919 src_path: PathBuf,
920 dest_path: RemotePathBuf,
921 cx: &App,
922 ) -> Task<Result<()>> {
923 let Some(connection) = self.remote_connection() else {
924 return Task::ready(Err(anyhow!("no remote connection")));
925 };
926 connection.upload_directory(src_path, dest_path, cx)
927 }
928
929 pub fn proto_client(&self) -> AnyProtoClient {
930 self.client.clone().into()
931 }
932
933 pub fn connection_options(&self) -> RemoteConnectionOptions {
934 self.connection_options.clone()
935 }
936
937 pub fn connection(&self) -> Option<Arc<dyn RemoteConnection>> {
938 if let State::Connected {
939 remote_connection, ..
940 } = self.state.as_ref()?
941 {
942 Some(remote_connection.clone())
943 } else {
944 None
945 }
946 }
947
948 pub fn connection_state(&self) -> ConnectionState {
949 self.state
950 .as_ref()
951 .map(ConnectionState::from)
952 .unwrap_or(ConnectionState::Disconnected)
953 }
954
955 pub fn is_disconnected(&self) -> bool {
956 self.connection_state() == ConnectionState::Disconnected
957 }
958
959 pub fn path_style(&self) -> PathStyle {
960 self.path_style
961 }
962
963 /// Forcibly disconnects from the remote server by killing the underlying connection.
964 /// This will trigger the reconnection logic if reconnection attempts remain.
965 /// Useful for testing reconnection behavior in real environments.
966 pub fn force_disconnect(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
967 let Some(connection) = self.remote_connection() else {
968 return Task::ready(Err(anyhow!("no active remote connection to disconnect")));
969 };
970
971 log::info!("force_disconnect: killing remote connection");
972
973 cx.spawn(async move |_, _| {
974 connection.kill().await?;
975 Ok(())
976 })
977 }
978
979 /// Simulates a timeout by pausing heartbeat responses.
980 /// This will cause heartbeat failures and eventually trigger reconnection
981 /// after MAX_MISSED_HEARTBEATS are missed.
982 /// Useful for testing timeout behavior in real environments.
983 pub fn force_heartbeat_timeout(&mut self, attempts: usize, cx: &mut Context<Self>) {
984 log::info!("force_heartbeat_timeout: triggering heartbeat failure state");
985
986 if let Some(State::Connected {
987 remote_connection,
988 delegate,
989 multiplex_task,
990 heartbeat_task,
991 }) = self.state.take()
992 {
993 self.set_state(
994 if attempts == 0 {
995 State::HeartbeatMissed {
996 missed_heartbeats: MAX_MISSED_HEARTBEATS,
997 remote_connection,
998 delegate,
999 multiplex_task,
1000 heartbeat_task,
1001 }
1002 } else {
1003 State::ReconnectFailed {
1004 remote_connection,
1005 delegate,
1006 error: anyhow!("forced heartbeat timeout"),
1007 attempts,
1008 }
1009 },
1010 cx,
1011 );
1012
1013 self.reconnect(cx)
1014 .context("failed to start reconnect after forced timeout")
1015 .log_err();
1016 } else {
1017 log::warn!("force_heartbeat_timeout: not in Connected state, ignoring");
1018 }
1019 }
1020
1021 #[cfg(any(test, feature = "test-support"))]
1022 pub fn simulate_disconnect(&self, client_cx: &mut App) -> Task<()> {
1023 let opts = self.connection_options();
1024 client_cx.spawn(async move |cx| {
1025 let connection = cx.update_global(|c: &mut ConnectionPool, _| {
1026 if let Some(ConnectionPoolEntry::Connected(c)) = c.connections.get(&opts) {
1027 if let Some(connection) = c.upgrade() {
1028 connection
1029 } else {
1030 panic!("connection was dropped")
1031 }
1032 } else {
1033 panic!("missing test connection")
1034 }
1035 });
1036
1037 connection.simulate_disconnect(cx);
1038 })
1039 }
1040
1041 /// Creates a mock connection pair for testing.
1042 ///
1043 /// This is the recommended way to create mock remote connections for tests.
1044 /// It returns the `MockConnectionOptions` (which can be passed to create a
1045 /// `HeadlessProject`), an `AnyProtoClient` for the server side and a
1046 /// `ConnectGuard` for the client side which blocks the connection from
1047 /// being established until dropped.
1048 ///
1049 /// # Example
1050 /// ```ignore
1051 /// let (opts, server_session, connect_guard) = RemoteClient::fake_server(cx, server_cx);
1052 /// // Set up HeadlessProject with server_session...
1053 /// drop(connect_guard);
1054 /// let client = RemoteClient::fake_client(opts, cx).await;
1055 /// ```
1056 #[cfg(any(test, feature = "test-support"))]
1057 pub fn fake_server(
1058 client_cx: &mut gpui::TestAppContext,
1059 server_cx: &mut gpui::TestAppContext,
1060 ) -> (RemoteConnectionOptions, AnyProtoClient, ConnectGuard) {
1061 use crate::transport::mock::MockConnection;
1062 let (opts, server_client, connect_guard) = MockConnection::new(client_cx, server_cx);
1063 (opts.into(), server_client, connect_guard)
1064 }
1065
1066 /// Creates a `RemoteClient` connected to a mock server.
1067 ///
1068 /// Call `fake_server` first to get the connection options, set up the
1069 /// `HeadlessProject` with the server session, then call this method
1070 /// to create the client.
1071 #[cfg(any(test, feature = "test-support"))]
1072 pub async fn connect_mock(
1073 opts: RemoteConnectionOptions,
1074 client_cx: &mut gpui::TestAppContext,
1075 ) -> Entity<Self> {
1076 assert!(matches!(opts, RemoteConnectionOptions::Mock(..)));
1077 use crate::transport::mock::MockDelegate;
1078 let (_tx, rx) = oneshot::channel();
1079 let mut cx = client_cx.to_async();
1080 let connection = connect(opts, Arc::new(MockDelegate), &mut cx)
1081 .await
1082 .unwrap();
1083 client_cx
1084 .update(|cx| {
1085 Self::new(
1086 ConnectionIdentifier::setup(),
1087 connection,
1088 rx,
1089 Arc::new(MockDelegate),
1090 cx,
1091 )
1092 })
1093 .await
1094 .unwrap()
1095 .unwrap()
1096 }
1097
1098 fn remote_connection(&self) -> Option<Arc<dyn RemoteConnection>> {
1099 self.state
1100 .as_ref()
1101 .and_then(|state| state.remote_connection())
1102 }
1103}
1104
1105enum ConnectionPoolEntry {
1106 Connecting(Shared<Task<Result<Arc<dyn RemoteConnection>, Arc<anyhow::Error>>>>),
1107 Connected(Weak<dyn RemoteConnection>),
1108}
1109
1110#[derive(Default)]
1111struct ConnectionPool {
1112 connections: HashMap<RemoteConnectionOptions, ConnectionPoolEntry>,
1113}
1114
1115impl Global for ConnectionPool {}
1116
1117impl ConnectionPool {
1118 fn connect(
1119 &mut self,
1120 opts: RemoteConnectionOptions,
1121 delegate: Arc<dyn RemoteClientDelegate>,
1122 cx: &mut App,
1123 ) -> Shared<Task<Result<Arc<dyn RemoteConnection>, Arc<anyhow::Error>>>> {
1124 let connection = self.connections.get(&opts);
1125 match connection {
1126 Some(ConnectionPoolEntry::Connecting(task)) => {
1127 delegate.set_status(
1128 Some("Waiting for existing connection attempt"),
1129 &mut cx.to_async(),
1130 );
1131 return task.clone();
1132 }
1133 Some(ConnectionPoolEntry::Connected(remote)) => {
1134 if let Some(remote) = remote.upgrade()
1135 && !remote.has_been_killed()
1136 {
1137 return Task::ready(Ok(remote)).shared();
1138 }
1139 self.connections.remove(&opts);
1140 }
1141 None => {}
1142 }
1143
1144 let task = cx
1145 .spawn({
1146 let opts = opts.clone();
1147 let delegate = delegate.clone();
1148 async move |cx| {
1149 let connection = match opts.clone() {
1150 RemoteConnectionOptions::Ssh(opts) => {
1151 SshRemoteConnection::new(opts, delegate, cx)
1152 .await
1153 .map(|connection| Arc::new(connection) as Arc<dyn RemoteConnection>)
1154 }
1155 RemoteConnectionOptions::Wsl(opts) => {
1156 WslRemoteConnection::new(opts, delegate, cx)
1157 .await
1158 .map(|connection| Arc::new(connection) as Arc<dyn RemoteConnection>)
1159 }
1160 RemoteConnectionOptions::Docker(opts) => {
1161 DockerExecConnection::new(opts, delegate, cx)
1162 .await
1163 .map(|connection| Arc::new(connection) as Arc<dyn RemoteConnection>)
1164 }
1165 #[cfg(any(test, feature = "test-support"))]
1166 RemoteConnectionOptions::Mock(opts) => match cx.update(|cx| {
1167 cx.default_global::<crate::transport::mock::MockConnectionRegistry>()
1168 .take(&opts)
1169 }) {
1170 Some(connection) => Ok(connection.await as Arc<dyn RemoteConnection>),
1171 None => Err(anyhow!(
1172 "Mock connection not found. Call MockConnection::new() first."
1173 )),
1174 },
1175 };
1176
1177 cx.update_global(|pool: &mut Self, _| {
1178 debug_assert!(matches!(
1179 pool.connections.get(&opts),
1180 Some(ConnectionPoolEntry::Connecting(_))
1181 ));
1182 match connection {
1183 Ok(connection) => {
1184 pool.connections.insert(
1185 opts.clone(),
1186 ConnectionPoolEntry::Connected(Arc::downgrade(&connection)),
1187 );
1188 Ok(connection)
1189 }
1190 Err(error) => {
1191 pool.connections.remove(&opts);
1192 Err(Arc::new(error))
1193 }
1194 }
1195 })
1196 }
1197 })
1198 .shared();
1199
1200 self.connections
1201 .insert(opts.clone(), ConnectionPoolEntry::Connecting(task.clone()));
1202 task
1203 }
1204}
1205
1206#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1207pub enum RemoteConnectionOptions {
1208 Ssh(SshConnectionOptions),
1209 Wsl(WslConnectionOptions),
1210 Docker(DockerConnectionOptions),
1211 #[cfg(any(test, feature = "test-support"))]
1212 Mock(crate::transport::mock::MockConnectionOptions),
1213}
1214
1215impl RemoteConnectionOptions {
1216 pub fn display_name(&self) -> String {
1217 match self {
1218 RemoteConnectionOptions::Ssh(opts) => opts.host.to_string(),
1219 RemoteConnectionOptions::Wsl(opts) => opts.distro_name.clone(),
1220 RemoteConnectionOptions::Docker(opts) => opts.name.clone(),
1221 #[cfg(any(test, feature = "test-support"))]
1222 RemoteConnectionOptions::Mock(opts) => format!("mock-{}", opts.id),
1223 }
1224 }
1225}
1226
1227impl From<SshConnectionOptions> for RemoteConnectionOptions {
1228 fn from(opts: SshConnectionOptions) -> Self {
1229 RemoteConnectionOptions::Ssh(opts)
1230 }
1231}
1232
1233impl From<WslConnectionOptions> for RemoteConnectionOptions {
1234 fn from(opts: WslConnectionOptions) -> Self {
1235 RemoteConnectionOptions::Wsl(opts)
1236 }
1237}
1238
1239#[cfg(any(test, feature = "test-support"))]
1240impl From<crate::transport::mock::MockConnectionOptions> for RemoteConnectionOptions {
1241 fn from(opts: crate::transport::mock::MockConnectionOptions) -> Self {
1242 RemoteConnectionOptions::Mock(opts)
1243 }
1244}
1245
1246#[cfg(target_os = "windows")]
1247/// Open a wsl path (\\wsl.localhost\<distro>\path)
1248#[derive(Debug, Clone, PartialEq, Eq, gpui::Action)]
1249#[action(namespace = workspace, no_json, no_register)]
1250pub struct OpenWslPath {
1251 pub distro: WslConnectionOptions,
1252 pub paths: Vec<PathBuf>,
1253}
1254
1255#[async_trait(?Send)]
1256pub trait RemoteConnection: Send + Sync {
1257 fn start_proxy(
1258 &self,
1259 unique_identifier: String,
1260 reconnect: bool,
1261 incoming_tx: UnboundedSender<Envelope>,
1262 outgoing_rx: UnboundedReceiver<Envelope>,
1263 connection_activity_tx: Sender<()>,
1264 delegate: Arc<dyn RemoteClientDelegate>,
1265 cx: &mut AsyncApp,
1266 ) -> Task<Result<i32>>;
1267 fn upload_directory(
1268 &self,
1269 src_path: PathBuf,
1270 dest_path: RemotePathBuf,
1271 cx: &App,
1272 ) -> Task<Result<()>>;
1273 async fn kill(&self) -> Result<()>;
1274 fn has_been_killed(&self) -> bool;
1275 fn shares_network_interface(&self) -> bool {
1276 false
1277 }
1278 fn build_command(
1279 &self,
1280 program: Option<String>,
1281 args: &[String],
1282 env: &HashMap<String, String>,
1283 working_dir: Option<String>,
1284 port_forward: Option<(u16, String, u16)>,
1285 ) -> Result<CommandTemplate>;
1286 fn build_forward_ports_command(
1287 &self,
1288 forwards: Vec<(u16, String, u16)>,
1289 ) -> Result<CommandTemplate>;
1290 fn connection_options(&self) -> RemoteConnectionOptions;
1291 fn path_style(&self) -> PathStyle;
1292 fn shell(&self) -> String;
1293 fn default_system_shell(&self) -> String;
1294 fn has_wsl_interop(&self) -> bool;
1295
1296 #[cfg(any(test, feature = "test-support"))]
1297 fn simulate_disconnect(&self, _: &AsyncApp) {}
1298}
1299
1300type ResponseChannels = Mutex<HashMap<MessageId, oneshot::Sender<(Envelope, oneshot::Sender<()>)>>>;
1301
1302struct Signal<T> {
1303 tx: Mutex<Option<oneshot::Sender<T>>>,
1304 rx: Shared<Task<Option<T>>>,
1305}
1306
1307impl<T: Send + Clone + 'static> Signal<T> {
1308 pub fn new(cx: &App) -> Self {
1309 let (tx, rx) = oneshot::channel();
1310
1311 let task = cx
1312 .background_executor()
1313 .spawn(async move { rx.await.ok() })
1314 .shared();
1315
1316 Self {
1317 tx: Mutex::new(Some(tx)),
1318 rx: task,
1319 }
1320 }
1321
1322 fn set(&self, value: T) {
1323 if let Some(tx) = self.tx.lock().take() {
1324 let _ = tx.send(value);
1325 }
1326 }
1327
1328 fn wait(&self) -> Shared<Task<Option<T>>> {
1329 self.rx.clone()
1330 }
1331}
1332
1333pub(crate) struct ChannelClient {
1334 next_message_id: AtomicU32,
1335 outgoing_tx: Mutex<mpsc::UnboundedSender<Envelope>>,
1336 buffer: Mutex<VecDeque<Envelope>>,
1337 response_channels: ResponseChannels,
1338 message_handlers: Mutex<ProtoMessageHandlerSet>,
1339 max_received: AtomicU32,
1340 name: &'static str,
1341 task: Mutex<Task<Result<()>>>,
1342 remote_started: Signal<()>,
1343 has_wsl_interop: bool,
1344}
1345
1346impl ChannelClient {
1347 pub(crate) fn new(
1348 incoming_rx: mpsc::UnboundedReceiver<Envelope>,
1349 outgoing_tx: mpsc::UnboundedSender<Envelope>,
1350 cx: &App,
1351 name: &'static str,
1352 has_wsl_interop: bool,
1353 ) -> Arc<Self> {
1354 Arc::new_cyclic(|this| Self {
1355 outgoing_tx: Mutex::new(outgoing_tx),
1356 next_message_id: AtomicU32::new(0),
1357 max_received: AtomicU32::new(0),
1358 response_channels: ResponseChannels::default(),
1359 message_handlers: Default::default(),
1360 buffer: Mutex::new(VecDeque::new()),
1361 name,
1362 task: Mutex::new(Self::start_handling_messages(
1363 this.clone(),
1364 incoming_rx,
1365 &cx.to_async(),
1366 )),
1367 remote_started: Signal::new(cx),
1368 has_wsl_interop,
1369 })
1370 }
1371
1372 fn wait_for_remote_started(&self) -> Shared<Task<Option<()>>> {
1373 self.remote_started.wait()
1374 }
1375
1376 fn start_handling_messages(
1377 this: Weak<Self>,
1378 mut incoming_rx: mpsc::UnboundedReceiver<Envelope>,
1379 cx: &AsyncApp,
1380 ) -> Task<Result<()>> {
1381 cx.spawn(async move |cx| {
1382 if let Some(this) = this.upgrade() {
1383 let envelope = proto::RemoteStarted {}.into_envelope(0, None, None);
1384 this.outgoing_tx.lock().unbounded_send(envelope).ok();
1385 };
1386
1387 let peer_id = PeerId { owner_id: 0, id: 0 };
1388 while let Some(incoming) = incoming_rx.next().await {
1389 let Some(this) = this.upgrade() else {
1390 return anyhow::Ok(());
1391 };
1392 if let Some(ack_id) = incoming.ack_id {
1393 let mut buffer = this.buffer.lock();
1394 while buffer.front().is_some_and(|msg| msg.id <= ack_id) {
1395 buffer.pop_front();
1396 }
1397 }
1398 if let Some(proto::envelope::Payload::FlushBufferedMessages(_)) = &incoming.payload
1399 {
1400 log::debug!(
1401 "{}:remote message received. name:FlushBufferedMessages",
1402 this.name
1403 );
1404 {
1405 let buffer = this.buffer.lock();
1406 for envelope in buffer.iter() {
1407 this.outgoing_tx
1408 .lock()
1409 .unbounded_send(envelope.clone())
1410 .ok();
1411 }
1412 }
1413 let mut envelope = proto::Ack {}.into_envelope(0, Some(incoming.id), None);
1414 envelope.id = this.next_message_id.fetch_add(1, SeqCst);
1415 this.outgoing_tx.lock().unbounded_send(envelope).ok();
1416 continue;
1417 }
1418
1419 if let Some(proto::envelope::Payload::RemoteStarted(_)) = &incoming.payload {
1420 this.remote_started.set(());
1421 let mut envelope = proto::Ack {}.into_envelope(0, Some(incoming.id), None);
1422 envelope.id = this.next_message_id.fetch_add(1, SeqCst);
1423 this.outgoing_tx.lock().unbounded_send(envelope).ok();
1424 continue;
1425 }
1426
1427 this.max_received.store(incoming.id, SeqCst);
1428
1429 if let Some(request_id) = incoming.responding_to {
1430 let request_id = MessageId(request_id);
1431 let sender = this.response_channels.lock().remove(&request_id);
1432 if let Some(sender) = sender {
1433 let (tx, rx) = oneshot::channel();
1434 if incoming.payload.is_some() {
1435 sender.send((incoming, tx)).ok();
1436 }
1437 rx.await.ok();
1438 }
1439 } else if let Some(envelope) =
1440 build_typed_envelope(peer_id, Instant::now(), incoming)
1441 {
1442 let type_name = envelope.payload_type_name();
1443 let message_id = envelope.message_id();
1444 if let Some(future) = ProtoMessageHandlerSet::handle_message(
1445 &this.message_handlers,
1446 envelope,
1447 this.clone().into(),
1448 cx.clone(),
1449 ) {
1450 log::debug!("{}:remote message received. name:{type_name}", this.name);
1451 cx.foreground_executor()
1452 .spawn(async move {
1453 match future.await {
1454 Ok(_) => {
1455 log::debug!(
1456 "{}:remote message handled. name:{type_name}",
1457 this.name
1458 );
1459 }
1460 Err(error) => {
1461 log::error!(
1462 "{}:error handling message. type:{}, error:{:#}",
1463 this.name,
1464 type_name,
1465 format!("{error:#}").lines().fold(
1466 String::new(),
1467 |mut message, line| {
1468 if !message.is_empty() {
1469 message.push(' ');
1470 }
1471 message.push_str(line);
1472 message
1473 }
1474 )
1475 );
1476 }
1477 }
1478 })
1479 .detach()
1480 } else {
1481 log::error!("{}:unhandled remote message name:{type_name}", this.name);
1482 if let Err(e) = AnyProtoClient::from(this.clone()).send_response(
1483 message_id,
1484 anyhow::anyhow!("no handler registered for {type_name}").to_proto(),
1485 ) {
1486 log::error!(
1487 "{}:error sending error response for {type_name}:{e:#}",
1488 this.name
1489 );
1490 }
1491 }
1492 }
1493 }
1494 anyhow::Ok(())
1495 })
1496 }
1497
1498 pub(crate) fn reconnect(
1499 self: &Arc<Self>,
1500 incoming_rx: UnboundedReceiver<Envelope>,
1501 outgoing_tx: UnboundedSender<Envelope>,
1502 cx: &AsyncApp,
1503 ) {
1504 *self.outgoing_tx.lock() = outgoing_tx;
1505 *self.task.lock() = Self::start_handling_messages(Arc::downgrade(self), incoming_rx, cx);
1506 }
1507
1508 fn request<T: RequestMessage>(
1509 &self,
1510 payload: T,
1511 ) -> impl 'static + Future<Output = Result<T::Response>> {
1512 self.request_internal(payload, true)
1513 }
1514
1515 fn request_internal<T: RequestMessage>(
1516 &self,
1517 payload: T,
1518 use_buffer: bool,
1519 ) -> impl 'static + Future<Output = Result<T::Response>> {
1520 log::debug!("remote request start. name:{}", T::NAME);
1521 let response =
1522 self.request_dynamic(payload.into_envelope(0, None, None), T::NAME, use_buffer);
1523 async move {
1524 let response = response.await?;
1525 log::debug!("remote request finish. name:{}", T::NAME);
1526 T::Response::from_envelope(response).context("received a response of the wrong type")
1527 }
1528 }
1529
1530 async fn resync(&self, timeout: Duration) -> Result<()> {
1531 smol::future::or(
1532 async {
1533 self.request_internal(proto::FlushBufferedMessages {}, false)
1534 .await?;
1535
1536 for envelope in self.buffer.lock().iter() {
1537 self.outgoing_tx
1538 .lock()
1539 .unbounded_send(envelope.clone())
1540 .ok();
1541 }
1542 Ok(())
1543 },
1544 async {
1545 smol::Timer::after(timeout).await;
1546 anyhow::bail!("Timed out resyncing remote client")
1547 },
1548 )
1549 .await
1550 }
1551
1552 async fn ping(&self, timeout: Duration) -> Result<()> {
1553 smol::future::or(
1554 async {
1555 self.request(proto::Ping {}).await?;
1556 Ok(())
1557 },
1558 async {
1559 smol::Timer::after(timeout).await;
1560 anyhow::bail!("Timed out pinging remote client")
1561 },
1562 )
1563 .await
1564 }
1565
1566 fn send<T: EnvelopedMessage>(&self, payload: T) -> Result<()> {
1567 log::debug!("remote send name:{}", T::NAME);
1568 self.send_dynamic(payload.into_envelope(0, None, None))
1569 }
1570
1571 fn request_dynamic(
1572 &self,
1573 mut envelope: proto::Envelope,
1574 type_name: &'static str,
1575 use_buffer: bool,
1576 ) -> impl 'static + Future<Output = Result<proto::Envelope>> {
1577 envelope.id = self.next_message_id.fetch_add(1, SeqCst);
1578 let (tx, rx) = oneshot::channel();
1579 let mut response_channels_lock = self.response_channels.lock();
1580 response_channels_lock.insert(MessageId(envelope.id), tx);
1581 drop(response_channels_lock);
1582
1583 let result = if use_buffer {
1584 self.send_buffered(envelope)
1585 } else {
1586 self.send_unbuffered(envelope)
1587 };
1588 async move {
1589 if let Err(error) = &result {
1590 log::error!("failed to send message: {error}");
1591 anyhow::bail!("failed to send message: {error}");
1592 }
1593
1594 let response = rx.await.context("connection lost")?.0;
1595 if let Some(proto::envelope::Payload::Error(error)) = &response.payload {
1596 return Err(RpcError::from_proto(error, type_name));
1597 }
1598 Ok(response)
1599 }
1600 }
1601
1602 pub fn send_dynamic(&self, mut envelope: proto::Envelope) -> Result<()> {
1603 envelope.id = self.next_message_id.fetch_add(1, SeqCst);
1604 self.send_buffered(envelope)
1605 }
1606
1607 fn send_buffered(&self, mut envelope: proto::Envelope) -> Result<()> {
1608 envelope.ack_id = Some(self.max_received.load(SeqCst));
1609 self.buffer.lock().push_back(envelope.clone());
1610 // ignore errors on send (happen while we're reconnecting)
1611 // assume that the global "disconnected" overlay is sufficient.
1612 self.outgoing_tx.lock().unbounded_send(envelope).ok();
1613 Ok(())
1614 }
1615
1616 fn send_unbuffered(&self, mut envelope: proto::Envelope) -> Result<()> {
1617 envelope.ack_id = Some(self.max_received.load(SeqCst));
1618 self.outgoing_tx.lock().unbounded_send(envelope).ok();
1619 Ok(())
1620 }
1621}
1622
1623impl ProtoClient for ChannelClient {
1624 fn request(
1625 &self,
1626 envelope: proto::Envelope,
1627 request_type: &'static str,
1628 ) -> BoxFuture<'static, Result<proto::Envelope>> {
1629 self.request_dynamic(envelope, request_type, true).boxed()
1630 }
1631
1632 fn send(&self, envelope: proto::Envelope, _message_type: &'static str) -> Result<()> {
1633 self.send_dynamic(envelope)
1634 }
1635
1636 fn send_response(&self, envelope: Envelope, _message_type: &'static str) -> anyhow::Result<()> {
1637 self.send_dynamic(envelope)
1638 }
1639
1640 fn message_handler_set(&self) -> &Mutex<ProtoMessageHandlerSet> {
1641 &self.message_handlers
1642 }
1643
1644 fn is_via_collab(&self) -> bool {
1645 false
1646 }
1647
1648 fn has_wsl_interop(&self) -> bool {
1649 self.has_wsl_interop
1650 }
1651}