1use crate::{
2 SshConnectionOptions,
3 protocol::MessageId,
4 proxy::ProxyLaunchError,
5 transport::{
6 docker::{DockerConnectionOptions, DockerExecConnection},
7 ssh::SshRemoteConnection,
8 wsl::{WslConnectionOptions, WslRemoteConnection},
9 },
10};
11use anyhow::{Context as _, Result, anyhow};
12use askpass::EncryptedPassword;
13use async_trait::async_trait;
14use collections::HashMap;
15use futures::{
16 Future, FutureExt as _, StreamExt as _,
17 channel::{
18 mpsc::{self, Sender, UnboundedReceiver, UnboundedSender},
19 oneshot,
20 },
21 future::{BoxFuture, Shared},
22 select, select_biased,
23};
24use gpui::{
25 App, AppContext as _, AsyncApp, BackgroundExecutor, BorrowAppContext, Context, Entity,
26 EventEmitter, FutureExt, Global, Task, WeakEntity,
27};
28use parking_lot::Mutex;
29
30use release_channel::ReleaseChannel;
31use rpc::{
32 AnyProtoClient, ErrorExt, ProtoClient, ProtoMessageHandlerSet, RpcError,
33 proto::{self, Envelope, EnvelopedMessage, PeerId, RequestMessage, build_typed_envelope},
34};
35use semver::Version;
36use std::{
37 collections::VecDeque,
38 fmt,
39 ops::ControlFlow,
40 path::PathBuf,
41 sync::{
42 Arc, Weak,
43 atomic::{AtomicU32, AtomicU64, Ordering::SeqCst},
44 },
45 time::{Duration, Instant},
46};
47use util::{
48 ResultExt,
49 paths::{PathStyle, RemotePathBuf},
50};
51
52#[derive(Copy, Clone, Debug, PartialEq, Eq)]
53pub enum RemoteOs {
54 Linux,
55 MacOs,
56 Windows,
57}
58
59impl RemoteOs {
60 pub fn as_str(&self) -> &'static str {
61 match self {
62 RemoteOs::Linux => "linux",
63 RemoteOs::MacOs => "macos",
64 RemoteOs::Windows => "windows",
65 }
66 }
67
68 pub fn is_windows(&self) -> bool {
69 matches!(self, RemoteOs::Windows)
70 }
71}
72
73impl std::fmt::Display for RemoteOs {
74 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
75 f.write_str(self.as_str())
76 }
77}
78
79#[derive(Copy, Clone, Debug, PartialEq, Eq)]
80pub enum RemoteArch {
81 X86_64,
82 Aarch64,
83}
84
85impl RemoteArch {
86 pub fn as_str(&self) -> &'static str {
87 match self {
88 RemoteArch::X86_64 => "x86_64",
89 RemoteArch::Aarch64 => "aarch64",
90 }
91 }
92}
93
94impl std::fmt::Display for RemoteArch {
95 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
96 f.write_str(self.as_str())
97 }
98}
99
100#[derive(Copy, Clone, Debug)]
101pub struct RemotePlatform {
102 pub os: RemoteOs,
103 pub arch: RemoteArch,
104}
105
106#[derive(Clone, Debug)]
107pub struct CommandTemplate {
108 pub program: String,
109 pub args: Vec<String>,
110 pub env: HashMap<String, String>,
111}
112
113pub trait RemoteClientDelegate: Send + Sync {
114 fn ask_password(
115 &self,
116 prompt: String,
117 tx: oneshot::Sender<EncryptedPassword>,
118 cx: &mut AsyncApp,
119 );
120 fn get_download_url(
121 &self,
122 platform: RemotePlatform,
123 release_channel: ReleaseChannel,
124 version: Option<Version>,
125 cx: &mut AsyncApp,
126 ) -> Task<Result<Option<String>>>;
127 fn download_server_binary_locally(
128 &self,
129 platform: RemotePlatform,
130 release_channel: ReleaseChannel,
131 version: Option<Version>,
132 cx: &mut AsyncApp,
133 ) -> Task<Result<PathBuf>>;
134 fn set_status(&self, status: Option<&str>, cx: &mut AsyncApp);
135}
136
137const MAX_MISSED_HEARTBEATS: usize = 5;
138const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
139const HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(5);
140const INITIAL_CONNECTION_TIMEOUT: Duration =
141 Duration::from_secs(if cfg!(debug_assertions) { 5 } else { 60 });
142
143const MAX_RECONNECT_ATTEMPTS: usize = 3;
144
145enum State {
146 Connecting,
147 Connected {
148 remote_connection: Arc<dyn RemoteConnection>,
149 delegate: Arc<dyn RemoteClientDelegate>,
150
151 multiplex_task: Task<Result<()>>,
152 heartbeat_task: Task<Result<()>>,
153 },
154 HeartbeatMissed {
155 missed_heartbeats: usize,
156
157 remote_connection: Arc<dyn RemoteConnection>,
158 delegate: Arc<dyn RemoteClientDelegate>,
159
160 multiplex_task: Task<Result<()>>,
161 heartbeat_task: Task<Result<()>>,
162 },
163 Reconnecting,
164 ReconnectFailed {
165 remote_connection: Arc<dyn RemoteConnection>,
166 delegate: Arc<dyn RemoteClientDelegate>,
167
168 error: anyhow::Error,
169 attempts: usize,
170 },
171 ReconnectExhausted,
172 ServerNotRunning,
173}
174
175impl fmt::Display for State {
176 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
177 match self {
178 Self::Connecting => write!(f, "connecting"),
179 Self::Connected { .. } => write!(f, "connected"),
180 Self::Reconnecting => write!(f, "reconnecting"),
181 Self::ReconnectFailed { .. } => write!(f, "reconnect failed"),
182 Self::ReconnectExhausted => write!(f, "reconnect exhausted"),
183 Self::HeartbeatMissed { .. } => write!(f, "heartbeat missed"),
184 Self::ServerNotRunning { .. } => write!(f, "server not running"),
185 }
186 }
187}
188
189impl State {
190 fn remote_connection(&self) -> Option<Arc<dyn RemoteConnection>> {
191 match self {
192 Self::Connected {
193 remote_connection, ..
194 } => Some(remote_connection.clone()),
195 Self::HeartbeatMissed {
196 remote_connection, ..
197 } => Some(remote_connection.clone()),
198 Self::ReconnectFailed {
199 remote_connection, ..
200 } => Some(remote_connection.clone()),
201 _ => None,
202 }
203 }
204
205 fn can_reconnect(&self) -> bool {
206 match self {
207 Self::Connected { .. }
208 | Self::HeartbeatMissed { .. }
209 | Self::ReconnectFailed { .. } => true,
210 State::Connecting
211 | State::Reconnecting
212 | State::ReconnectExhausted
213 | State::ServerNotRunning => false,
214 }
215 }
216
217 fn is_reconnect_failed(&self) -> bool {
218 matches!(self, Self::ReconnectFailed { .. })
219 }
220
221 fn is_reconnect_exhausted(&self) -> bool {
222 matches!(self, Self::ReconnectExhausted { .. })
223 }
224
225 fn is_server_not_running(&self) -> bool {
226 matches!(self, Self::ServerNotRunning)
227 }
228
229 fn is_reconnecting(&self) -> bool {
230 matches!(self, Self::Reconnecting { .. })
231 }
232
233 fn heartbeat_recovered(self) -> Self {
234 match self {
235 Self::HeartbeatMissed {
236 remote_connection,
237 delegate,
238 multiplex_task,
239 heartbeat_task,
240 ..
241 } => Self::Connected {
242 remote_connection: remote_connection,
243 delegate,
244 multiplex_task,
245 heartbeat_task,
246 },
247 _ => self,
248 }
249 }
250
251 fn heartbeat_missed(self) -> Self {
252 match self {
253 Self::Connected {
254 remote_connection,
255 delegate,
256 multiplex_task,
257 heartbeat_task,
258 } => Self::HeartbeatMissed {
259 missed_heartbeats: 1,
260 remote_connection,
261 delegate,
262 multiplex_task,
263 heartbeat_task,
264 },
265 Self::HeartbeatMissed {
266 missed_heartbeats,
267 remote_connection,
268 delegate,
269 multiplex_task,
270 heartbeat_task,
271 } => Self::HeartbeatMissed {
272 missed_heartbeats: missed_heartbeats + 1,
273 remote_connection,
274 delegate,
275 multiplex_task,
276 heartbeat_task,
277 },
278 _ => self,
279 }
280 }
281}
282
283/// The state of the ssh connection.
284#[derive(Clone, Copy, Debug, PartialEq, Eq)]
285pub enum ConnectionState {
286 Connecting,
287 Connected,
288 HeartbeatMissed,
289 Reconnecting,
290 Disconnected,
291}
292
293impl From<&State> for ConnectionState {
294 fn from(value: &State) -> Self {
295 match value {
296 State::Connecting => Self::Connecting,
297 State::Connected { .. } => Self::Connected,
298 State::Reconnecting | State::ReconnectFailed { .. } => Self::Reconnecting,
299 State::HeartbeatMissed { .. } => Self::HeartbeatMissed,
300 State::ReconnectExhausted => Self::Disconnected,
301 State::ServerNotRunning => Self::Disconnected,
302 }
303 }
304}
305
306pub struct RemoteClient {
307 client: Arc<ChannelClient>,
308 unique_identifier: String,
309 connection_options: RemoteConnectionOptions,
310 path_style: PathStyle,
311 state: Option<State>,
312}
313
314#[derive(Debug)]
315pub enum RemoteClientEvent {
316 Disconnected,
317}
318
319impl EventEmitter<RemoteClientEvent> for RemoteClient {}
320
321/// Identifies the socket on the remote server so that reconnects
322/// can re-join the same project.
323pub enum ConnectionIdentifier {
324 Setup(u64),
325 Workspace(i64),
326}
327
328static NEXT_ID: AtomicU64 = AtomicU64::new(1);
329
330impl ConnectionIdentifier {
331 pub fn setup() -> Self {
332 Self::Setup(NEXT_ID.fetch_add(1, SeqCst))
333 }
334
335 // This string gets used in a socket name, and so must be relatively short.
336 // The total length of:
337 // /home/{username}/.local/share/zed/server_state/{name}/stdout.sock
338 // Must be less than about 100 characters
339 // https://unix.stackexchange.com/questions/367008/why-is-socket-path-length-limited-to-a-hundred-chars
340 // So our strings should be at most 20 characters or so.
341 fn to_string(&self, cx: &App) -> String {
342 let identifier_prefix = match ReleaseChannel::global(cx) {
343 ReleaseChannel::Stable => "".to_string(),
344 release_channel => format!("{}-", release_channel.dev_name()),
345 };
346 match self {
347 Self::Setup(setup_id) => format!("{identifier_prefix}setup-{setup_id}"),
348 Self::Workspace(workspace_id) => {
349 format!("{identifier_prefix}workspace-{workspace_id}",)
350 }
351 }
352 }
353}
354
355pub async fn connect(
356 connection_options: RemoteConnectionOptions,
357 delegate: Arc<dyn RemoteClientDelegate>,
358 cx: &mut AsyncApp,
359) -> Result<Arc<dyn RemoteConnection>> {
360 cx.update(|cx| {
361 cx.update_default_global(|pool: &mut ConnectionPool, cx| {
362 pool.connect(connection_options.clone(), delegate.clone(), cx)
363 })
364 })
365 .await
366 .map_err(|e| e.cloned())
367}
368
369impl RemoteClient {
370 pub fn new(
371 unique_identifier: ConnectionIdentifier,
372 remote_connection: Arc<dyn RemoteConnection>,
373 cancellation: oneshot::Receiver<()>,
374 delegate: Arc<dyn RemoteClientDelegate>,
375 cx: &mut App,
376 ) -> Task<Result<Option<Entity<Self>>>> {
377 let unique_identifier = unique_identifier.to_string(cx);
378 cx.spawn(async move |cx| {
379 let success = Box::pin(async move {
380 let (outgoing_tx, outgoing_rx) = mpsc::unbounded::<Envelope>();
381 let (incoming_tx, incoming_rx) = mpsc::unbounded::<Envelope>();
382 let (connection_activity_tx, connection_activity_rx) = mpsc::channel::<()>(1);
383
384 let client = cx.update(|cx| {
385 ChannelClient::new(
386 incoming_rx,
387 outgoing_tx,
388 cx,
389 "client",
390 remote_connection.has_wsl_interop(),
391 )
392 });
393
394 let path_style = remote_connection.path_style();
395 let this = cx.new(|_| Self {
396 client: client.clone(),
397 unique_identifier: unique_identifier.clone(),
398 connection_options: remote_connection.connection_options(),
399 path_style,
400 state: Some(State::Connecting),
401 });
402
403 let io_task = remote_connection.start_proxy(
404 unique_identifier,
405 false,
406 incoming_tx,
407 outgoing_rx,
408 connection_activity_tx,
409 delegate.clone(),
410 cx,
411 );
412
413 let ready = client
414 .wait_for_remote_started()
415 .with_timeout(INITIAL_CONNECTION_TIMEOUT, cx.background_executor())
416 .await;
417 match ready {
418 Ok(Some(_)) => {}
419 Ok(None) => {
420 let mut error = "remote client exited before becoming ready".to_owned();
421 if let Some(status) = io_task.now_or_never() {
422 match status {
423 Ok(exit_code) => {
424 error.push_str(&format!(", exit_code={exit_code:?}"))
425 }
426 Err(e) => error.push_str(&format!(", error={e:?}")),
427 }
428 }
429 let error = anyhow::anyhow!("{error}");
430 log::error!("failed to establish connection: {}", error);
431 return Err(error);
432 }
433 Err(_) => {
434 let mut error =
435 "remote client did not become ready within the timeout".to_owned();
436 if let Some(status) = io_task.now_or_never() {
437 match status {
438 Ok(exit_code) => {
439 error.push_str(&format!(", exit_code={exit_code:?}"))
440 }
441 Err(e) => error.push_str(&format!(", error={e:?}")),
442 }
443 }
444 let error = anyhow::anyhow!("{error}");
445 log::error!("failed to establish connection: {}", error);
446 return Err(error);
447 }
448 }
449 let multiplex_task = Self::monitor(this.downgrade(), io_task, cx);
450 if let Err(error) = client.ping(HEARTBEAT_TIMEOUT).await {
451 log::error!("failed to establish connection: {}", error);
452 return Err(error);
453 }
454
455 let heartbeat_task = Self::heartbeat(this.downgrade(), connection_activity_rx, cx);
456
457 this.update(cx, |this, _| {
458 this.state = Some(State::Connected {
459 remote_connection,
460 delegate,
461 multiplex_task,
462 heartbeat_task,
463 });
464 });
465
466 Ok(Some(this))
467 });
468
469 select! {
470 _ = cancellation.fuse() => {
471 Ok(None)
472 }
473 result = success.fuse() => result
474 }
475 })
476 }
477
478 pub fn proto_client_from_channels(
479 incoming_rx: mpsc::UnboundedReceiver<Envelope>,
480 outgoing_tx: mpsc::UnboundedSender<Envelope>,
481 cx: &App,
482 name: &'static str,
483 has_wsl_interop: bool,
484 ) -> AnyProtoClient {
485 ChannelClient::new(incoming_rx, outgoing_tx, cx, name, has_wsl_interop).into()
486 }
487
488 pub fn shutdown_processes<T: RequestMessage>(
489 &mut self,
490 shutdown_request: Option<T>,
491 executor: BackgroundExecutor,
492 ) -> Option<impl Future<Output = ()> + use<T>> {
493 let state = self.state.take()?;
494 log::info!("shutting down ssh processes");
495
496 let State::Connected {
497 multiplex_task,
498 heartbeat_task,
499 remote_connection,
500 delegate,
501 } = state
502 else {
503 return None;
504 };
505
506 let client = self.client.clone();
507
508 Some(async move {
509 if let Some(shutdown_request) = shutdown_request {
510 client.send(shutdown_request).log_err();
511 // We wait 50ms instead of waiting for a response, because
512 // waiting for a response would require us to wait on the main thread
513 // which we want to avoid in an `on_app_quit` callback.
514 executor.timer(Duration::from_millis(50)).await;
515 }
516
517 // Drop `multiplex_task` because it owns our remote_connection_proxy_process, which is a
518 // child of master_process.
519 drop(multiplex_task);
520 // Now drop the rest of state, which kills master process.
521 drop(heartbeat_task);
522 drop(remote_connection);
523 drop(delegate);
524 })
525 }
526
527 fn reconnect(&mut self, cx: &mut Context<Self>) -> Result<()> {
528 let can_reconnect = self
529 .state
530 .as_ref()
531 .map(|state| state.can_reconnect())
532 .unwrap_or(false);
533 if !can_reconnect {
534 log::info!("aborting reconnect, because not in state that allows reconnecting");
535 let error = if let Some(state) = self.state.as_ref() {
536 format!("invalid state, cannot reconnect while in state {state}")
537 } else {
538 "no state set".to_string()
539 };
540 anyhow::bail!(error);
541 }
542
543 let state = self.state.take().unwrap();
544 let (attempts, remote_connection, delegate) = match state {
545 State::Connected {
546 remote_connection,
547 delegate,
548 multiplex_task,
549 heartbeat_task,
550 }
551 | State::HeartbeatMissed {
552 remote_connection,
553 delegate,
554 multiplex_task,
555 heartbeat_task,
556 ..
557 } => {
558 drop(multiplex_task);
559 drop(heartbeat_task);
560 (0, remote_connection, delegate)
561 }
562 State::ReconnectFailed {
563 attempts,
564 remote_connection,
565 delegate,
566 ..
567 } => (attempts, remote_connection, delegate),
568 State::Connecting
569 | State::Reconnecting
570 | State::ReconnectExhausted
571 | State::ServerNotRunning => unreachable!(),
572 };
573
574 let attempts = attempts + 1;
575 if attempts > MAX_RECONNECT_ATTEMPTS {
576 log::error!(
577 "Failed to reconnect to after {} attempts, giving up",
578 MAX_RECONNECT_ATTEMPTS
579 );
580 self.set_state(State::ReconnectExhausted, cx);
581 return Ok(());
582 }
583
584 self.set_state(State::Reconnecting, cx);
585
586 log::info!(
587 "Trying to reconnect to remote server... Attempt {}",
588 attempts
589 );
590
591 let unique_identifier = self.unique_identifier.clone();
592 let client = self.client.clone();
593 let reconnect_task = cx.spawn(async move |this, cx| {
594 macro_rules! failed {
595 ($error:expr, $attempts:expr, $remote_connection:expr, $delegate:expr) => {
596 delegate.set_status(Some(&format!("{error:#}", error = $error)), cx);
597 return State::ReconnectFailed {
598 error: anyhow!($error),
599 attempts: $attempts,
600 remote_connection: $remote_connection,
601 delegate: $delegate,
602 };
603 };
604 }
605
606 if let Err(error) = remote_connection
607 .kill()
608 .await
609 .context("Failed to kill remote_connection process")
610 {
611 failed!(error, attempts, remote_connection, delegate);
612 };
613
614 let connection_options = remote_connection.connection_options();
615
616 let (outgoing_tx, outgoing_rx) = mpsc::unbounded::<Envelope>();
617 let (incoming_tx, incoming_rx) = mpsc::unbounded::<Envelope>();
618 let (connection_activity_tx, connection_activity_rx) = mpsc::channel::<()>(1);
619
620 let (remote_connection, io_task) = match async {
621 let remote_connection = cx
622 .update_global(|pool: &mut ConnectionPool, cx| {
623 pool.connect(connection_options, delegate.clone(), cx)
624 })
625 .await
626 .map_err(|error| error.cloned())?;
627
628 let io_task = remote_connection.start_proxy(
629 unique_identifier,
630 true,
631 incoming_tx,
632 outgoing_rx,
633 connection_activity_tx,
634 delegate.clone(),
635 cx,
636 );
637 anyhow::Ok((remote_connection, io_task))
638 }
639 .await
640 {
641 Ok((remote_connection, io_task)) => (remote_connection, io_task),
642 Err(error) => {
643 failed!(error, attempts, remote_connection, delegate);
644 }
645 };
646
647 let multiplex_task = Self::monitor(this.clone(), io_task, cx);
648 client.reconnect(incoming_rx, outgoing_tx, cx);
649
650 if let Err(error) = client.resync(HEARTBEAT_TIMEOUT).await {
651 failed!(error, attempts, remote_connection, delegate);
652 };
653
654 State::Connected {
655 remote_connection: remote_connection,
656 delegate,
657 multiplex_task,
658 heartbeat_task: Self::heartbeat(this.clone(), connection_activity_rx, cx),
659 }
660 });
661
662 cx.spawn(async move |this, cx| {
663 let new_state = reconnect_task.await;
664 this.update(cx, |this, cx| {
665 this.try_set_state(cx, |old_state| {
666 if old_state.is_reconnecting() {
667 match &new_state {
668 State::Connecting
669 | State::Reconnecting
670 | State::HeartbeatMissed { .. }
671 | State::ServerNotRunning => {}
672 State::Connected { .. } => {
673 log::info!("Successfully reconnected");
674 }
675 State::ReconnectFailed {
676 error, attempts, ..
677 } => {
678 log::error!(
679 "Reconnect attempt {} failed: {:?}. Starting new attempt...",
680 attempts,
681 error
682 );
683 }
684 State::ReconnectExhausted => {
685 log::error!("Reconnect attempt failed and all attempts exhausted");
686 }
687 }
688 Some(new_state)
689 } else {
690 None
691 }
692 });
693
694 if this.state_is(State::is_reconnect_failed) {
695 this.reconnect(cx)
696 } else if this.state_is(State::is_reconnect_exhausted) {
697 Ok(())
698 } else {
699 log::debug!("State has transition from Reconnecting into new state while attempting reconnect.");
700 Ok(())
701 }
702 })
703 })
704 .detach_and_log_err(cx);
705
706 Ok(())
707 }
708
709 fn heartbeat(
710 this: WeakEntity<Self>,
711 mut connection_activity_rx: mpsc::Receiver<()>,
712 cx: &mut AsyncApp,
713 ) -> Task<Result<()>> {
714 let Ok(client) = this.read_with(cx, |this, _| this.client.clone()) else {
715 return Task::ready(Err(anyhow!("remote_connectionRemoteClient lost")));
716 };
717
718 cx.spawn(async move |cx| {
719 let mut missed_heartbeats = 0;
720
721 let keepalive_timer = cx.background_executor().timer(HEARTBEAT_INTERVAL).fuse();
722 futures::pin_mut!(keepalive_timer);
723
724 loop {
725 select_biased! {
726 result = connection_activity_rx.next().fuse() => {
727 if result.is_none() {
728 log::warn!("remote heartbeat: connection activity channel has been dropped. stopping.");
729 return Ok(());
730 }
731
732 if missed_heartbeats != 0 {
733 missed_heartbeats = 0;
734 let _ =this.update(cx, |this, cx| {
735 this.handle_heartbeat_result(missed_heartbeats, cx)
736 })?;
737 }
738 }
739 _ = keepalive_timer => {
740 log::debug!("Sending heartbeat to server...");
741
742 let result = select_biased! {
743 _ = connection_activity_rx.next().fuse() => {
744 Ok(())
745 }
746 ping_result = client.ping(HEARTBEAT_TIMEOUT).fuse() => {
747 ping_result
748 }
749 };
750
751 if result.is_err() {
752 missed_heartbeats += 1;
753 log::warn!(
754 "No heartbeat from server after {:?}. Missed heartbeat {} out of {}.",
755 HEARTBEAT_TIMEOUT,
756 missed_heartbeats,
757 MAX_MISSED_HEARTBEATS
758 );
759 } else if missed_heartbeats != 0 {
760 missed_heartbeats = 0;
761 } else {
762 continue;
763 }
764
765 let result = this.update(cx, |this, cx| {
766 this.handle_heartbeat_result(missed_heartbeats, cx)
767 })?;
768 if result.is_break() {
769 return Ok(());
770 }
771 }
772 }
773
774 keepalive_timer.set(cx.background_executor().timer(HEARTBEAT_INTERVAL).fuse());
775 }
776 })
777 }
778
779 fn handle_heartbeat_result(
780 &mut self,
781 missed_heartbeats: usize,
782 cx: &mut Context<Self>,
783 ) -> ControlFlow<()> {
784 let state = self.state.take().unwrap();
785 let next_state = if missed_heartbeats > 0 {
786 state.heartbeat_missed()
787 } else {
788 state.heartbeat_recovered()
789 };
790
791 self.set_state(next_state, cx);
792
793 if missed_heartbeats >= MAX_MISSED_HEARTBEATS {
794 log::error!(
795 "Missed last {} heartbeats. Reconnecting...",
796 missed_heartbeats
797 );
798
799 self.reconnect(cx)
800 .context("failed to start reconnect process after missing heartbeats")
801 .log_err();
802 ControlFlow::Break(())
803 } else {
804 ControlFlow::Continue(())
805 }
806 }
807
808 fn monitor(
809 this: WeakEntity<Self>,
810 io_task: Task<Result<i32>>,
811 cx: &AsyncApp,
812 ) -> Task<Result<()>> {
813 cx.spawn(async move |cx| {
814 let result = io_task.await;
815
816 match result {
817 Ok(exit_code) => {
818 if let Some(error) = ProxyLaunchError::from_exit_code(exit_code) {
819 match error {
820 ProxyLaunchError::ServerNotRunning => {
821 log::error!("failed to reconnect because server is not running");
822 this.update(cx, |this, cx| {
823 this.set_state(State::ServerNotRunning, cx);
824 })?;
825 }
826 }
827 } else if exit_code > 0 {
828 log::error!("proxy process terminated unexpectedly");
829 this.update(cx, |this, cx| {
830 this.reconnect(cx).ok();
831 })?;
832 }
833 }
834 Err(error) => {
835 log::warn!(
836 "remote io task died with error: {:?}. reconnecting...",
837 error
838 );
839 this.update(cx, |this, cx| {
840 this.reconnect(cx).ok();
841 })?;
842 }
843 }
844
845 Ok(())
846 })
847 }
848
849 fn state_is(&self, check: impl FnOnce(&State) -> bool) -> bool {
850 self.state.as_ref().is_some_and(check)
851 }
852
853 fn try_set_state(&mut self, cx: &mut Context<Self>, map: impl FnOnce(&State) -> Option<State>) {
854 let new_state = self.state.as_ref().and_then(map);
855 if let Some(new_state) = new_state {
856 self.state.replace(new_state);
857 cx.notify();
858 }
859 }
860
861 fn set_state(&mut self, state: State, cx: &mut Context<Self>) {
862 log::info!("setting state to '{}'", &state);
863
864 let is_reconnect_exhausted = state.is_reconnect_exhausted();
865 let is_server_not_running = state.is_server_not_running();
866 self.state.replace(state);
867
868 if is_reconnect_exhausted || is_server_not_running {
869 cx.emit(RemoteClientEvent::Disconnected);
870 }
871 cx.notify();
872 }
873
874 pub fn shell(&self) -> Option<String> {
875 Some(self.remote_connection()?.shell())
876 }
877
878 pub fn default_system_shell(&self) -> Option<String> {
879 Some(self.remote_connection()?.default_system_shell())
880 }
881
882 pub fn shares_network_interface(&self) -> bool {
883 self.remote_connection()
884 .map_or(false, |connection| connection.shares_network_interface())
885 }
886
887 pub fn build_command(
888 &self,
889 program: Option<String>,
890 args: &[String],
891 env: &HashMap<String, String>,
892 working_dir: Option<String>,
893 port_forward: Option<(u16, String, u16)>,
894 ) -> Result<CommandTemplate> {
895 let Some(connection) = self.remote_connection() else {
896 return Err(anyhow!("no remote connection"));
897 };
898 connection.build_command(program, args, env, working_dir, port_forward)
899 }
900
901 pub fn build_forward_ports_command(
902 &self,
903 forwards: Vec<(u16, String, u16)>,
904 ) -> Result<CommandTemplate> {
905 let Some(connection) = self.remote_connection() else {
906 return Err(anyhow!("no remote connection"));
907 };
908 connection.build_forward_ports_command(forwards)
909 }
910
911 pub fn upload_directory(
912 &self,
913 src_path: PathBuf,
914 dest_path: RemotePathBuf,
915 cx: &App,
916 ) -> Task<Result<()>> {
917 let Some(connection) = self.remote_connection() else {
918 return Task::ready(Err(anyhow!("no remote connection")));
919 };
920 connection.upload_directory(src_path, dest_path, cx)
921 }
922
923 pub fn proto_client(&self) -> AnyProtoClient {
924 self.client.clone().into()
925 }
926
927 pub fn connection_options(&self) -> RemoteConnectionOptions {
928 self.connection_options.clone()
929 }
930
931 pub fn connection(&self) -> Option<Arc<dyn RemoteConnection>> {
932 if let State::Connected {
933 remote_connection, ..
934 } = self.state.as_ref()?
935 {
936 Some(remote_connection.clone())
937 } else {
938 None
939 }
940 }
941
942 pub fn connection_state(&self) -> ConnectionState {
943 self.state
944 .as_ref()
945 .map(ConnectionState::from)
946 .unwrap_or(ConnectionState::Disconnected)
947 }
948
949 pub fn is_disconnected(&self) -> bool {
950 self.connection_state() == ConnectionState::Disconnected
951 }
952
953 pub fn path_style(&self) -> PathStyle {
954 self.path_style
955 }
956
957 #[cfg(any(test, feature = "test-support"))]
958 pub fn simulate_disconnect(&self, client_cx: &mut App) -> Task<()> {
959 let opts = self.connection_options();
960 client_cx.spawn(async move |cx| {
961 let connection = cx.update_global(|c: &mut ConnectionPool, _| {
962 if let Some(ConnectionPoolEntry::Connected(c)) = c.connections.get(&opts) {
963 if let Some(connection) = c.upgrade() {
964 connection
965 } else {
966 panic!("connection was dropped")
967 }
968 } else {
969 panic!("missing test connection")
970 }
971 });
972
973 connection.simulate_disconnect(cx);
974 })
975 }
976
977 /// Creates a mock connection pair for testing.
978 ///
979 /// This is the recommended way to create mock remote connections for tests.
980 /// It returns both the `MockConnectionOptions` (which can be passed to create
981 /// a `HeadlessProject`) and an `AnyProtoClient` for the server side.
982 ///
983 /// # Example
984 /// ```ignore
985 /// let (opts, server_session) = RemoteClient::fake_server(cx, server_cx);
986 /// // Set up HeadlessProject with server_session...
987 /// let client = RemoteClient::fake_client(opts, cx).await;
988 /// ```
989 #[cfg(any(test, feature = "test-support"))]
990 pub fn fake_server(
991 client_cx: &mut gpui::TestAppContext,
992 server_cx: &mut gpui::TestAppContext,
993 ) -> (RemoteConnectionOptions, AnyProtoClient) {
994 use crate::transport::mock::MockConnection;
995 let (opts, server_client) = MockConnection::new(client_cx, server_cx);
996 (opts.into(), server_client)
997 }
998
999 /// Creates a `RemoteClient` connected to a mock server.
1000 ///
1001 /// Call `fake_server` first to get the connection options, set up the
1002 /// `HeadlessProject` with the server session, then call this method
1003 /// to create the client.
1004 #[cfg(any(test, feature = "test-support"))]
1005 pub async fn fake_client(
1006 opts: RemoteConnectionOptions,
1007 client_cx: &mut gpui::TestAppContext,
1008 ) -> Entity<Self> {
1009 use crate::transport::mock::MockDelegate;
1010 let (_tx, rx) = oneshot::channel();
1011 let mut cx = client_cx.to_async();
1012 let connection = connect(opts, Arc::new(MockDelegate), &mut cx)
1013 .await
1014 .unwrap();
1015 client_cx
1016 .update(|cx| {
1017 Self::new(
1018 ConnectionIdentifier::setup(),
1019 connection,
1020 rx,
1021 Arc::new(MockDelegate),
1022 cx,
1023 )
1024 })
1025 .await
1026 .unwrap()
1027 .unwrap()
1028 }
1029
1030 fn remote_connection(&self) -> Option<Arc<dyn RemoteConnection>> {
1031 self.state
1032 .as_ref()
1033 .and_then(|state| state.remote_connection())
1034 }
1035}
1036
1037enum ConnectionPoolEntry {
1038 Connecting(Shared<Task<Result<Arc<dyn RemoteConnection>, Arc<anyhow::Error>>>>),
1039 Connected(Weak<dyn RemoteConnection>),
1040}
1041
1042#[derive(Default)]
1043struct ConnectionPool {
1044 connections: HashMap<RemoteConnectionOptions, ConnectionPoolEntry>,
1045}
1046
1047impl Global for ConnectionPool {}
1048
1049impl ConnectionPool {
1050 pub fn connect(
1051 &mut self,
1052 opts: RemoteConnectionOptions,
1053 delegate: Arc<dyn RemoteClientDelegate>,
1054 cx: &mut App,
1055 ) -> Shared<Task<Result<Arc<dyn RemoteConnection>, Arc<anyhow::Error>>>> {
1056 let connection = self.connections.get(&opts);
1057 match connection {
1058 Some(ConnectionPoolEntry::Connecting(task)) => {
1059 delegate.set_status(
1060 Some("Waiting for existing connection attempt"),
1061 &mut cx.to_async(),
1062 );
1063 return task.clone();
1064 }
1065 Some(ConnectionPoolEntry::Connected(remote)) => {
1066 if let Some(remote) = remote.upgrade()
1067 && !remote.has_been_killed()
1068 {
1069 return Task::ready(Ok(remote)).shared();
1070 }
1071 self.connections.remove(&opts);
1072 }
1073 None => {}
1074 }
1075
1076 let task = cx
1077 .spawn({
1078 let opts = opts.clone();
1079 let delegate = delegate.clone();
1080 async move |cx| {
1081 let connection = match opts.clone() {
1082 RemoteConnectionOptions::Ssh(opts) => {
1083 SshRemoteConnection::new(opts, delegate, cx)
1084 .await
1085 .map(|connection| Arc::new(connection) as Arc<dyn RemoteConnection>)
1086 }
1087 RemoteConnectionOptions::Wsl(opts) => {
1088 WslRemoteConnection::new(opts, delegate, cx)
1089 .await
1090 .map(|connection| Arc::new(connection) as Arc<dyn RemoteConnection>)
1091 }
1092 RemoteConnectionOptions::Docker(opts) => {
1093 DockerExecConnection::new(opts, delegate, cx)
1094 .await
1095 .map(|connection| Arc::new(connection) as Arc<dyn RemoteConnection>)
1096 }
1097 #[cfg(any(test, feature = "test-support"))]
1098 RemoteConnectionOptions::Mock(opts) => {
1099 cx.update(|cx| {
1100 cx.default_global::<crate::transport::mock::MockConnectionRegistry>()
1101 .take(&opts)
1102 .ok_or_else(|| anyhow!(
1103 "Mock connection not found. Call MockConnection::new() first."
1104 ))
1105 .map(|connection| connection as Arc<dyn RemoteConnection>)
1106 })
1107 }
1108 };
1109
1110 cx.update_global(|pool: &mut Self, _| {
1111 debug_assert!(matches!(
1112 pool.connections.get(&opts),
1113 Some(ConnectionPoolEntry::Connecting(_))
1114 ));
1115 match connection {
1116 Ok(connection) => {
1117 pool.connections.insert(
1118 opts.clone(),
1119 ConnectionPoolEntry::Connected(Arc::downgrade(&connection)),
1120 );
1121 Ok(connection)
1122 }
1123 Err(error) => {
1124 pool.connections.remove(&opts);
1125 Err(Arc::new(error))
1126 }
1127 }
1128 })
1129 }
1130 })
1131 .shared();
1132
1133 self.connections
1134 .insert(opts.clone(), ConnectionPoolEntry::Connecting(task.clone()));
1135 task
1136 }
1137}
1138
1139#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1140pub enum RemoteConnectionOptions {
1141 Ssh(SshConnectionOptions),
1142 Wsl(WslConnectionOptions),
1143 Docker(DockerConnectionOptions),
1144 #[cfg(any(test, feature = "test-support"))]
1145 Mock(crate::transport::mock::MockConnectionOptions),
1146}
1147
1148impl RemoteConnectionOptions {
1149 pub fn display_name(&self) -> String {
1150 match self {
1151 RemoteConnectionOptions::Ssh(opts) => opts.host.to_string(),
1152 RemoteConnectionOptions::Wsl(opts) => opts.distro_name.clone(),
1153 RemoteConnectionOptions::Docker(opts) => opts.name.clone(),
1154 #[cfg(any(test, feature = "test-support"))]
1155 RemoteConnectionOptions::Mock(opts) => format!("mock-{}", opts.id),
1156 }
1157 }
1158}
1159
1160impl From<SshConnectionOptions> for RemoteConnectionOptions {
1161 fn from(opts: SshConnectionOptions) -> Self {
1162 RemoteConnectionOptions::Ssh(opts)
1163 }
1164}
1165
1166impl From<WslConnectionOptions> for RemoteConnectionOptions {
1167 fn from(opts: WslConnectionOptions) -> Self {
1168 RemoteConnectionOptions::Wsl(opts)
1169 }
1170}
1171
1172#[cfg(any(test, feature = "test-support"))]
1173impl From<crate::transport::mock::MockConnectionOptions> for RemoteConnectionOptions {
1174 fn from(opts: crate::transport::mock::MockConnectionOptions) -> Self {
1175 RemoteConnectionOptions::Mock(opts)
1176 }
1177}
1178
1179#[cfg(target_os = "windows")]
1180/// Open a wsl path (\\wsl.localhost\<distro>\path)
1181#[derive(Debug, Clone, PartialEq, Eq, gpui::Action)]
1182#[action(namespace = workspace, no_json, no_register)]
1183pub struct OpenWslPath {
1184 pub distro: WslConnectionOptions,
1185 pub paths: Vec<PathBuf>,
1186}
1187
1188#[async_trait(?Send)]
1189pub trait RemoteConnection: Send + Sync {
1190 fn start_proxy(
1191 &self,
1192 unique_identifier: String,
1193 reconnect: bool,
1194 incoming_tx: UnboundedSender<Envelope>,
1195 outgoing_rx: UnboundedReceiver<Envelope>,
1196 connection_activity_tx: Sender<()>,
1197 delegate: Arc<dyn RemoteClientDelegate>,
1198 cx: &mut AsyncApp,
1199 ) -> Task<Result<i32>>;
1200 fn upload_directory(
1201 &self,
1202 src_path: PathBuf,
1203 dest_path: RemotePathBuf,
1204 cx: &App,
1205 ) -> Task<Result<()>>;
1206 async fn kill(&self) -> Result<()>;
1207 fn has_been_killed(&self) -> bool;
1208 fn shares_network_interface(&self) -> bool {
1209 false
1210 }
1211 fn build_command(
1212 &self,
1213 program: Option<String>,
1214 args: &[String],
1215 env: &HashMap<String, String>,
1216 working_dir: Option<String>,
1217 port_forward: Option<(u16, String, u16)>,
1218 ) -> Result<CommandTemplate>;
1219 fn build_forward_ports_command(
1220 &self,
1221 forwards: Vec<(u16, String, u16)>,
1222 ) -> Result<CommandTemplate>;
1223 fn connection_options(&self) -> RemoteConnectionOptions;
1224 fn path_style(&self) -> PathStyle;
1225 fn shell(&self) -> String;
1226 fn default_system_shell(&self) -> String;
1227 fn has_wsl_interop(&self) -> bool;
1228
1229 #[cfg(any(test, feature = "test-support"))]
1230 fn simulate_disconnect(&self, _: &AsyncApp) {}
1231}
1232
1233type ResponseChannels = Mutex<HashMap<MessageId, oneshot::Sender<(Envelope, oneshot::Sender<()>)>>>;
1234
1235struct Signal<T> {
1236 tx: Mutex<Option<oneshot::Sender<T>>>,
1237 rx: Shared<Task<Option<T>>>,
1238}
1239
1240impl<T: Send + Clone + 'static> Signal<T> {
1241 pub fn new(cx: &App) -> Self {
1242 let (tx, rx) = oneshot::channel();
1243
1244 let task = cx
1245 .background_executor()
1246 .spawn(async move { rx.await.ok() })
1247 .shared();
1248
1249 Self {
1250 tx: Mutex::new(Some(tx)),
1251 rx: task,
1252 }
1253 }
1254
1255 fn set(&self, value: T) {
1256 if let Some(tx) = self.tx.lock().take() {
1257 let _ = tx.send(value);
1258 }
1259 }
1260
1261 fn wait(&self) -> Shared<Task<Option<T>>> {
1262 self.rx.clone()
1263 }
1264}
1265
1266pub(crate) struct ChannelClient {
1267 next_message_id: AtomicU32,
1268 outgoing_tx: Mutex<mpsc::UnboundedSender<Envelope>>,
1269 buffer: Mutex<VecDeque<Envelope>>,
1270 response_channels: ResponseChannels,
1271 message_handlers: Mutex<ProtoMessageHandlerSet>,
1272 max_received: AtomicU32,
1273 name: &'static str,
1274 task: Mutex<Task<Result<()>>>,
1275 remote_started: Signal<()>,
1276 has_wsl_interop: bool,
1277}
1278
1279impl ChannelClient {
1280 pub(crate) fn new(
1281 incoming_rx: mpsc::UnboundedReceiver<Envelope>,
1282 outgoing_tx: mpsc::UnboundedSender<Envelope>,
1283 cx: &App,
1284 name: &'static str,
1285 has_wsl_interop: bool,
1286 ) -> Arc<Self> {
1287 Arc::new_cyclic(|this| Self {
1288 outgoing_tx: Mutex::new(outgoing_tx),
1289 next_message_id: AtomicU32::new(0),
1290 max_received: AtomicU32::new(0),
1291 response_channels: ResponseChannels::default(),
1292 message_handlers: Default::default(),
1293 buffer: Mutex::new(VecDeque::new()),
1294 name,
1295 task: Mutex::new(Self::start_handling_messages(
1296 this.clone(),
1297 incoming_rx,
1298 &cx.to_async(),
1299 )),
1300 remote_started: Signal::new(cx),
1301 has_wsl_interop,
1302 })
1303 }
1304
1305 fn wait_for_remote_started(&self) -> Shared<Task<Option<()>>> {
1306 self.remote_started.wait()
1307 }
1308
1309 fn start_handling_messages(
1310 this: Weak<Self>,
1311 mut incoming_rx: mpsc::UnboundedReceiver<Envelope>,
1312 cx: &AsyncApp,
1313 ) -> Task<Result<()>> {
1314 cx.spawn(async move |cx| {
1315 if let Some(this) = this.upgrade() {
1316 let envelope = proto::RemoteStarted {}.into_envelope(0, None, None);
1317 this.outgoing_tx.lock().unbounded_send(envelope).ok();
1318 };
1319
1320 let peer_id = PeerId { owner_id: 0, id: 0 };
1321 while let Some(incoming) = incoming_rx.next().await {
1322 let Some(this) = this.upgrade() else {
1323 return anyhow::Ok(());
1324 };
1325 if let Some(ack_id) = incoming.ack_id {
1326 let mut buffer = this.buffer.lock();
1327 while buffer.front().is_some_and(|msg| msg.id <= ack_id) {
1328 buffer.pop_front();
1329 }
1330 }
1331 if let Some(proto::envelope::Payload::FlushBufferedMessages(_)) = &incoming.payload
1332 {
1333 log::debug!(
1334 "{}:remote message received. name:FlushBufferedMessages",
1335 this.name
1336 );
1337 {
1338 let buffer = this.buffer.lock();
1339 for envelope in buffer.iter() {
1340 this.outgoing_tx
1341 .lock()
1342 .unbounded_send(envelope.clone())
1343 .ok();
1344 }
1345 }
1346 let mut envelope = proto::Ack {}.into_envelope(0, Some(incoming.id), None);
1347 envelope.id = this.next_message_id.fetch_add(1, SeqCst);
1348 this.outgoing_tx.lock().unbounded_send(envelope).ok();
1349 continue;
1350 }
1351
1352 if let Some(proto::envelope::Payload::RemoteStarted(_)) = &incoming.payload {
1353 this.remote_started.set(());
1354 let mut envelope = proto::Ack {}.into_envelope(0, Some(incoming.id), None);
1355 envelope.id = this.next_message_id.fetch_add(1, SeqCst);
1356 this.outgoing_tx.lock().unbounded_send(envelope).ok();
1357 continue;
1358 }
1359
1360 this.max_received.store(incoming.id, SeqCst);
1361
1362 if let Some(request_id) = incoming.responding_to {
1363 let request_id = MessageId(request_id);
1364 let sender = this.response_channels.lock().remove(&request_id);
1365 if let Some(sender) = sender {
1366 let (tx, rx) = oneshot::channel();
1367 if incoming.payload.is_some() {
1368 sender.send((incoming, tx)).ok();
1369 }
1370 rx.await.ok();
1371 }
1372 } else if let Some(envelope) =
1373 build_typed_envelope(peer_id, Instant::now(), incoming)
1374 {
1375 let type_name = envelope.payload_type_name();
1376 let message_id = envelope.message_id();
1377 if let Some(future) = ProtoMessageHandlerSet::handle_message(
1378 &this.message_handlers,
1379 envelope,
1380 this.clone().into(),
1381 cx.clone(),
1382 ) {
1383 log::debug!("{}:remote message received. name:{type_name}", this.name);
1384 cx.foreground_executor()
1385 .spawn(async move {
1386 match future.await {
1387 Ok(_) => {
1388 log::debug!(
1389 "{}:remote message handled. name:{type_name}",
1390 this.name
1391 );
1392 }
1393 Err(error) => {
1394 log::error!(
1395 "{}:error handling message. type:{}, error:{:#}",
1396 this.name,
1397 type_name,
1398 format!("{error:#}").lines().fold(
1399 String::new(),
1400 |mut message, line| {
1401 if !message.is_empty() {
1402 message.push(' ');
1403 }
1404 message.push_str(line);
1405 message
1406 }
1407 )
1408 );
1409 }
1410 }
1411 })
1412 .detach()
1413 } else {
1414 log::error!("{}:unhandled remote message name:{type_name}", this.name);
1415 if let Err(e) = AnyProtoClient::from(this.clone()).send_response(
1416 message_id,
1417 anyhow::anyhow!("no handler registered for {type_name}").to_proto(),
1418 ) {
1419 log::error!(
1420 "{}:error sending error response for {type_name}:{e:#}",
1421 this.name
1422 );
1423 }
1424 }
1425 }
1426 }
1427 anyhow::Ok(())
1428 })
1429 }
1430
1431 pub(crate) fn reconnect(
1432 self: &Arc<Self>,
1433 incoming_rx: UnboundedReceiver<Envelope>,
1434 outgoing_tx: UnboundedSender<Envelope>,
1435 cx: &AsyncApp,
1436 ) {
1437 *self.outgoing_tx.lock() = outgoing_tx;
1438 *self.task.lock() = Self::start_handling_messages(Arc::downgrade(self), incoming_rx, cx);
1439 }
1440
1441 fn request<T: RequestMessage>(
1442 &self,
1443 payload: T,
1444 ) -> impl 'static + Future<Output = Result<T::Response>> {
1445 self.request_internal(payload, true)
1446 }
1447
1448 fn request_internal<T: RequestMessage>(
1449 &self,
1450 payload: T,
1451 use_buffer: bool,
1452 ) -> impl 'static + Future<Output = Result<T::Response>> {
1453 log::debug!("remote request start. name:{}", T::NAME);
1454 let response =
1455 self.request_dynamic(payload.into_envelope(0, None, None), T::NAME, use_buffer);
1456 async move {
1457 let response = response.await?;
1458 log::debug!("remote request finish. name:{}", T::NAME);
1459 T::Response::from_envelope(response).context("received a response of the wrong type")
1460 }
1461 }
1462
1463 async fn resync(&self, timeout: Duration) -> Result<()> {
1464 smol::future::or(
1465 async {
1466 self.request_internal(proto::FlushBufferedMessages {}, false)
1467 .await?;
1468
1469 for envelope in self.buffer.lock().iter() {
1470 self.outgoing_tx
1471 .lock()
1472 .unbounded_send(envelope.clone())
1473 .ok();
1474 }
1475 Ok(())
1476 },
1477 async {
1478 smol::Timer::after(timeout).await;
1479 anyhow::bail!("Timed out resyncing remote client")
1480 },
1481 )
1482 .await
1483 }
1484
1485 async fn ping(&self, timeout: Duration) -> Result<()> {
1486 smol::future::or(
1487 async {
1488 self.request(proto::Ping {}).await?;
1489 Ok(())
1490 },
1491 async {
1492 smol::Timer::after(timeout).await;
1493 anyhow::bail!("Timed out pinging remote client")
1494 },
1495 )
1496 .await
1497 }
1498
1499 fn send<T: EnvelopedMessage>(&self, payload: T) -> Result<()> {
1500 log::debug!("remote send name:{}", T::NAME);
1501 self.send_dynamic(payload.into_envelope(0, None, None))
1502 }
1503
1504 fn request_dynamic(
1505 &self,
1506 mut envelope: proto::Envelope,
1507 type_name: &'static str,
1508 use_buffer: bool,
1509 ) -> impl 'static + Future<Output = Result<proto::Envelope>> {
1510 envelope.id = self.next_message_id.fetch_add(1, SeqCst);
1511 let (tx, rx) = oneshot::channel();
1512 let mut response_channels_lock = self.response_channels.lock();
1513 response_channels_lock.insert(MessageId(envelope.id), tx);
1514 drop(response_channels_lock);
1515
1516 let result = if use_buffer {
1517 self.send_buffered(envelope)
1518 } else {
1519 self.send_unbuffered(envelope)
1520 };
1521 async move {
1522 if let Err(error) = &result {
1523 log::error!("failed to send message: {error}");
1524 anyhow::bail!("failed to send message: {error}");
1525 }
1526
1527 let response = rx.await.context("connection lost")?.0;
1528 if let Some(proto::envelope::Payload::Error(error)) = &response.payload {
1529 return Err(RpcError::from_proto(error, type_name));
1530 }
1531 Ok(response)
1532 }
1533 }
1534
1535 pub fn send_dynamic(&self, mut envelope: proto::Envelope) -> Result<()> {
1536 envelope.id = self.next_message_id.fetch_add(1, SeqCst);
1537 self.send_buffered(envelope)
1538 }
1539
1540 fn send_buffered(&self, mut envelope: proto::Envelope) -> Result<()> {
1541 envelope.ack_id = Some(self.max_received.load(SeqCst));
1542 self.buffer.lock().push_back(envelope.clone());
1543 // ignore errors on send (happen while we're reconnecting)
1544 // assume that the global "disconnected" overlay is sufficient.
1545 self.outgoing_tx.lock().unbounded_send(envelope).ok();
1546 Ok(())
1547 }
1548
1549 fn send_unbuffered(&self, mut envelope: proto::Envelope) -> Result<()> {
1550 envelope.ack_id = Some(self.max_received.load(SeqCst));
1551 self.outgoing_tx.lock().unbounded_send(envelope).ok();
1552 Ok(())
1553 }
1554}
1555
1556impl ProtoClient for ChannelClient {
1557 fn request(
1558 &self,
1559 envelope: proto::Envelope,
1560 request_type: &'static str,
1561 ) -> BoxFuture<'static, Result<proto::Envelope>> {
1562 self.request_dynamic(envelope, request_type, true).boxed()
1563 }
1564
1565 fn send(&self, envelope: proto::Envelope, _message_type: &'static str) -> Result<()> {
1566 self.send_dynamic(envelope)
1567 }
1568
1569 fn send_response(&self, envelope: Envelope, _message_type: &'static str) -> anyhow::Result<()> {
1570 self.send_dynamic(envelope)
1571 }
1572
1573 fn message_handler_set(&self) -> &Mutex<ProtoMessageHandlerSet> {
1574 &self.message_handlers
1575 }
1576
1577 fn is_via_collab(&self) -> bool {
1578 false
1579 }
1580
1581 fn has_wsl_interop(&self) -> bool {
1582 self.has_wsl_interop
1583 }
1584}