1use crate::{
2 json_log::LogRecord,
3 protocol::{
4 message_len_from_buffer, read_message_with_len, write_message, MessageId, MESSAGE_LEN_SIZE,
5 },
6 proxy::ProxyLaunchError,
7};
8use anyhow::{anyhow, Context as _, Result};
9use collections::HashMap;
10use futures::{
11 channel::{
12 mpsc::{self, UnboundedReceiver, UnboundedSender},
13 oneshot,
14 },
15 future::BoxFuture,
16 select_biased, AsyncReadExt as _, AsyncWriteExt as _, Future, FutureExt as _, SinkExt,
17 StreamExt as _,
18};
19use gpui::{
20 AppContext, AsyncAppContext, Context, EventEmitter, Model, ModelContext, SemanticVersion, Task,
21 WeakModel,
22};
23use parking_lot::Mutex;
24use rpc::{
25 proto::{self, build_typed_envelope, Envelope, EnvelopedMessage, PeerId, RequestMessage},
26 AnyProtoClient, EntityMessageSubscriber, ProtoClient, ProtoMessageHandlerSet, RpcError,
27};
28use smol::{
29 fs,
30 process::{self, Child, Stdio},
31 Timer,
32};
33use std::{
34 any::TypeId,
35 ffi::OsStr,
36 fmt,
37 ops::ControlFlow,
38 path::{Path, PathBuf},
39 sync::{
40 atomic::{AtomicU32, Ordering::SeqCst},
41 Arc,
42 },
43 time::{Duration, Instant},
44};
45use tempfile::TempDir;
46use util::ResultExt;
47
/// Newtype wrapper around a `u64` id (an SSH project id, going by the name).
///
/// Derives ordering, hashing and serde (de)serialization so it can be used as
/// a map key and persisted.
#[derive(
    Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, serde::Serialize, serde::Deserialize,
)]
pub struct SshProjectId(pub u64);
52
/// Handle used to build `ssh` invocations that go through an existing
/// control socket (`-o ControlPath=...`).
#[derive(Clone)]
pub struct SshSocket {
    // Options of the connection; the `ssh://` URL passed to `ssh` is derived from them.
    connection_options: SshConnectionOptions,
    // Filesystem path handed to `ssh` as the `ControlPath` option.
    socket_path: PathBuf,
}
58
/// Parameters describing which SSH host to connect to and how.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct SshConnectionOptions {
    pub host: String,
    pub username: Option<String>,
    pub port: Option<u16>,
    pub password: Option<String>,
}

impl SshConnectionOptions {
    /// Renders the options as `ssh://[username@]host[:port]`.
    pub fn ssh_url(&self) -> String {
        let user_prefix = self
            .username
            .as_deref()
            .map(|name| format!("{name}@"))
            .unwrap_or_default();
        let port_suffix = self
            .port
            .map(|port| format!(":{port}"))
            .unwrap_or_default();
        format!("ssh://{user_prefix}{}{port_suffix}", self.host)
    }

    /// Renders `[username@]host`, i.e. the target without scheme or port.
    fn scp_url(&self) -> String {
        match self.username.as_deref() {
            Some(name) => format!("{}@{}", name, self.host),
            None => self.host.clone(),
        }
    }

    /// Renders `[username@]host[:port]` (no `ssh://` scheme).
    pub fn connection_string(&self) -> String {
        let mut target = match self.username.as_deref() {
            Some(name) => format!("{}@{}", name, self.host),
            None => self.host.clone(),
        };
        if let Some(port) = self.port {
            target.push(':');
            target.push_str(&port.to_string());
        }
        target
    }

    // Uniquely identifies dev server projects on a remote host. Needs to be
    // stable for the same dev server project.
    //
    // The host is rendered with `{:?}`, so it appears quoted in the result;
    // that is part of the (stable) identifier format.
    pub fn dev_server_identifier(&self) -> String {
        let quoted_host = format!("{:?}", self.host);
        match self.username.as_deref() {
            Some(name) => format!("dev-server-{quoted_host}-{name}"),
            None => format!("dev-server-{quoted_host}"),
        }
    }
}
114
/// Operating system / CPU architecture pair describing a remote machine.
#[derive(Copy, Clone, Debug)]
pub struct SshPlatform {
    pub os: &'static str,
    pub arch: &'static str,
}

impl SshPlatform {
    /// Builds the target triple `<arch>-<vendor-and-system>` for this platform.
    ///
    /// Returns `None` when `os` is neither `"linux"` nor `"macos"`.
    pub fn triple(&self) -> Option<String> {
        let vendor_and_system = match self.os {
            "linux" => "unknown-linux-gnu",
            "macos" => "apple-darwin",
            _ => return None,
        };
        Some(format!("{}-{}", self.arch, vendor_and_system))
    }
}
134
/// Callbacks the SSH client uses to interact with its host application while
/// connecting.
pub trait SshClientDelegate: Send + Sync {
    /// Asks for a password given the prompt text; the answer (or an error)
    /// arrives on the returned channel.
    fn ask_password(
        &self,
        prompt: String,
        cx: &mut AsyncAppContext,
    ) -> oneshot::Receiver<Result<String>>;
    /// Path on the remote host at which the server binary is expected.
    fn remote_server_binary_path(&self, cx: &mut AsyncAppContext) -> Result<PathBuf>;
    /// Resolves the server binary for `platform`, yielding its local path and
    /// version on the returned channel.
    fn get_server_binary(
        &self,
        platform: SshPlatform,
        cx: &mut AsyncAppContext,
    ) -> oneshot::Receiver<Result<(PathBuf, SemanticVersion)>>;
    /// Reports a progress status; presumably `None` clears it — confirm with implementors.
    fn set_status(&self, status: Option<&str>, cx: &mut AsyncAppContext);
    /// Reports an error message to the host application.
    fn set_error(&self, error_message: String, cx: &mut AsyncAppContext);
}
150
151impl SshSocket {
152 fn ssh_command<S: AsRef<OsStr>>(&self, program: S) -> process::Command {
153 let mut command = process::Command::new("ssh");
154 self.ssh_options(&mut command)
155 .arg(self.connection_options.ssh_url())
156 .arg(program);
157 command
158 }
159
160 fn ssh_options<'a>(&self, command: &'a mut process::Command) -> &'a mut process::Command {
161 command
162 .stdin(Stdio::piped())
163 .stdout(Stdio::piped())
164 .stderr(Stdio::piped())
165 .args(["-o", "ControlMaster=no", "-o"])
166 .arg(format!("ControlPath={}", self.socket_path.display()))
167 }
168
169 fn ssh_args(&self) -> Vec<String> {
170 vec![
171 "-o".to_string(),
172 "ControlMaster=no".to_string(),
173 "-o".to_string(),
174 format!("ControlPath={}", self.socket_path.display()),
175 self.connection_options.ssh_url(),
176 ]
177 }
178}
179
180async fn run_cmd(command: &mut process::Command) -> Result<String> {
181 let output = command.output().await?;
182 if output.status.success() {
183 Ok(String::from_utf8_lossy(&output.stdout).to_string())
184 } else {
185 Err(anyhow!(
186 "failed to run command: {}",
187 String::from_utf8_lossy(&output.stderr)
188 ))
189 }
190}
191
192struct ChannelForwarder {
193 quit_tx: UnboundedSender<()>,
194 forwarding_task: Task<(UnboundedSender<Envelope>, UnboundedReceiver<Envelope>)>,
195}
196
197impl ChannelForwarder {
198 fn new(
199 mut incoming_tx: UnboundedSender<Envelope>,
200 mut outgoing_rx: UnboundedReceiver<Envelope>,
201 cx: &AsyncAppContext,
202 ) -> (Self, UnboundedSender<Envelope>, UnboundedReceiver<Envelope>) {
203 let (quit_tx, mut quit_rx) = mpsc::unbounded::<()>();
204
205 let (proxy_incoming_tx, mut proxy_incoming_rx) = mpsc::unbounded::<Envelope>();
206 let (mut proxy_outgoing_tx, proxy_outgoing_rx) = mpsc::unbounded::<Envelope>();
207
208 let forwarding_task = cx.background_executor().spawn(async move {
209 loop {
210 select_biased! {
211 _ = quit_rx.next().fuse() => {
212 break;
213 },
214 incoming_envelope = proxy_incoming_rx.next().fuse() => {
215 if let Some(envelope) = incoming_envelope {
216 if incoming_tx.send(envelope).await.is_err() {
217 break;
218 }
219 } else {
220 break;
221 }
222 }
223 outgoing_envelope = outgoing_rx.next().fuse() => {
224 if let Some(envelope) = outgoing_envelope {
225 if proxy_outgoing_tx.send(envelope).await.is_err() {
226 break;
227 }
228 } else {
229 break;
230 }
231 }
232 }
233 }
234
235 (incoming_tx, outgoing_rx)
236 });
237
238 (
239 Self {
240 forwarding_task,
241 quit_tx,
242 },
243 proxy_incoming_tx,
244 proxy_outgoing_rx,
245 )
246 }
247
248 async fn into_channels(mut self) -> (UnboundedSender<Envelope>, UnboundedReceiver<Envelope>) {
249 let _ = self.quit_tx.send(()).await;
250 self.forwarding_task.await
251 }
252}
253
/// Number of consecutive missed heartbeats after which a reconnect is started.
const MAX_MISSED_HEARTBEATS: usize = 5;
/// Interval of the timer that drives heartbeat pings.
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
/// Timeout passed to each ping (heartbeats and the initial connection check).
const HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(5);

/// Maximum number of reconnect attempts before giving up.
const MAX_RECONNECT_ATTEMPTS: usize = 3;
259
/// Internal connection state machine of `SshRemoteClient`. Variants that
/// represent a (possibly degraded) live connection own the resources of that
/// connection.
enum State {
    /// Initial state, before the first connection has been established.
    Connecting,
    /// Connection is up: owns the SSH connection, the delegate, the channel
    /// forwarder and the multiplex/heartbeat tasks.
    Connected {
        ssh_connection: SshRemoteConnection,
        delegate: Arc<dyn SshClientDelegate>,
        forwarder: ChannelForwarder,

        multiplex_task: Task<Result<()>>,
        heartbeat_task: Task<Result<()>>,
    },
    /// Same resources as `Connected`, plus a count of missed heartbeats.
    HeartbeatMissed {
        missed_heartbeats: usize,

        ssh_connection: SshRemoteConnection,
        delegate: Arc<dyn SshClientDelegate>,
        forwarder: ChannelForwarder,

        multiplex_task: Task<Result<()>>,
        heartbeat_task: Task<Result<()>>,
    },
    /// A reconnect attempt is in flight.
    Reconnecting,
    /// A reconnect attempt failed; carries the error, the number of attempts
    /// made so far, and the resources needed to try again.
    ReconnectFailed {
        ssh_connection: SshRemoteConnection,
        delegate: Arc<dyn SshClientDelegate>,
        forwarder: ChannelForwarder,

        error: anyhow::Error,
        attempts: usize,
    },
    /// All reconnect attempts were used up.
    ReconnectExhausted,
    /// The proxy reported that the remote server is not running.
    ServerNotRunning,
}
292
293impl fmt::Display for State {
294 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
295 match self {
296 Self::Connecting => write!(f, "connecting"),
297 Self::Connected { .. } => write!(f, "connected"),
298 Self::Reconnecting => write!(f, "reconnecting"),
299 Self::ReconnectFailed { .. } => write!(f, "reconnect failed"),
300 Self::ReconnectExhausted => write!(f, "reconnect exhausted"),
301 Self::HeartbeatMissed { .. } => write!(f, "heartbeat missed"),
302 Self::ServerNotRunning { .. } => write!(f, "server not running"),
303 }
304 }
305}
306
307impl State {
308 fn ssh_connection(&self) -> Option<&SshRemoteConnection> {
309 match self {
310 Self::Connected { ssh_connection, .. } => Some(ssh_connection),
311 Self::HeartbeatMissed { ssh_connection, .. } => Some(ssh_connection),
312 Self::ReconnectFailed { ssh_connection, .. } => Some(ssh_connection),
313 _ => None,
314 }
315 }
316
317 fn can_reconnect(&self) -> bool {
318 match self {
319 Self::Connected { .. }
320 | Self::HeartbeatMissed { .. }
321 | Self::ReconnectFailed { .. } => true,
322 State::Connecting
323 | State::Reconnecting
324 | State::ReconnectExhausted
325 | State::ServerNotRunning => false,
326 }
327 }
328
329 fn is_reconnect_failed(&self) -> bool {
330 matches!(self, Self::ReconnectFailed { .. })
331 }
332
333 fn is_reconnect_exhausted(&self) -> bool {
334 matches!(self, Self::ReconnectExhausted { .. })
335 }
336
337 fn is_reconnecting(&self) -> bool {
338 matches!(self, Self::Reconnecting { .. })
339 }
340
341 fn heartbeat_recovered(self) -> Self {
342 match self {
343 Self::HeartbeatMissed {
344 ssh_connection,
345 delegate,
346 forwarder,
347 multiplex_task,
348 heartbeat_task,
349 ..
350 } => Self::Connected {
351 ssh_connection,
352 delegate,
353 forwarder,
354 multiplex_task,
355 heartbeat_task,
356 },
357 _ => self,
358 }
359 }
360
361 fn heartbeat_missed(self) -> Self {
362 match self {
363 Self::Connected {
364 ssh_connection,
365 delegate,
366 forwarder,
367 multiplex_task,
368 heartbeat_task,
369 } => Self::HeartbeatMissed {
370 missed_heartbeats: 1,
371 ssh_connection,
372 delegate,
373 forwarder,
374 multiplex_task,
375 heartbeat_task,
376 },
377 Self::HeartbeatMissed {
378 missed_heartbeats,
379 ssh_connection,
380 delegate,
381 forwarder,
382 multiplex_task,
383 heartbeat_task,
384 } => Self::HeartbeatMissed {
385 missed_heartbeats: missed_heartbeats + 1,
386 ssh_connection,
387 delegate,
388 forwarder,
389 multiplex_task,
390 heartbeat_task,
391 },
392 _ => self,
393 }
394 }
395}
396
/// The state of the ssh connection.
///
/// Public, copyable summary of the client's internal state.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ConnectionState {
    /// The initial connection is being established.
    Connecting,
    /// The connection is up.
    Connected,
    /// At least one heartbeat was missed.
    HeartbeatMissed,
    /// A reconnect is in progress (including between failed attempts).
    Reconnecting,
    /// Reconnecting was given up, the server is not running, or no state is set.
    Disconnected,
}
406
407impl From<&State> for ConnectionState {
408 fn from(value: &State) -> Self {
409 match value {
410 State::Connecting => Self::Connecting,
411 State::Connected { .. } => Self::Connected,
412 State::Reconnecting | State::ReconnectFailed { .. } => Self::Reconnecting,
413 State::HeartbeatMissed { .. } => Self::HeartbeatMissed,
414 State::ReconnectExhausted => Self::Disconnected,
415 State::ServerNotRunning => Self::Disconnected,
416 }
417 }
418}
419
/// Client for a remote server reached over SSH. Owns the message channel
/// client and the connection state machine.
pub struct SshRemoteClient {
    // Channel-based client used to exchange messages with the remote server.
    client: Arc<ChannelClient>,
    // Passed to the remote proxy as `--identifier`.
    unique_identifier: String,
    // Options this client was created with.
    connection_options: SshConnectionOptions,
    // Current state; `None` while a transition has temporarily taken it, after
    // shutdown, and for fake (test) clients.
    state: Arc<Mutex<Option<State>>>,
}
426
/// Events emitted by `SshRemoteClient`.
#[derive(Debug)]
pub enum SshRemoteEvent {
    /// The connection is considered lost.
    Disconnected,
}

// Allows `SshRemoteClient` models to emit `SshRemoteEvent`s.
impl EventEmitter<SshRemoteEvent> for SshRemoteClient {}
433
434impl SshRemoteClient {
435 pub fn new(
436 unique_identifier: String,
437 connection_options: SshConnectionOptions,
438 delegate: Arc<dyn SshClientDelegate>,
439 cx: &AppContext,
440 ) -> Task<Result<Model<Self>>> {
441 cx.spawn(|mut cx| async move {
442 let (outgoing_tx, outgoing_rx) = mpsc::unbounded::<Envelope>();
443 let (incoming_tx, incoming_rx) = mpsc::unbounded::<Envelope>();
444
445 let client = cx.update(|cx| ChannelClient::new(incoming_rx, outgoing_tx, cx))?;
446 let this = cx.new_model(|_| Self {
447 client: client.clone(),
448 unique_identifier: unique_identifier.clone(),
449 connection_options: connection_options.clone(),
450 state: Arc::new(Mutex::new(Some(State::Connecting))),
451 })?;
452
453 let (proxy, proxy_incoming_tx, proxy_outgoing_rx) =
454 ChannelForwarder::new(incoming_tx, outgoing_rx, &mut cx);
455
456 let (ssh_connection, ssh_proxy_process) = Self::establish_connection(
457 unique_identifier,
458 false,
459 connection_options,
460 delegate.clone(),
461 &mut cx,
462 )
463 .await?;
464
465 let multiplex_task = Self::multiplex(
466 this.downgrade(),
467 ssh_proxy_process,
468 proxy_incoming_tx,
469 proxy_outgoing_rx,
470 &mut cx,
471 );
472
473 if let Err(error) = client.ping(HEARTBEAT_TIMEOUT).await {
474 log::error!("failed to establish connection: {}", error);
475 delegate.set_error(error.to_string(), &mut cx);
476 return Err(error);
477 }
478
479 let heartbeat_task = Self::heartbeat(this.downgrade(), &mut cx);
480
481 this.update(&mut cx, |this, _| {
482 *this.state.lock() = Some(State::Connected {
483 ssh_connection,
484 delegate,
485 forwarder: proxy,
486 multiplex_task,
487 heartbeat_task,
488 });
489 })?;
490
491 Ok(this)
492 })
493 }
494
495 pub fn shutdown_processes<T: RequestMessage>(
496 &self,
497 shutdown_request: Option<T>,
498 ) -> Option<impl Future<Output = ()>> {
499 let state = self.state.lock().take()?;
500 log::info!("shutting down ssh processes");
501
502 let State::Connected {
503 multiplex_task,
504 heartbeat_task,
505 ssh_connection,
506 delegate,
507 forwarder,
508 } = state
509 else {
510 return None;
511 };
512
513 let client = self.client.clone();
514
515 Some(async move {
516 if let Some(shutdown_request) = shutdown_request {
517 client.send(shutdown_request).log_err();
518 // We wait 50ms instead of waiting for a response, because
519 // waiting for a response would require us to wait on the main thread
520 // which we want to avoid in an `on_app_quit` callback.
521 Timer::after(Duration::from_millis(50)).await;
522 }
523
524 // Drop `multiplex_task` because it owns our ssh_proxy_process, which is a
525 // child of master_process.
526 drop(multiplex_task);
527 // Now drop the rest of state, which kills master process.
528 drop(heartbeat_task);
529 drop(ssh_connection);
530 drop(delegate);
531 drop(forwarder);
532 })
533 }
534
535 fn reconnect(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
536 let mut lock = self.state.lock();
537
538 let can_reconnect = lock
539 .as_ref()
540 .map(|state| state.can_reconnect())
541 .unwrap_or(false);
542 if !can_reconnect {
543 let error = if let Some(state) = lock.as_ref() {
544 format!("invalid state, cannot reconnect while in state {state}")
545 } else {
546 "no state set".to_string()
547 };
548 log::info!("aborting reconnect, because not in state that allows reconnecting");
549 return Err(anyhow!(error));
550 }
551
552 let state = lock.take().unwrap();
553 let (attempts, mut ssh_connection, delegate, forwarder) = match state {
554 State::Connected {
555 ssh_connection,
556 delegate,
557 forwarder,
558 multiplex_task,
559 heartbeat_task,
560 }
561 | State::HeartbeatMissed {
562 ssh_connection,
563 delegate,
564 forwarder,
565 multiplex_task,
566 heartbeat_task,
567 ..
568 } => {
569 drop(multiplex_task);
570 drop(heartbeat_task);
571 (0, ssh_connection, delegate, forwarder)
572 }
573 State::ReconnectFailed {
574 attempts,
575 ssh_connection,
576 delegate,
577 forwarder,
578 ..
579 } => (attempts, ssh_connection, delegate, forwarder),
580 State::Connecting
581 | State::Reconnecting
582 | State::ReconnectExhausted
583 | State::ServerNotRunning => unreachable!(),
584 };
585
586 let attempts = attempts + 1;
587 if attempts > MAX_RECONNECT_ATTEMPTS {
588 log::error!(
589 "Failed to reconnect to after {} attempts, giving up",
590 MAX_RECONNECT_ATTEMPTS
591 );
592 drop(lock);
593 self.set_state(State::ReconnectExhausted, cx);
594 return Ok(());
595 }
596 drop(lock);
597
598 self.set_state(State::Reconnecting, cx);
599
600 log::info!("Trying to reconnect to ssh server... Attempt {}", attempts);
601
602 let identifier = self.unique_identifier.clone();
603 let client = self.client.clone();
604 let reconnect_task = cx.spawn(|this, mut cx| async move {
605 macro_rules! failed {
606 ($error:expr, $attempts:expr, $ssh_connection:expr, $delegate:expr, $forwarder:expr) => {
607 return State::ReconnectFailed {
608 error: anyhow!($error),
609 attempts: $attempts,
610 ssh_connection: $ssh_connection,
611 delegate: $delegate,
612 forwarder: $forwarder,
613 };
614 };
615 }
616
617 if let Err(error) = ssh_connection.master_process.kill() {
618 failed!(error, attempts, ssh_connection, delegate, forwarder);
619 };
620
621 if let Err(error) = ssh_connection
622 .master_process
623 .status()
624 .await
625 .context("Failed to kill ssh process")
626 {
627 failed!(error, attempts, ssh_connection, delegate, forwarder);
628 }
629
630 let connection_options = ssh_connection.socket.connection_options.clone();
631
632 let (incoming_tx, outgoing_rx) = forwarder.into_channels().await;
633 let (forwarder, proxy_incoming_tx, proxy_outgoing_rx) =
634 ChannelForwarder::new(incoming_tx, outgoing_rx, &mut cx);
635
636 let (ssh_connection, ssh_process) = match Self::establish_connection(
637 identifier,
638 true,
639 connection_options,
640 delegate.clone(),
641 &mut cx,
642 )
643 .await
644 {
645 Ok((ssh_connection, ssh_process)) => (ssh_connection, ssh_process),
646 Err(error) => {
647 failed!(error, attempts, ssh_connection, delegate, forwarder);
648 }
649 };
650
651 let multiplex_task = Self::multiplex(
652 this.clone(),
653 ssh_process,
654 proxy_incoming_tx,
655 proxy_outgoing_rx,
656 &mut cx,
657 );
658
659 if let Err(error) = client.ping(HEARTBEAT_TIMEOUT).await {
660 failed!(error, attempts, ssh_connection, delegate, forwarder);
661 };
662
663 State::Connected {
664 ssh_connection,
665 delegate,
666 forwarder,
667 multiplex_task,
668 heartbeat_task: Self::heartbeat(this.clone(), &mut cx),
669 }
670 });
671
672 cx.spawn(|this, mut cx| async move {
673 let new_state = reconnect_task.await;
674 this.update(&mut cx, |this, cx| {
675 this.try_set_state(cx, |old_state| {
676 if old_state.is_reconnecting() {
677 match &new_state {
678 State::Connecting
679 | State::Reconnecting { .. }
680 | State::HeartbeatMissed { .. }
681 | State::ServerNotRunning => {}
682 State::Connected { .. } => {
683 log::info!("Successfully reconnected");
684 }
685 State::ReconnectFailed {
686 error, attempts, ..
687 } => {
688 log::error!(
689 "Reconnect attempt {} failed: {:?}. Starting new attempt...",
690 attempts,
691 error
692 );
693 }
694 State::ReconnectExhausted => {
695 log::error!("Reconnect attempt failed and all attempts exhausted");
696 }
697 }
698 Some(new_state)
699 } else {
700 None
701 }
702 });
703
704 if this.state_is(State::is_reconnect_failed) {
705 this.reconnect(cx)
706 } else if this.state_is(State::is_reconnect_exhausted) {
707 cx.emit(SshRemoteEvent::Disconnected);
708 Ok(())
709 } else {
710 log::debug!("State has transition from Reconnecting into new state while attempting reconnect. Ignoring new state.");
711 Ok(())
712 }
713 })
714 })
715 .detach_and_log_err(cx);
716
717 Ok(())
718 }
719
720 fn heartbeat(this: WeakModel<Self>, cx: &mut AsyncAppContext) -> Task<Result<()>> {
721 let Ok(client) = this.update(cx, |this, _| this.client.clone()) else {
722 return Task::ready(Err(anyhow!("SshRemoteClient lost")));
723 };
724 cx.spawn(|mut cx| {
725 let this = this.clone();
726 async move {
727 let mut missed_heartbeats = 0;
728
729 let mut timer = Timer::interval(HEARTBEAT_INTERVAL);
730 loop {
731 timer.next().await;
732
733 log::debug!("Sending heartbeat to server...");
734
735 let result = client.ping(HEARTBEAT_TIMEOUT).await;
736 if result.is_err() {
737 missed_heartbeats += 1;
738 log::warn!(
739 "No heartbeat from server after {:?}. Missed heartbeat {} out of {}.",
740 HEARTBEAT_TIMEOUT,
741 missed_heartbeats,
742 MAX_MISSED_HEARTBEATS
743 );
744 } else if missed_heartbeats != 0 {
745 missed_heartbeats = 0;
746 } else {
747 continue;
748 }
749
750 let result = this.update(&mut cx, |this, mut cx| {
751 this.handle_heartbeat_result(missed_heartbeats, &mut cx)
752 })?;
753 if result.is_break() {
754 return Ok(());
755 }
756 }
757 }
758 })
759 }
760
761 fn handle_heartbeat_result(
762 &mut self,
763 missed_heartbeats: usize,
764 cx: &mut ModelContext<Self>,
765 ) -> ControlFlow<()> {
766 let state = self.state.lock().take().unwrap();
767 let next_state = if missed_heartbeats > 0 {
768 state.heartbeat_missed()
769 } else {
770 state.heartbeat_recovered()
771 };
772
773 self.set_state(next_state, cx);
774
775 if missed_heartbeats >= MAX_MISSED_HEARTBEATS {
776 log::error!(
777 "Missed last {} heartbeats. Reconnecting...",
778 missed_heartbeats
779 );
780
781 self.reconnect(cx)
782 .context("failed to start reconnect process after missing heartbeats")
783 .log_err();
784 ControlFlow::Break(())
785 } else {
786 ControlFlow::Continue(())
787 }
788 }
789
    /// Bridges the proxy process's stdio with the client's message channels.
    ///
    /// A background task runs the I/O loop:
    /// - messages from `outgoing_rx` are written to the proxy's stdin,
    /// - length-prefixed messages read from the proxy's stdout are sent to
    ///   `incoming_tx`,
    /// - the proxy's stderr is split into lines; lines that parse as a JSON
    ///   `LogRecord` are logged, all others are printed with a `(remote)` prefix.
    ///
    /// The returned task waits for that loop to finish and reacts to how it
    /// ended: a recognized proxy launch error marks the server as not running
    /// and emits `Disconnected`, any other positive exit code or an I/O error
    /// triggers a reconnect, and a closed outgoing channel ends quietly.
    fn multiplex(
        this: WeakModel<Self>,
        mut ssh_proxy_process: Child,
        incoming_tx: UnboundedSender<Envelope>,
        mut outgoing_rx: UnboundedReceiver<Envelope>,
        cx: &AsyncAppContext,
    ) -> Task<Result<()>> {
        let mut child_stderr = ssh_proxy_process.stderr.take().unwrap();
        let mut child_stdout = ssh_proxy_process.stdout.take().unwrap();
        let mut child_stdin = ssh_proxy_process.stdin.take().unwrap();

        let io_task = cx.background_executor().spawn(async move {
            let mut stdin_buffer = Vec::new();
            let mut stdout_buffer = Vec::new();
            let mut stderr_buffer = Vec::new();
            // Number of bytes at the front of `stderr_buffer` that belong to a
            // line which has not been terminated yet.
            let mut stderr_offset = 0;

            loop {
                // Room for the next message-length prefix, and for up to 1024
                // more stderr bytes after the pending partial line.
                stdout_buffer.resize(MESSAGE_LEN_SIZE, 0);
                stderr_buffer.resize(stderr_offset + 1024, 0);

                select_biased! {
                    outgoing = outgoing_rx.next().fuse() => {
                        // The sending side is gone: finish without an exit code.
                        let Some(outgoing) = outgoing else {
                            return anyhow::Ok(None);
                        };

                        write_message(&mut child_stdin, &mut stdin_buffer, outgoing).await?;
                    }

                    result = child_stdout.read(&mut stdout_buffer).fuse() => {
                        match result {
                            // EOF on stdout: close our side and report the
                            // proxy's exit code.
                            Ok(0) => {
                                child_stdin.close().await?;
                                outgoing_rx.close();
                                let status = ssh_proxy_process.status().await?;
                                return Ok(status.code());
                            }
                            Ok(len) => {
                                // Finish reading the length prefix if it arrived
                                // only partially.
                                if len < stdout_buffer.len() {
                                    child_stdout.read_exact(&mut stdout_buffer[len..]).await?;
                                }

                                let message_len = message_len_from_buffer(&stdout_buffer);
                                match read_message_with_len(&mut child_stdout, &mut stdout_buffer, message_len).await {
                                    Ok(envelope) => {
                                        incoming_tx.unbounded_send(envelope).ok();
                                    }
                                    // A message that fails to decode is logged
                                    // and skipped; the loop keeps running.
                                    Err(error) => {
                                        log::error!("error decoding message {error:?}");
                                    }
                                }
                            }
                            Err(error) => {
                                Err(anyhow!("error reading stdout: {error:?}"))?;
                            }
                        }
                    }

                    result = child_stderr.read(&mut stderr_buffer[stderr_offset..]).fuse() => {
                        match result {
                            Ok(len) => {
                                stderr_offset += len;
                                let mut start_ix = 0;
                                // Handle every complete line currently buffered.
                                while let Some(ix) = stderr_buffer[start_ix..stderr_offset].iter().position(|b| b == &b'\n') {
                                    let line_ix = start_ix + ix;
                                    let content = &stderr_buffer[start_ix..line_ix];
                                    start_ix = line_ix + 1;
                                    if let Ok(record) = serde_json::from_slice::<LogRecord>(content) {
                                        record.log(log::logger())
                                    } else {
                                        eprintln!("(remote) {}", String::from_utf8_lossy(content));
                                    }
                                }
                                // Discard the handled lines, keeping any trailing
                                // partial line at the front of the buffer.
                                stderr_buffer.drain(0..start_ix);
                                stderr_offset -= start_ix;
                            }
                            Err(error) => {
                                Err(anyhow!("error reading stderr: {error:?}"))?;
                            }
                        }
                    }
                }
            }
        });

        cx.spawn(|mut cx| async move {
            let result = io_task.await;

            match result {
                // The proxy exited and reported an exit code.
                Ok(Some(exit_code)) => {
                    if let Some(error) = ProxyLaunchError::from_exit_code(exit_code) {
                        match error {
                            ProxyLaunchError::ServerNotRunning => {
                                log::error!("failed to reconnect because server is not running");
                                this.update(&mut cx, |this, cx| {
                                    this.set_state(State::ServerNotRunning, cx);
                                    cx.emit(SshRemoteEvent::Disconnected);
                                })?;
                            }
                        }
                    } else if exit_code > 0 {
                        log::error!("proxy process terminated unexpectedly");
                        this.update(&mut cx, |this, cx| {
                            this.reconnect(cx).ok();
                        })?;
                    }
                }
                // No exit code: the outgoing channel was closed, or the
                // process status carried no code. Nothing to do.
                Ok(None) => {}
                Err(error) => {
                    log::warn!("ssh io task died with error: {:?}. reconnecting...", error);
                    this.update(&mut cx, |this, cx| {
                        this.reconnect(cx).ok();
                    })?;
                }
            }
            Ok(())
        })
    }
909
910 fn state_is(&self, check: impl FnOnce(&State) -> bool) -> bool {
911 self.state.lock().as_ref().map_or(false, check)
912 }
913
914 fn try_set_state(
915 &self,
916 cx: &mut ModelContext<Self>,
917 map: impl FnOnce(&State) -> Option<State>,
918 ) {
919 let mut lock = self.state.lock();
920 let new_state = lock.as_ref().and_then(map);
921
922 if let Some(new_state) = new_state {
923 lock.replace(new_state);
924 cx.notify();
925 }
926 }
927
928 fn set_state(&self, state: State, cx: &mut ModelContext<Self>) {
929 log::info!("setting state to '{}'", &state);
930 self.state.lock().replace(state);
931 cx.notify();
932 }
933
934 async fn establish_connection(
935 unique_identifier: String,
936 reconnect: bool,
937 connection_options: SshConnectionOptions,
938 delegate: Arc<dyn SshClientDelegate>,
939 cx: &mut AsyncAppContext,
940 ) -> Result<(SshRemoteConnection, Child)> {
941 let ssh_connection =
942 SshRemoteConnection::new(connection_options, delegate.clone(), cx).await?;
943
944 let platform = ssh_connection.query_platform().await?;
945 let (local_binary_path, version) = delegate.get_server_binary(platform, cx).await??;
946 let remote_binary_path = delegate.remote_server_binary_path(cx)?;
947 ssh_connection
948 .ensure_server_binary(
949 &delegate,
950 &local_binary_path,
951 &remote_binary_path,
952 version,
953 cx,
954 )
955 .await?;
956
957 let socket = ssh_connection.socket.clone();
958 run_cmd(socket.ssh_command(&remote_binary_path).arg("version")).await?;
959
960 delegate.set_status(Some("Starting proxy"), cx);
961
962 let mut start_proxy_command = format!(
963 "RUST_LOG={} RUST_BACKTRACE={} {:?} proxy --identifier {}",
964 std::env::var("RUST_LOG").unwrap_or_default(),
965 std::env::var("RUST_BACKTRACE").unwrap_or_default(),
966 remote_binary_path,
967 unique_identifier,
968 );
969 if reconnect {
970 start_proxy_command.push_str(" --reconnect");
971 }
972
973 let ssh_proxy_process = socket
974 .ssh_command(start_proxy_command)
975 // IMPORTANT: we kill this process when we drop the task that uses it.
976 .kill_on_drop(true)
977 .spawn()
978 .context("failed to spawn remote server")?;
979
980 Ok((ssh_connection, ssh_proxy_process))
981 }
982
    /// Forwards to the underlying channel client's `subscribe_to_entity` with
    /// the same `remote_id` and `entity`.
    pub fn subscribe_to_entity<E: 'static>(&self, remote_id: u64, entity: &Model<E>) {
        self.client.subscribe_to_entity(remote_id, entity);
    }
986
987 pub fn ssh_args(&self) -> Option<Vec<String>> {
988 self.state
989 .lock()
990 .as_ref()
991 .and_then(|state| state.ssh_connection())
992 .map(|ssh_connection| ssh_connection.socket.ssh_args())
993 }
994
    /// Returns the underlying channel client as an `AnyProtoClient`.
    pub fn to_proto_client(&self) -> AnyProtoClient {
        self.client.clone().into()
    }
998
    /// The connection target as rendered by
    /// `SshConnectionOptions::connection_string`.
    pub fn connection_string(&self) -> String {
        self.connection_options.connection_string()
    }
1002
    /// Returns a copy of the options this client was created with.
    pub fn connection_options(&self) -> SshConnectionOptions {
        self.connection_options.clone()
    }
1006
1007 #[cfg(not(any(test, feature = "test-support")))]
1008 pub fn connection_state(&self) -> ConnectionState {
1009 self.state
1010 .lock()
1011 .as_ref()
1012 .map(ConnectionState::from)
1013 .unwrap_or(ConnectionState::Disconnected)
1014 }
1015
    /// Test builds always report `Connected`, regardless of the stored state
    /// (fake clients are created without one).
    #[cfg(any(test, feature = "test-support"))]
    pub fn connection_state(&self) -> ConnectionState {
        ConnectionState::Connected
    }
1020
    /// Whether `connection_state()` currently reports `Disconnected`.
    pub fn is_disconnected(&self) -> bool {
        self.connection_state() == ConnectionState::Disconnected
    }
1024
1025 #[cfg(any(test, feature = "test-support"))]
1026 pub fn fake(
1027 client_cx: &mut gpui::TestAppContext,
1028 server_cx: &mut gpui::TestAppContext,
1029 ) -> (Model<Self>, Arc<ChannelClient>) {
1030 use gpui::Context;
1031
1032 let (server_to_client_tx, server_to_client_rx) = mpsc::unbounded();
1033 let (client_to_server_tx, client_to_server_rx) = mpsc::unbounded();
1034
1035 (
1036 client_cx.update(|cx| {
1037 let client = ChannelClient::new(server_to_client_rx, client_to_server_tx, cx);
1038 cx.new_model(|_| Self {
1039 client,
1040 unique_identifier: "fake".to_string(),
1041 connection_options: SshConnectionOptions::default(),
1042 state: Arc::new(Mutex::new(None)),
1043 })
1044 }),
1045 server_cx.update(|cx| ChannelClient::new(client_to_server_rx, server_to_client_tx, cx)),
1046 )
1047 }
1048}
1049
/// Wraps the SSH client's underlying channel client in an `AnyProtoClient`.
impl From<SshRemoteClient> for AnyProtoClient {
    fn from(client: SshRemoteClient) -> Self {
        AnyProtoClient::new(client.client.clone())
    }
}
1055
/// An established SSH connection: the master `ssh` process plus the control
/// socket through which further `ssh` commands are run.
struct SshRemoteConnection {
    // Control-socket handle used to build follow-up `ssh` commands.
    socket: SshSocket,
    // The master `ssh` process; killed when the connection is dropped.
    master_process: process::Child,
    // Temporary directory kept alive for the lifetime of the connection.
    _temp_dir: TempDir,
}

impl Drop for SshRemoteConnection {
    /// Kills the master process; a failure to do so is only logged.
    fn drop(&mut self) {
        if let Err(error) = self.master_process.kill() {
            log::error!("failed to kill SSH master process: {}", error);
        }
    }
}
1069
1070impl SshRemoteConnection {
    /// Non-unix builds: always fails, because ssh is not supported there.
    #[cfg(not(unix))]
    async fn new(
        _connection_options: SshConnectionOptions,
        _delegate: Arc<dyn SshClientDelegate>,
        _cx: &mut AsyncAppContext,
    ) -> Result<Self> {
        Err(anyhow!("ssh is not supported on this platform"))
    }
1079
1080 #[cfg(unix)]
1081 async fn new(
1082 connection_options: SshConnectionOptions,
1083 delegate: Arc<dyn SshClientDelegate>,
1084 cx: &mut AsyncAppContext,
1085 ) -> Result<Self> {
1086 use futures::{io::BufReader, AsyncBufReadExt as _};
1087 use smol::{fs::unix::PermissionsExt as _, net::unix::UnixListener};
1088 use util::ResultExt as _;
1089
1090 delegate.set_status(Some("connecting"), cx);
1091
1092 let url = connection_options.ssh_url();
1093 let temp_dir = tempfile::Builder::new()
1094 .prefix("zed-ssh-session")
1095 .tempdir()?;
1096
1097 // Create a domain socket listener to handle requests from the askpass program.
1098 let askpass_socket = temp_dir.path().join("askpass.sock");
1099 let (askpass_opened_tx, askpass_opened_rx) = oneshot::channel::<()>();
1100 let listener =
1101 UnixListener::bind(&askpass_socket).context("failed to create askpass socket")?;
1102
1103 let askpass_task = cx.spawn({
1104 let delegate = delegate.clone();
1105 |mut cx| async move {
1106 let mut askpass_opened_tx = Some(askpass_opened_tx);
1107
1108 while let Ok((mut stream, _)) = listener.accept().await {
1109 if let Some(askpass_opened_tx) = askpass_opened_tx.take() {
1110 askpass_opened_tx.send(()).ok();
1111 }
1112 let mut buffer = Vec::new();
1113 let mut reader = BufReader::new(&mut stream);
1114 if reader.read_until(b'\0', &mut buffer).await.is_err() {
1115 buffer.clear();
1116 }
1117 let password_prompt = String::from_utf8_lossy(&buffer);
1118 if let Some(password) = delegate
1119 .ask_password(password_prompt.to_string(), &mut cx)
1120 .await
1121 .context("failed to get ssh password")
1122 .and_then(|p| p)
1123 .log_err()
1124 {
1125 stream.write_all(password.as_bytes()).await.log_err();
1126 }
1127 }
1128 }
1129 });
1130
1131 // Create an askpass script that communicates back to this process.
1132 let askpass_script = format!(
1133 "{shebang}\n{print_args} | nc -U {askpass_socket} 2> /dev/null \n",
1134 askpass_socket = askpass_socket.display(),
1135 print_args = "printf '%s\\0' \"$@\"",
1136 shebang = "#!/bin/sh",
1137 );
1138 let askpass_script_path = temp_dir.path().join("askpass.sh");
1139 fs::write(&askpass_script_path, askpass_script).await?;
1140 fs::set_permissions(&askpass_script_path, std::fs::Permissions::from_mode(0o755)).await?;
1141
1142 // Start the master SSH process, which does not do anything except for establish
1143 // the connection and keep it open, allowing other ssh commands to reuse it
1144 // via a control socket.
1145 let socket_path = temp_dir.path().join("ssh.sock");
1146 let mut master_process = process::Command::new("ssh")
1147 .stdin(Stdio::null())
1148 .stdout(Stdio::piped())
1149 .stderr(Stdio::piped())
1150 .env("SSH_ASKPASS_REQUIRE", "force")
1151 .env("SSH_ASKPASS", &askpass_script_path)
1152 .args([
1153 "-N",
1154 "-o",
1155 "ControlPersist=no",
1156 "-o",
1157 "ControlMaster=yes",
1158 "-o",
1159 ])
1160 .arg(format!("ControlPath={}", socket_path.display()))
1161 .arg(&url)
1162 .spawn()?;
1163
1164 // Wait for this ssh process to close its stdout, indicating that authentication
1165 // has completed.
1166 let stdout = master_process.stdout.as_mut().unwrap();
1167 let mut output = Vec::new();
1168 let connection_timeout = Duration::from_secs(10);
1169
1170 let result = select_biased! {
1171 _ = askpass_opened_rx.fuse() => {
1172 // If the askpass script has opened, that means the user is typing
1173 // their password, in which case we don't want to timeout anymore,
1174 // since we know a connection has been established.
1175 stdout.read_to_end(&mut output).await?;
1176 Ok(())
1177 }
1178 result = stdout.read_to_end(&mut output).fuse() => {
1179 result?;
1180 Ok(())
1181 }
1182 _ = futures::FutureExt::fuse(smol::Timer::after(connection_timeout)) => {
1183 Err(anyhow!("Exceeded {:?} timeout trying to connect to host", connection_timeout))
1184 }
1185 };
1186
1187 if let Err(e) = result {
1188 let error_message = format!("Failed to connect to host: {}.", e);
1189 delegate.set_error(error_message, cx);
1190 return Err(e);
1191 }
1192
1193 drop(askpass_task);
1194
1195 if master_process.try_status()?.is_some() {
1196 output.clear();
1197 let mut stderr = master_process.stderr.take().unwrap();
1198 stderr.read_to_end(&mut output).await?;
1199
1200 let error_message = format!("failed to connect: {}", String::from_utf8_lossy(&output));
1201 delegate.set_error(error_message.clone(), cx);
1202 Err(anyhow!(error_message))?;
1203 }
1204
1205 Ok(Self {
1206 socket: SshSocket {
1207 connection_options,
1208 socket_path,
1209 },
1210 master_process,
1211 _temp_dir: temp_dir,
1212 })
1213 }
1214
1215 async fn ensure_server_binary(
1216 &self,
1217 delegate: &Arc<dyn SshClientDelegate>,
1218 src_path: &Path,
1219 dst_path: &Path,
1220 version: SemanticVersion,
1221 cx: &mut AsyncAppContext,
1222 ) -> Result<()> {
1223 let mut dst_path_gz = dst_path.to_path_buf();
1224 dst_path_gz.set_extension("gz");
1225
1226 if let Some(parent) = dst_path.parent() {
1227 run_cmd(self.socket.ssh_command("mkdir").arg("-p").arg(parent)).await?;
1228 }
1229
1230 let mut server_binary_exists = false;
1231 if cfg!(not(debug_assertions)) {
1232 if let Ok(installed_version) =
1233 run_cmd(self.socket.ssh_command(dst_path).arg("version")).await
1234 {
1235 if installed_version.trim() == version.to_string() {
1236 server_binary_exists = true;
1237 }
1238 }
1239 }
1240
1241 if server_binary_exists {
1242 log::info!("remote development server already present",);
1243 return Ok(());
1244 }
1245
1246 let src_stat = fs::metadata(src_path).await?;
1247 let size = src_stat.len();
1248 let server_mode = 0o755;
1249
1250 let t0 = Instant::now();
1251 delegate.set_status(Some("uploading remote development server"), cx);
1252 log::info!("uploading remote development server ({}kb)", size / 1024);
1253 self.upload_file(src_path, &dst_path_gz)
1254 .await
1255 .context("failed to upload server binary")?;
1256 log::info!("uploaded remote development server in {:?}", t0.elapsed());
1257
1258 delegate.set_status(Some("extracting remote development server"), cx);
1259 run_cmd(
1260 self.socket
1261 .ssh_command("gunzip")
1262 .arg("--force")
1263 .arg(&dst_path_gz),
1264 )
1265 .await?;
1266
1267 delegate.set_status(Some("unzipping remote development server"), cx);
1268 run_cmd(
1269 self.socket
1270 .ssh_command("chmod")
1271 .arg(format!("{:o}", server_mode))
1272 .arg(dst_path),
1273 )
1274 .await?;
1275
1276 Ok(())
1277 }
1278
1279 async fn query_platform(&self) -> Result<SshPlatform> {
1280 let os = run_cmd(self.socket.ssh_command("uname").arg("-s")).await?;
1281 let arch = run_cmd(self.socket.ssh_command("uname").arg("-m")).await?;
1282
1283 let os = match os.trim() {
1284 "Darwin" => "macos",
1285 "Linux" => "linux",
1286 _ => Err(anyhow!("unknown uname os {os:?}"))?,
1287 };
1288 let arch = if arch.starts_with("arm") || arch.starts_with("aarch64") {
1289 "aarch64"
1290 } else if arch.starts_with("x86") || arch.starts_with("i686") {
1291 "x86_64"
1292 } else {
1293 Err(anyhow!("unknown uname architecture {arch:?}"))?
1294 };
1295
1296 Ok(SshPlatform { os, arch })
1297 }
1298
1299 async fn upload_file(&self, src_path: &Path, dest_path: &Path) -> Result<()> {
1300 let mut command = process::Command::new("scp");
1301 let output = self
1302 .socket
1303 .ssh_options(&mut command)
1304 .args(
1305 self.socket
1306 .connection_options
1307 .port
1308 .map(|port| vec!["-P".to_string(), port.to_string()])
1309 .unwrap_or_default(),
1310 )
1311 .arg(src_path)
1312 .arg(format!(
1313 "{}:{}",
1314 self.socket.connection_options.scp_url(),
1315 dest_path.display()
1316 ))
1317 .output()
1318 .await?;
1319
1320 if output.status.success() {
1321 Ok(())
1322 } else {
1323 Err(anyhow!(
1324 "failed to upload file {} -> {}: {}",
1325 src_path.display(),
1326 dest_path.display(),
1327 String::from_utf8_lossy(&output.stderr)
1328 ))
1329 }
1330 }
1331}
1332
/// Requests that are still awaiting a response, keyed by the id of the outgoing message.
///
/// Each entry holds the one-shot sender used to hand the matching response envelope
/// to the waiting requester. The envelope is delivered together with a second one-shot
/// sender; the incoming-message loop waits on the paired receiver before it handles
/// the next incoming message.
type ResponseChannels = Mutex<HashMap<MessageId, oneshot::Sender<(Envelope, oneshot::Sender<()>)>>>;
1334
/// Client end of an envelope-based message channel.
///
/// It assigns ids to outgoing envelopes, queues them on an unbounded channel,
/// matches incoming responses to pending requests, and dispatches all other
/// incoming messages to the registered message handlers.
pub struct ChannelClient {
    /// Source of ids for outgoing envelopes; incremented once per sent message.
    next_message_id: AtomicU32,
    /// Queue onto which outgoing envelopes are pushed.
    outgoing_tx: mpsc::UnboundedSender<Envelope>,
    /// Requests awaiting a response; guarded by a mutex.
    response_channels: ResponseChannels,
    /// Handlers for incoming non-response messages; guarded by a mutex.
    message_handlers: Mutex<ProtoMessageHandlerSet>,
}
1341
1342impl ChannelClient {
1343 pub fn new(
1344 incoming_rx: mpsc::UnboundedReceiver<Envelope>,
1345 outgoing_tx: mpsc::UnboundedSender<Envelope>,
1346 cx: &AppContext,
1347 ) -> Arc<Self> {
1348 let this = Arc::new(Self {
1349 outgoing_tx,
1350 next_message_id: AtomicU32::new(0),
1351 response_channels: ResponseChannels::default(),
1352 message_handlers: Default::default(),
1353 });
1354
1355 Self::start_handling_messages(this.clone(), incoming_rx, cx);
1356
1357 this
1358 }
1359
1360 fn start_handling_messages(
1361 this: Arc<Self>,
1362 mut incoming_rx: mpsc::UnboundedReceiver<Envelope>,
1363 cx: &AppContext,
1364 ) {
1365 cx.spawn(|cx| {
1366 let this = Arc::downgrade(&this);
1367 async move {
1368 let peer_id = PeerId { owner_id: 0, id: 0 };
1369 while let Some(incoming) = incoming_rx.next().await {
1370 let Some(this) = this.upgrade() else {
1371 return anyhow::Ok(());
1372 };
1373
1374 if let Some(request_id) = incoming.responding_to {
1375 let request_id = MessageId(request_id);
1376 let sender = this.response_channels.lock().remove(&request_id);
1377 if let Some(sender) = sender {
1378 let (tx, rx) = oneshot::channel();
1379 if incoming.payload.is_some() {
1380 sender.send((incoming, tx)).ok();
1381 }
1382 rx.await.ok();
1383 }
1384 } else if let Some(envelope) =
1385 build_typed_envelope(peer_id, Instant::now(), incoming)
1386 {
1387 let type_name = envelope.payload_type_name();
1388 if let Some(future) = ProtoMessageHandlerSet::handle_message(
1389 &this.message_handlers,
1390 envelope,
1391 this.clone().into(),
1392 cx.clone(),
1393 ) {
1394 log::debug!("ssh message received. name:{type_name}");
1395 match future.await {
1396 Ok(_) => {
1397 log::debug!("ssh message handled. name:{type_name}");
1398 }
1399 Err(error) => {
1400 log::error!(
1401 "error handling message. type:{type_name}, error:{error}",
1402 );
1403 }
1404 }
1405 } else {
1406 log::error!("unhandled ssh message name:{type_name}");
1407 }
1408 }
1409 }
1410 anyhow::Ok(())
1411 }
1412 })
1413 .detach();
1414 }
1415
1416 pub fn subscribe_to_entity<E: 'static>(&self, remote_id: u64, entity: &Model<E>) {
1417 let id = (TypeId::of::<E>(), remote_id);
1418
1419 let mut message_handlers = self.message_handlers.lock();
1420 if message_handlers
1421 .entities_by_type_and_remote_id
1422 .contains_key(&id)
1423 {
1424 panic!("already subscribed to entity");
1425 }
1426
1427 message_handlers.entities_by_type_and_remote_id.insert(
1428 id,
1429 EntityMessageSubscriber::Entity {
1430 handle: entity.downgrade().into(),
1431 },
1432 );
1433 }
1434
1435 pub fn request<T: RequestMessage>(
1436 &self,
1437 payload: T,
1438 ) -> impl 'static + Future<Output = Result<T::Response>> {
1439 log::debug!("ssh request start. name:{}", T::NAME);
1440 let response = self.request_dynamic(payload.into_envelope(0, None, None), T::NAME);
1441 async move {
1442 let response = response.await?;
1443 log::debug!("ssh request finish. name:{}", T::NAME);
1444 T::Response::from_envelope(response)
1445 .ok_or_else(|| anyhow!("received a response of the wrong type"))
1446 }
1447 }
1448
1449 pub async fn ping(&self, timeout: Duration) -> Result<()> {
1450 smol::future::or(
1451 async {
1452 self.request(proto::Ping {}).await?;
1453 Ok(())
1454 },
1455 async {
1456 smol::Timer::after(timeout).await;
1457 Err(anyhow!("Timeout detected"))
1458 },
1459 )
1460 .await
1461 }
1462
1463 pub fn send<T: EnvelopedMessage>(&self, payload: T) -> Result<()> {
1464 log::debug!("ssh send name:{}", T::NAME);
1465 self.send_dynamic(payload.into_envelope(0, None, None))
1466 }
1467
1468 pub fn request_dynamic(
1469 &self,
1470 mut envelope: proto::Envelope,
1471 type_name: &'static str,
1472 ) -> impl 'static + Future<Output = Result<proto::Envelope>> {
1473 envelope.id = self.next_message_id.fetch_add(1, SeqCst);
1474 let (tx, rx) = oneshot::channel();
1475 let mut response_channels_lock = self.response_channels.lock();
1476 response_channels_lock.insert(MessageId(envelope.id), tx);
1477 drop(response_channels_lock);
1478 let result = self.outgoing_tx.unbounded_send(envelope);
1479 async move {
1480 if let Err(error) = &result {
1481 log::error!("failed to send message: {}", error);
1482 return Err(anyhow!("failed to send message: {}", error));
1483 }
1484
1485 let response = rx.await.context("connection lost")?.0;
1486 if let Some(proto::envelope::Payload::Error(error)) = &response.payload {
1487 return Err(RpcError::from_proto(error, type_name));
1488 }
1489 Ok(response)
1490 }
1491 }
1492
1493 pub fn send_dynamic(&self, mut envelope: proto::Envelope) -> Result<()> {
1494 envelope.id = self.next_message_id.fetch_add(1, SeqCst);
1495 self.outgoing_tx.unbounded_send(envelope)?;
1496 Ok(())
1497 }
1498}
1499
1500impl ProtoClient for ChannelClient {
1501 fn request(
1502 &self,
1503 envelope: proto::Envelope,
1504 request_type: &'static str,
1505 ) -> BoxFuture<'static, Result<proto::Envelope>> {
1506 self.request_dynamic(envelope, request_type).boxed()
1507 }
1508
1509 fn send(&self, envelope: proto::Envelope, _message_type: &'static str) -> Result<()> {
1510 self.send_dynamic(envelope)
1511 }
1512
1513 fn send_response(&self, envelope: Envelope, _message_type: &'static str) -> anyhow::Result<()> {
1514 self.send_dynamic(envelope)
1515 }
1516
1517 fn message_handler_set(&self) -> &Mutex<ProtoMessageHandlerSet> {
1518 &self.message_handlers
1519 }
1520
1521 fn is_via_collab(&self) -> bool {
1522 false
1523 }
1524}