1use crate::{
2 json_log::LogRecord,
3 protocol::{
4 message_len_from_buffer, read_message_with_len, write_message, MessageId, MESSAGE_LEN_SIZE,
5 },
6 proxy::ProxyLaunchError,
7};
8use anyhow::{anyhow, Context as _, Result};
9use collections::HashMap;
10use futures::{
11 channel::{
12 mpsc::{self, UnboundedReceiver, UnboundedSender},
13 oneshot,
14 },
15 future::BoxFuture,
16 select_biased, AsyncReadExt as _, AsyncWriteExt as _, Future, FutureExt as _, SinkExt,
17 StreamExt as _,
18};
19use gpui::{
20 AppContext, AsyncAppContext, Context, EventEmitter, Model, ModelContext, SemanticVersion, Task,
21 WeakModel,
22};
23use parking_lot::Mutex;
24use rpc::{
25 proto::{self, build_typed_envelope, Envelope, EnvelopedMessage, PeerId, RequestMessage},
26 AnyProtoClient, EntityMessageSubscriber, ProtoClient, ProtoMessageHandlerSet, RpcError,
27};
28use smol::{
29 fs,
30 process::{self, Child, Stdio},
31 Timer,
32};
33use std::{
34 any::TypeId,
35 ffi::OsStr,
36 fmt,
37 ops::ControlFlow,
38 path::{Path, PathBuf},
39 sync::{
40 atomic::{AtomicU32, Ordering::SeqCst},
41 Arc,
42 },
43 time::{Duration, Instant},
44};
45use tempfile::TempDir;
46use util::ResultExt;
47
/// Identifier of an SSH project; a newtype wrapper around a `u64`.
///
/// Derives ordering, hashing and serde (de)serialization so it can be used as
/// a map key and persisted.
#[derive(
    Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, serde::Serialize, serde::Deserialize,
)]
pub struct SshProjectId(pub u64);
52
/// Handle used to run `ssh` commands against one host through an existing
/// control socket.
#[derive(Clone)]
pub struct SshSocket {
    // Options describing the host the commands are sent to.
    connection_options: SshConnectionOptions,
    // Filesystem path passed to ssh as `ControlPath=...`.
    socket_path: PathBuf,
}
58
/// Describes how to reach a remote host over SSH.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct SshConnectionOptions {
    /// Host name or address of the remote machine.
    pub host: String,
    /// Optional user name; when present it is rendered as `user@host`.
    pub username: Option<String>,
    /// Optional port; when present it is rendered as `:port`.
    pub port: Option<u16>,
    /// Optional password.
    // NOTE(review): no code visible in this part of the file reads this field — confirm where it is used.
    pub password: Option<String>,
}
66
67impl SshConnectionOptions {
68 pub fn ssh_url(&self) -> String {
69 let mut result = String::from("ssh://");
70 if let Some(username) = &self.username {
71 result.push_str(username);
72 result.push('@');
73 }
74 result.push_str(&self.host);
75 if let Some(port) = self.port {
76 result.push(':');
77 result.push_str(&port.to_string());
78 }
79 result
80 }
81
82 fn scp_url(&self) -> String {
83 if let Some(username) = &self.username {
84 format!("{}@{}", username, self.host)
85 } else {
86 self.host.clone()
87 }
88 }
89
90 pub fn connection_string(&self) -> String {
91 let host = if let Some(username) = &self.username {
92 format!("{}@{}", username, self.host)
93 } else {
94 self.host.clone()
95 };
96 if let Some(port) = &self.port {
97 format!("{}:{}", host, port)
98 } else {
99 host
100 }
101 }
102
103 // Uniquely identifies dev server projects on a remote host. Needs to be
104 // stable for the same dev server project.
105 pub fn dev_server_identifier(&self) -> String {
106 let mut identifier = format!("dev-server-{:?}", self.host);
107 if let Some(username) = self.username.as_ref() {
108 identifier.push('-');
109 identifier.push_str(&username);
110 }
111 identifier
112 }
113}
114
/// Operating system and CPU architecture of a remote host.
#[derive(Copy, Clone, Debug)]
pub struct SshPlatform {
    /// Operating system name; `triple` recognizes `"linux"` and `"macos"`.
    pub os: &'static str,
    /// CPU architecture name, used verbatim as the first component of the target triple.
    pub arch: &'static str,
}
120
121impl SshPlatform {
122 pub fn triple(&self) -> Option<String> {
123 Some(format!(
124 "{}-{}",
125 self.arch,
126 match self.os {
127 "linux" => "unknown-linux-gnu",
128 "macos" => "apple-darwin",
129 _ => return None,
130 }
131 ))
132 }
133}
134
/// Callbacks through which the SSH client obtains user input and server
/// binaries from its embedder, and reports progress and failures back to it.
pub trait SshClientDelegate: Send + Sync {
    /// Asks for a password for the given `prompt`. The answer (or an error)
    /// is delivered on the returned channel.
    fn ask_password(
        &self,
        prompt: String,
        cx: &mut AsyncAppContext,
    ) -> oneshot::Receiver<Result<String>>;
    /// Returns the path at which the server binary lives on the remote host.
    fn remote_server_binary_path(&self, cx: &mut AsyncAppContext) -> Result<PathBuf>;
    /// Provides the server binary for `platform`: the channel yields a local
    /// path to the binary together with its version.
    fn get_server_binary(
        &self,
        platform: SshPlatform,
        cx: &mut AsyncAppContext,
    ) -> oneshot::Receiver<Result<(PathBuf, SemanticVersion)>>;
    /// Reports a progress/status message.
    // NOTE(review): `None` presumably clears the status — confirm with implementors.
    fn set_status(&self, status: Option<&str>, cx: &mut AsyncAppContext);
    /// Reports an error message.
    fn set_error(&self, error_message: String, cx: &mut AsyncAppContext);
}
150
151impl SshSocket {
152 fn ssh_command<S: AsRef<OsStr>>(&self, program: S) -> process::Command {
153 let mut command = process::Command::new("ssh");
154 self.ssh_options(&mut command)
155 .arg(self.connection_options.ssh_url())
156 .arg(program);
157 command
158 }
159
160 fn ssh_options<'a>(&self, command: &'a mut process::Command) -> &'a mut process::Command {
161 command
162 .stdin(Stdio::piped())
163 .stdout(Stdio::piped())
164 .stderr(Stdio::piped())
165 .args(["-o", "ControlMaster=no", "-o"])
166 .arg(format!("ControlPath={}", self.socket_path.display()))
167 }
168
169 fn ssh_args(&self) -> Vec<String> {
170 vec![
171 "-o".to_string(),
172 "ControlMaster=no".to_string(),
173 "-o".to_string(),
174 format!("ControlPath={}", self.socket_path.display()),
175 self.connection_options.ssh_url(),
176 ]
177 }
178}
179
180async fn run_cmd(command: &mut process::Command) -> Result<String> {
181 let output = command.output().await?;
182 if output.status.success() {
183 Ok(String::from_utf8_lossy(&output.stdout).to_string())
184 } else {
185 Err(anyhow!(
186 "failed to run command: {}",
187 String::from_utf8_lossy(&output.stderr)
188 ))
189 }
190}
191
/// Owns a background task that relays envelopes between two pairs of channels
/// and can be asked to stop and hand the original channel ends back.
struct ChannelForwarder {
    // Sending on this channel tells the forwarding task to stop.
    quit_tx: UnboundedSender<()>,
    // The relay task; it resolves to the channel ends it was created with.
    forwarding_task: Task<(UnboundedSender<Envelope>, UnboundedReceiver<Envelope>)>,
}
196
impl ChannelForwarder {
    /// Spawns the forwarding task on the background executor.
    ///
    /// Returns the forwarder plus a fresh sender and receiver: envelopes sent
    /// on the returned sender are passed on to `incoming_tx`, and envelopes
    /// arriving on `outgoing_rx` are passed on to the returned receiver.
    fn new(
        mut incoming_tx: UnboundedSender<Envelope>,
        mut outgoing_rx: UnboundedReceiver<Envelope>,
        cx: &AsyncAppContext,
    ) -> (Self, UnboundedSender<Envelope>, UnboundedReceiver<Envelope>) {
        let (quit_tx, mut quit_rx) = mpsc::unbounded::<()>();

        let (proxy_incoming_tx, mut proxy_incoming_rx) = mpsc::unbounded::<Envelope>();
        let (mut proxy_outgoing_tx, proxy_outgoing_rx) = mpsc::unbounded::<Envelope>();

        let forwarding_task = cx.background_executor().spawn(async move {
            loop {
                // Biased: the quit signal is checked first, then incoming
                // traffic, then outgoing traffic.
                select_biased! {
                    _ = quit_rx.next().fuse() => {
                        break;
                    },
                    incoming_envelope = proxy_incoming_rx.next().fuse() => {
                        if let Some(envelope) = incoming_envelope {
                            if incoming_tx.send(envelope).await.is_err() {
                                break;
                            }
                        } else {
                            // The proxy-side sender was dropped.
                            break;
                        }
                    }
                    outgoing_envelope = outgoing_rx.next().fuse() => {
                        if let Some(envelope) = outgoing_envelope {
                            if proxy_outgoing_tx.send(envelope).await.is_err() {
                                break;
                            }
                        } else {
                            // The outgoing sender was dropped.
                            break;
                        }
                    }
                }
            }

            // Hand the original channel ends back so a new forwarder can be
            // built on top of them.
            (incoming_tx, outgoing_rx)
        });

        (
            Self {
                forwarding_task,
                quit_tx,
            },
            proxy_incoming_tx,
            proxy_outgoing_rx,
        )
    }

    /// Signals the forwarding task to stop and waits for it, returning the
    /// channel ends that were originally passed to `new`.
    async fn into_channels(mut self) -> (UnboundedSender<Envelope>, UnboundedReceiver<Envelope>) {
        // A send error means the task already finished; awaiting it below
        // still yields the channels.
        let _ = self.quit_tx.send(()).await;
        self.forwarding_task.await
    }
}
253
/// Number of consecutive missed heartbeats after which a reconnect is started.
const MAX_MISSED_HEARTBEATS: usize = 5;
/// Delay between two heartbeat pings.
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
/// How long a single ping may take before it counts as missed.
const HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(5);

/// Number of reconnect attempts made before giving up.
const MAX_RECONNECT_ATTEMPTS: usize = 3;
259
/// Internal connection state of the SSH client. Variants that represent a
/// live (or recently live) connection own the resources belonging to it.
enum State {
    /// Initial state, before the first connection has been established.
    Connecting,
    /// The connection is up and heartbeats are being answered.
    Connected {
        ssh_connection: SshRemoteConnection,
        delegate: Arc<dyn SshClientDelegate>,
        forwarder: ChannelForwarder,

        // Task that moves messages between the channels and the proxy process.
        multiplex_task: Task<Result<()>>,
        // Task that periodically pings the server.
        heartbeat_task: Task<Result<()>>,
    },
    /// One or more heartbeats in a row went unanswered.
    HeartbeatMissed {
        // How many heartbeats in a row have been missed so far.
        missed_heartbeats: usize,

        ssh_connection: SshRemoteConnection,
        delegate: Arc<dyn SshClientDelegate>,
        forwarder: ChannelForwarder,

        multiplex_task: Task<Result<()>>,
        heartbeat_task: Task<Result<()>>,
    },
    /// A reconnect attempt is currently running.
    Reconnecting,
    /// The last reconnect attempt failed; another attempt may follow.
    ReconnectFailed {
        ssh_connection: SshRemoteConnection,
        delegate: Arc<dyn SshClientDelegate>,
        forwarder: ChannelForwarder,

        // Why the attempt failed.
        error: anyhow::Error,
        // Number of attempts made so far.
        attempts: usize,
    },
    /// All reconnect attempts have been used up.
    ReconnectExhausted,
    /// The proxy reported that the remote server is not running.
    ServerNotRunning,
}
292
293impl fmt::Display for State {
294 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
295 match self {
296 Self::Connecting => write!(f, "connecting"),
297 Self::Connected { .. } => write!(f, "connected"),
298 Self::Reconnecting => write!(f, "reconnecting"),
299 Self::ReconnectFailed { .. } => write!(f, "reconnect failed"),
300 Self::ReconnectExhausted => write!(f, "reconnect exhausted"),
301 Self::HeartbeatMissed { .. } => write!(f, "heartbeat missed"),
302 Self::ServerNotRunning { .. } => write!(f, "server not running"),
303 }
304 }
305}
306
307impl State {
308 fn ssh_connection(&self) -> Option<&SshRemoteConnection> {
309 match self {
310 Self::Connected { ssh_connection, .. } => Some(ssh_connection),
311 Self::HeartbeatMissed { ssh_connection, .. } => Some(ssh_connection),
312 Self::ReconnectFailed { ssh_connection, .. } => Some(ssh_connection),
313 _ => None,
314 }
315 }
316
317 fn can_reconnect(&self) -> bool {
318 match self {
319 Self::Connected { .. }
320 | Self::HeartbeatMissed { .. }
321 | Self::ReconnectFailed { .. } => true,
322 State::Connecting
323 | State::Reconnecting
324 | State::ReconnectExhausted
325 | State::ServerNotRunning => false,
326 }
327 }
328
329 fn is_reconnect_failed(&self) -> bool {
330 matches!(self, Self::ReconnectFailed { .. })
331 }
332
333 fn is_reconnect_exhausted(&self) -> bool {
334 matches!(self, Self::ReconnectExhausted { .. })
335 }
336
337 fn is_reconnecting(&self) -> bool {
338 matches!(self, Self::Reconnecting { .. })
339 }
340
341 fn heartbeat_recovered(self) -> Self {
342 match self {
343 Self::HeartbeatMissed {
344 ssh_connection,
345 delegate,
346 forwarder,
347 multiplex_task,
348 heartbeat_task,
349 ..
350 } => Self::Connected {
351 ssh_connection,
352 delegate,
353 forwarder,
354 multiplex_task,
355 heartbeat_task,
356 },
357 _ => self,
358 }
359 }
360
361 fn heartbeat_missed(self) -> Self {
362 match self {
363 Self::Connected {
364 ssh_connection,
365 delegate,
366 forwarder,
367 multiplex_task,
368 heartbeat_task,
369 } => Self::HeartbeatMissed {
370 missed_heartbeats: 1,
371 ssh_connection,
372 delegate,
373 forwarder,
374 multiplex_task,
375 heartbeat_task,
376 },
377 Self::HeartbeatMissed {
378 missed_heartbeats,
379 ssh_connection,
380 delegate,
381 forwarder,
382 multiplex_task,
383 heartbeat_task,
384 } => Self::HeartbeatMissed {
385 missed_heartbeats: missed_heartbeats + 1,
386 ssh_connection,
387 delegate,
388 forwarder,
389 multiplex_task,
390 heartbeat_task,
391 },
392 _ => self,
393 }
394 }
395}
396
/// The state of the ssh connection, as exposed to users of the client.
///
/// This is a resource-free summary of the internal connection state.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ConnectionState {
    /// The first connection is being established.
    Connecting,
    /// The connection is up.
    Connected,
    /// The connection is up but recent heartbeats went unanswered.
    HeartbeatMissed,
    /// A reconnect is in progress (or about to be retried).
    Reconnecting,
    /// The connection is gone and will not be re-established automatically.
    Disconnected,
}
406
407impl From<&State> for ConnectionState {
408 fn from(value: &State) -> Self {
409 match value {
410 State::Connecting => Self::Connecting,
411 State::Connected { .. } => Self::Connected,
412 State::Reconnecting | State::ReconnectFailed { .. } => Self::Reconnecting,
413 State::HeartbeatMissed { .. } => Self::HeartbeatMissed,
414 State::ReconnectExhausted => Self::Disconnected,
415 State::ServerNotRunning => Self::Disconnected,
416 }
417 }
418}
419
/// Client for a server running on a remote host, reached over SSH.
pub struct SshRemoteClient {
    // Channel-based client used to exchange protocol messages with the server.
    client: Arc<ChannelClient>,
    // Passed to the remote proxy as `--identifier`.
    unique_identifier: String,
    // The options this client was created with.
    connection_options: SshConnectionOptions,
    // Current connection state; `None` once the processes have been shut down
    // (and in the test-only `fake` client).
    state: Arc<Mutex<Option<State>>>,
}
426
impl Drop for SshRemoteClient {
    /// Tears down the connection's tasks and processes when the client goes away.
    fn drop(&mut self) {
        self.shutdown_processes();
    }
}
432
/// Events emitted by [`SshRemoteClient`].
#[derive(Debug)]
pub enum SshRemoteEvent {
    /// The connection was lost and will not be re-established automatically.
    Disconnected,
}

// Allows `SshRemoteClient` models to emit `SshRemoteEvent`s.
impl EventEmitter<SshRemoteEvent> for SshRemoteClient {}
439
440impl SshRemoteClient {
/// Connects to the remote host and returns a model wrapping the new client.
///
/// The returned task creates the model in the `Connecting` state, establishes
/// the SSH connection and proxy process, verifies the server answers a ping,
/// starts the heartbeat, and finally moves the model to `Connected`.
///
/// The task fails if the connection cannot be established or if the first
/// ping is not answered within `HEARTBEAT_TIMEOUT`.
pub fn new(
    unique_identifier: String,
    connection_options: SshConnectionOptions,
    delegate: Arc<dyn SshClientDelegate>,
    cx: &AppContext,
) -> Task<Result<Model<Self>>> {
    cx.spawn(|mut cx| async move {
        let (outgoing_tx, outgoing_rx) = mpsc::unbounded::<Envelope>();
        let (incoming_tx, incoming_rx) = mpsc::unbounded::<Envelope>();

        let client = cx.update(|cx| ChannelClient::new(incoming_rx, outgoing_tx, cx))?;
        let this = cx.new_model(|cx| {
            // Make sure the ssh processes are shut down when the app quits.
            cx.on_app_quit(|this: &mut Self, _| {
                this.shutdown_processes();
                futures::future::ready(())
            })
            .detach();

            Self {
                client: client.clone(),
                unique_identifier: unique_identifier.clone(),
                connection_options: connection_options.clone(),
                state: Arc::new(Mutex::new(Some(State::Connecting))),
            }
        })?;

        // The forwarder sits between the client's channels and the proxy
        // process, so the channels can be re-attached to a new process later.
        let (proxy, proxy_incoming_tx, proxy_outgoing_rx) =
            ChannelForwarder::new(incoming_tx, outgoing_rx, &mut cx);

        let (ssh_connection, ssh_proxy_process) = Self::establish_connection(
            unique_identifier,
            false,
            connection_options,
            delegate.clone(),
            &mut cx,
        )
        .await?;

        let multiplex_task = Self::multiplex(
            this.downgrade(),
            ssh_proxy_process,
            proxy_incoming_tx,
            proxy_outgoing_rx,
            &mut cx,
        );

        // Verify the server actually responds before declaring success.
        if let Err(error) = client.ping(HEARTBEAT_TIMEOUT).await {
            log::error!("failed to establish connection: {}", error);
            delegate.set_error(error.to_string(), &mut cx);
            return Err(error);
        }

        let heartbeat_task = Self::heartbeat(this.downgrade(), &mut cx);

        this.update(&mut cx, |this, _| {
            *this.state.lock() = Some(State::Connected {
                ssh_connection,
                delegate,
                forwarder: proxy,
                multiplex_task,
                heartbeat_task,
            });
        })?;

        Ok(this)
    })
}
508
509 fn shutdown_processes(&self) {
510 let Some(state) = self.state.lock().take() else {
511 return;
512 };
513 log::info!("shutting down ssh processes");
514
515 let State::Connected {
516 multiplex_task,
517 heartbeat_task,
518 ..
519 } = state
520 else {
521 return;
522 };
523 // Drop `multiplex_task` because it owns our ssh_proxy_process, which is a
524 // child of master_process.
525 drop(multiplex_task);
526 // Now drop the rest of state, which kills master process.
527 drop(heartbeat_task);
528 }
529
530 fn reconnect(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
531 let mut lock = self.state.lock();
532
533 let can_reconnect = lock
534 .as_ref()
535 .map(|state| state.can_reconnect())
536 .unwrap_or(false);
537 if !can_reconnect {
538 let error = if let Some(state) = lock.as_ref() {
539 format!("invalid state, cannot reconnect while in state {state}")
540 } else {
541 "no state set".to_string()
542 };
543 log::info!("aborting reconnect, because not in state that allows reconnecting");
544 return Err(anyhow!(error));
545 }
546
547 let state = lock.take().unwrap();
548 let (attempts, mut ssh_connection, delegate, forwarder) = match state {
549 State::Connected {
550 ssh_connection,
551 delegate,
552 forwarder,
553 multiplex_task,
554 heartbeat_task,
555 }
556 | State::HeartbeatMissed {
557 ssh_connection,
558 delegate,
559 forwarder,
560 multiplex_task,
561 heartbeat_task,
562 ..
563 } => {
564 drop(multiplex_task);
565 drop(heartbeat_task);
566 (0, ssh_connection, delegate, forwarder)
567 }
568 State::ReconnectFailed {
569 attempts,
570 ssh_connection,
571 delegate,
572 forwarder,
573 ..
574 } => (attempts, ssh_connection, delegate, forwarder),
575 State::Connecting
576 | State::Reconnecting
577 | State::ReconnectExhausted
578 | State::ServerNotRunning => unreachable!(),
579 };
580
581 let attempts = attempts + 1;
582 if attempts > MAX_RECONNECT_ATTEMPTS {
583 log::error!(
584 "Failed to reconnect to after {} attempts, giving up",
585 MAX_RECONNECT_ATTEMPTS
586 );
587 drop(lock);
588 self.set_state(State::ReconnectExhausted, cx);
589 return Ok(());
590 }
591 drop(lock);
592
593 self.set_state(State::Reconnecting, cx);
594
595 log::info!("Trying to reconnect to ssh server... Attempt {}", attempts);
596
597 let identifier = self.unique_identifier.clone();
598 let client = self.client.clone();
599 let reconnect_task = cx.spawn(|this, mut cx| async move {
600 macro_rules! failed {
601 ($error:expr, $attempts:expr, $ssh_connection:expr, $delegate:expr, $forwarder:expr) => {
602 return State::ReconnectFailed {
603 error: anyhow!($error),
604 attempts: $attempts,
605 ssh_connection: $ssh_connection,
606 delegate: $delegate,
607 forwarder: $forwarder,
608 };
609 };
610 }
611
612 if let Err(error) = ssh_connection.master_process.kill() {
613 failed!(error, attempts, ssh_connection, delegate, forwarder);
614 };
615
616 if let Err(error) = ssh_connection
617 .master_process
618 .status()
619 .await
620 .context("Failed to kill ssh process")
621 {
622 failed!(error, attempts, ssh_connection, delegate, forwarder);
623 }
624
625 let connection_options = ssh_connection.socket.connection_options.clone();
626
627 let (incoming_tx, outgoing_rx) = forwarder.into_channels().await;
628 let (forwarder, proxy_incoming_tx, proxy_outgoing_rx) =
629 ChannelForwarder::new(incoming_tx, outgoing_rx, &mut cx);
630
631 let (ssh_connection, ssh_process) = match Self::establish_connection(
632 identifier,
633 true,
634 connection_options,
635 delegate.clone(),
636 &mut cx,
637 )
638 .await
639 {
640 Ok((ssh_connection, ssh_process)) => (ssh_connection, ssh_process),
641 Err(error) => {
642 failed!(error, attempts, ssh_connection, delegate, forwarder);
643 }
644 };
645
646 let multiplex_task = Self::multiplex(
647 this.clone(),
648 ssh_process,
649 proxy_incoming_tx,
650 proxy_outgoing_rx,
651 &mut cx,
652 );
653
654 if let Err(error) = client.ping(HEARTBEAT_TIMEOUT).await {
655 failed!(error, attempts, ssh_connection, delegate, forwarder);
656 };
657
658 State::Connected {
659 ssh_connection,
660 delegate,
661 forwarder,
662 multiplex_task,
663 heartbeat_task: Self::heartbeat(this.clone(), &mut cx),
664 }
665 });
666
667 cx.spawn(|this, mut cx| async move {
668 let new_state = reconnect_task.await;
669 this.update(&mut cx, |this, cx| {
670 this.try_set_state(cx, |old_state| {
671 if old_state.is_reconnecting() {
672 match &new_state {
673 State::Connecting
674 | State::Reconnecting { .. }
675 | State::HeartbeatMissed { .. }
676 | State::ServerNotRunning => {}
677 State::Connected { .. } => {
678 log::info!("Successfully reconnected");
679 }
680 State::ReconnectFailed {
681 error, attempts, ..
682 } => {
683 log::error!(
684 "Reconnect attempt {} failed: {:?}. Starting new attempt...",
685 attempts,
686 error
687 );
688 }
689 State::ReconnectExhausted => {
690 log::error!("Reconnect attempt failed and all attempts exhausted");
691 }
692 }
693 Some(new_state)
694 } else {
695 None
696 }
697 });
698
699 if this.state_is(State::is_reconnect_failed) {
700 this.reconnect(cx)
701 } else if this.state_is(State::is_reconnect_exhausted) {
702 cx.emit(SshRemoteEvent::Disconnected);
703 Ok(())
704 } else {
705 log::debug!("State has transition from Reconnecting into new state while attempting reconnect. Ignoring new state.");
706 Ok(())
707 }
708 })
709 })
710 .detach_and_log_err(cx);
711
712 Ok(())
713 }
714
/// Spawns the task that pings the server every `HEARTBEAT_INTERVAL`.
///
/// The task counts consecutive failed pings and reports changes of that
/// count to `handle_heartbeat_result`; it ends when that handler asks it to
/// stop or when the client model can no longer be updated. If the model is
/// already gone when this is called, a task that is immediately ready with
/// an error is returned.
fn heartbeat(this: WeakModel<Self>, cx: &mut AsyncAppContext) -> Task<Result<()>> {
    let Ok(client) = this.update(cx, |this, _| this.client.clone()) else {
        return Task::ready(Err(anyhow!("SshRemoteClient lost")));
    };
    cx.spawn(|mut cx| {
        let this = this.clone();
        async move {
            let mut missed_heartbeats = 0;

            let mut timer = Timer::interval(HEARTBEAT_INTERVAL);
            loop {
                timer.next().await;

                log::debug!("Sending heartbeat to server...");

                let result = client.ping(HEARTBEAT_TIMEOUT).await;
                if result.is_err() {
                    missed_heartbeats += 1;
                    log::warn!(
                        "No heartbeat from server after {:?}. Missed heartbeat {} out of {}.",
                        HEARTBEAT_TIMEOUT,
                        missed_heartbeats,
                        MAX_MISSED_HEARTBEATS
                    );
                } else if missed_heartbeats != 0 {
                    // First successful ping after one or more misses.
                    missed_heartbeats = 0;
                } else {
                    // Healthy and nothing changed: no state update needed.
                    continue;
                }

                let result = this.update(&mut cx, |this, mut cx| {
                    this.handle_heartbeat_result(missed_heartbeats, &mut cx)
                })?;
                if result.is_break() {
                    return Ok(());
                }
            }
        }
    })
}
755
756 fn handle_heartbeat_result(
757 &mut self,
758 missed_heartbeats: usize,
759 cx: &mut ModelContext<Self>,
760 ) -> ControlFlow<()> {
761 let state = self.state.lock().take().unwrap();
762 let next_state = if missed_heartbeats > 0 {
763 state.heartbeat_missed()
764 } else {
765 state.heartbeat_recovered()
766 };
767
768 self.set_state(next_state, cx);
769
770 if missed_heartbeats >= MAX_MISSED_HEARTBEATS {
771 log::error!(
772 "Missed last {} heartbeats. Reconnecting...",
773 missed_heartbeats
774 );
775
776 self.reconnect(cx)
777 .context("failed to start reconnect process after missing heartbeats")
778 .log_err();
779 ControlFlow::Break(())
780 } else {
781 ControlFlow::Continue(())
782 }
783 }
784
/// Connects the message channels to the stdio of the ssh proxy process.
///
/// A background task writes outgoing envelopes to the process's stdin, reads
/// length-prefixed envelopes from its stdout into `incoming_tx`, and
/// forwards its stderr to the local log. A second task waits for that I/O
/// task to end and reacts to the outcome: it marks the server as not running,
/// starts a reconnect, or does nothing.
///
/// The returned task owns `ssh_proxy_process`.
fn multiplex(
    this: WeakModel<Self>,
    mut ssh_proxy_process: Child,
    incoming_tx: UnboundedSender<Envelope>,
    mut outgoing_rx: UnboundedReceiver<Envelope>,
    cx: &AsyncAppContext,
) -> Task<Result<()>> {
    let mut child_stderr = ssh_proxy_process.stderr.take().unwrap();
    let mut child_stdout = ssh_proxy_process.stdout.take().unwrap();
    let mut child_stdin = ssh_proxy_process.stdin.take().unwrap();

    let io_task = cx.background_executor().spawn(async move {
        let mut stdin_buffer = Vec::new();
        let mut stdout_buffer = Vec::new();
        let mut stderr_buffer = Vec::new();
        // Number of bytes at the start of `stderr_buffer` that hold a
        // not-yet-terminated line.
        let mut stderr_offset = 0;

        loop {
            // Each stdout read starts with the length prefix; stderr always
            // gets 1024 bytes of room after the pending partial line.
            stdout_buffer.resize(MESSAGE_LEN_SIZE, 0);
            stderr_buffer.resize(stderr_offset + 1024, 0);

            select_biased! {
                outgoing = outgoing_rx.next().fuse() => {
                    // The sending side is gone: stop without an exit code.
                    let Some(outgoing) = outgoing else {
                        return anyhow::Ok(None);
                    };

                    write_message(&mut child_stdin, &mut stdin_buffer, outgoing).await?;
                }

                result = child_stdout.read(&mut stdout_buffer).fuse() => {
                    match result {
                        // stdout reached end-of-file: wait for the process to
                        // exit and report its exit code.
                        Ok(0) => {
                            child_stdin.close().await?;
                            outgoing_rx.close();
                            let status = ssh_proxy_process.status().await?;
                            return Ok(status.code());
                        }
                        Ok(len) => {
                            // Read the rest of the length prefix if the first
                            // read returned only part of it.
                            if len < stdout_buffer.len() {
                                child_stdout.read_exact(&mut stdout_buffer[len..]).await?;
                            }

                            let message_len = message_len_from_buffer(&stdout_buffer);
                            match read_message_with_len(&mut child_stdout, &mut stdout_buffer, message_len).await {
                                Ok(envelope) => {
                                    incoming_tx.unbounded_send(envelope).ok();
                                }
                                Err(error) => {
                                    log::error!("error decoding message {error:?}");
                                }
                            }
                        }
                        Err(error) => {
                            Err(anyhow!("error reading stdout: {error:?}"))?;
                        }
                    }
                }

                result = child_stderr.read(&mut stderr_buffer[stderr_offset..]).fuse() => {
                    match result {
                        Ok(len) => {
                            stderr_offset += len;
                            let mut start_ix = 0;
                            // Emit every complete line: as a structured log
                            // record if it parses as one, otherwise verbatim
                            // on our own stderr.
                            while let Some(ix) = stderr_buffer[start_ix..stderr_offset].iter().position(|b| b == &b'\n') {
                                let line_ix = start_ix + ix;
                                let content = &stderr_buffer[start_ix..line_ix];
                                start_ix = line_ix + 1;
                                if let Ok(record) = serde_json::from_slice::<LogRecord>(content) {
                                    record.log(log::logger())
                                } else {
                                    eprintln!("(remote) {}", String::from_utf8_lossy(content));
                                }
                            }
                            // Keep only the trailing partial line for the next read.
                            stderr_buffer.drain(0..start_ix);
                            stderr_offset -= start_ix;
                        }
                        Err(error) => {
                            Err(anyhow!("error reading stderr: {error:?}"))?;
                        }
                    }
                }
            }
        }
    });

    cx.spawn(|mut cx| async move {
        let result = io_task.await;

        match result {
            Ok(Some(exit_code)) => {
                if let Some(error) = ProxyLaunchError::from_exit_code(exit_code) {
                    match error {
                        ProxyLaunchError::ServerNotRunning => {
                            log::error!("failed to reconnect because server is not running");
                            this.update(&mut cx, |this, cx| {
                                this.set_state(State::ServerNotRunning, cx);
                                cx.emit(SshRemoteEvent::Disconnected);
                            })?;
                        }
                    }
                } else if exit_code > 0 {
                    log::error!("proxy process terminated unexpectedly");
                    this.update(&mut cx, |this, cx| {
                        this.reconnect(cx).ok();
                    })?;
                }
            }
            // No exit code: either the outgoing channel was closed or the
            // process status carried no code. Nothing to do.
            Ok(None) => {}
            Err(error) => {
                log::warn!("ssh io task died with error: {:?}. reconnecting...", error);
                this.update(&mut cx, |this, cx| {
                    this.reconnect(cx).ok();
                })?;
            }
        }
        Ok(())
    })
}
904
905 fn state_is(&self, check: impl FnOnce(&State) -> bool) -> bool {
906 self.state.lock().as_ref().map_or(false, check)
907 }
908
909 fn try_set_state(
910 &self,
911 cx: &mut ModelContext<Self>,
912 map: impl FnOnce(&State) -> Option<State>,
913 ) {
914 let mut lock = self.state.lock();
915 let new_state = lock.as_ref().and_then(map);
916
917 if let Some(new_state) = new_state {
918 lock.replace(new_state);
919 cx.notify();
920 }
921 }
922
923 fn set_state(&self, state: State, cx: &mut ModelContext<Self>) {
924 log::info!("setting state to '{}'", &state);
925 self.state.lock().replace(state);
926 cx.notify();
927 }
928
/// Establishes the SSH connection, makes sure the server binary is present
/// on the remote host, and starts the remote proxy.
///
/// `unique_identifier` is passed to the proxy as `--identifier`; `reconnect`
/// adds the `--reconnect` flag. Returns the connection together with the
/// spawned proxy process.
async fn establish_connection(
    unique_identifier: String,
    reconnect: bool,
    connection_options: SshConnectionOptions,
    delegate: Arc<dyn SshClientDelegate>,
    cx: &mut AsyncAppContext,
) -> Result<(SshRemoteConnection, Child)> {
    let ssh_connection =
        SshRemoteConnection::new(connection_options, delegate.clone(), cx).await?;

    let platform = ssh_connection.query_platform().await?;
    let (local_binary_path, version) = delegate.get_server_binary(platform, cx).await??;
    let remote_binary_path = delegate.remote_server_binary_path(cx)?;
    ssh_connection
        .ensure_server_binary(
            &delegate,
            &local_binary_path,
            &remote_binary_path,
            version,
            cx,
        )
        .await?;

    // Check that the remote binary can actually be executed.
    let socket = ssh_connection.socket.clone();
    run_cmd(socket.ssh_command(&remote_binary_path).arg("version")).await?;

    delegate.set_status(Some("Starting proxy"), cx);

    // The local RUST_LOG / RUST_BACKTRACE settings are passed on to the proxy.
    // NOTE(review): the environment values and the identifier are inserted
    // into the remote command line unquoted, and the binary path relies on
    // `Debug` formatting for quoting — values containing spaces or shell
    // metacharacters would break the command. Confirm inputs are constrained.
    let mut start_proxy_command = format!(
        "RUST_LOG={} RUST_BACKTRACE={} {:?} proxy --identifier {}",
        std::env::var("RUST_LOG").unwrap_or_default(),
        std::env::var("RUST_BACKTRACE").unwrap_or_default(),
        remote_binary_path,
        unique_identifier,
    );
    if reconnect {
        start_proxy_command.push_str(" --reconnect");
    }

    let ssh_proxy_process = socket
        .ssh_command(start_proxy_command)
        // IMPORTANT: we kill this process when we drop the task that uses it.
        .kill_on_drop(true)
        .spawn()
        .context("failed to spawn remote server")?;

    Ok((ssh_connection, ssh_proxy_process))
}
977
/// Subscribes `entity` under `remote_id` on the underlying channel client.
pub fn subscribe_to_entity<E: 'static>(&self, remote_id: u64, entity: &Model<E>) {
    self.client.subscribe_to_entity(remote_id, entity);
}
981
982 pub fn ssh_args(&self) -> Option<Vec<String>> {
983 self.state
984 .lock()
985 .as_ref()
986 .and_then(|state| state.ssh_connection())
987 .map(|ssh_connection| ssh_connection.socket.ssh_args())
988 }
989
/// Returns a type-erased proto client sharing this client's channel client.
pub fn to_proto_client(&self) -> AnyProtoClient {
    self.client.clone().into()
}
993
/// Returns the connection string of the options this client was created with.
pub fn connection_string(&self) -> String {
    self.connection_options.connection_string()
}
997
/// Returns a copy of the options this client was created with.
pub fn connection_options(&self) -> SshConnectionOptions {
    self.connection_options.clone()
}
1001
1002 #[cfg(not(any(test, feature = "test-support")))]
1003 pub fn connection_state(&self) -> ConnectionState {
1004 self.state
1005 .lock()
1006 .as_ref()
1007 .map(ConnectionState::from)
1008 .unwrap_or(ConnectionState::Disconnected)
1009 }
1010
/// Test-support variant: always reports the client as connected, regardless
/// of the stored state.
#[cfg(any(test, feature = "test-support"))]
pub fn connection_state(&self) -> ConnectionState {
    ConnectionState::Connected
}
1015
1016 pub fn is_disconnected(&self) -> bool {
1017 self.connection_state() == ConnectionState::Disconnected
1018 }
1019
/// Test-support constructor: creates a client and a server-side channel
/// client that are wired directly to each other through in-memory channels,
/// without any SSH process. The client model has no connection state.
#[cfg(any(test, feature = "test-support"))]
pub fn fake(
    client_cx: &mut gpui::TestAppContext,
    server_cx: &mut gpui::TestAppContext,
) -> (Model<Self>, Arc<ChannelClient>) {
    use gpui::Context;

    let (server_to_client_tx, server_to_client_rx) = mpsc::unbounded();
    let (client_to_server_tx, client_to_server_rx) = mpsc::unbounded();

    let client_model = client_cx.update(|cx| {
        let client = ChannelClient::new(server_to_client_rx, client_to_server_tx, cx);
        cx.new_model(|_| Self {
            client,
            unique_identifier: "fake".to_string(),
            connection_options: SshConnectionOptions::default(),
            state: Arc::new(Mutex::new(None)),
        })
    });
    let server_client =
        server_cx.update(|cx| ChannelClient::new(client_to_server_rx, server_to_client_tx, cx));

    (client_model, server_client)
}
1043}
1044
1045impl From<SshRemoteClient> for AnyProtoClient {
1046 fn from(client: SshRemoteClient) -> Self {
1047 AnyProtoClient::new(client.client.clone())
1048 }
1049}
1050
/// An established SSH connection: the master `ssh` process plus the control
/// socket through which further `ssh` commands reuse it.
struct SshRemoteConnection {
    // Used to run commands over the master connection.
    socket: SshSocket,
    // The `ssh` process holding the connection open; killed on drop.
    master_process: process::Child,
    // Directory containing the control socket and askpass files; kept so the
    // directory lives as long as the connection.
    _temp_dir: TempDir,
}
1056
1057impl Drop for SshRemoteConnection {
1058 fn drop(&mut self) {
1059 if let Err(error) = self.master_process.kill() {
1060 log::error!("failed to kill SSH master process: {}", error);
1061 }
1062 }
1063}
1064
1065impl SshRemoteConnection {
/// Non-unix stub: connecting always fails because ssh is not supported on
/// this platform.
#[cfg(not(unix))]
async fn new(
    _connection_options: SshConnectionOptions,
    _delegate: Arc<dyn SshClientDelegate>,
    _cx: &mut AsyncAppContext,
) -> Result<Self> {
    Err(anyhow!("ssh is not supported on this platform"))
}
1074
1075 #[cfg(unix)]
1076 async fn new(
1077 connection_options: SshConnectionOptions,
1078 delegate: Arc<dyn SshClientDelegate>,
1079 cx: &mut AsyncAppContext,
1080 ) -> Result<Self> {
1081 use futures::{io::BufReader, AsyncBufReadExt as _};
1082 use smol::{fs::unix::PermissionsExt as _, net::unix::UnixListener};
1083 use util::ResultExt as _;
1084
1085 delegate.set_status(Some("connecting"), cx);
1086
1087 let url = connection_options.ssh_url();
1088 let temp_dir = tempfile::Builder::new()
1089 .prefix("zed-ssh-session")
1090 .tempdir()?;
1091
1092 // Create a domain socket listener to handle requests from the askpass program.
1093 let askpass_socket = temp_dir.path().join("askpass.sock");
1094 let (askpass_opened_tx, askpass_opened_rx) = oneshot::channel::<()>();
1095 let listener =
1096 UnixListener::bind(&askpass_socket).context("failed to create askpass socket")?;
1097
1098 let askpass_task = cx.spawn({
1099 let delegate = delegate.clone();
1100 |mut cx| async move {
1101 let mut askpass_opened_tx = Some(askpass_opened_tx);
1102
1103 while let Ok((mut stream, _)) = listener.accept().await {
1104 if let Some(askpass_opened_tx) = askpass_opened_tx.take() {
1105 askpass_opened_tx.send(()).ok();
1106 }
1107 let mut buffer = Vec::new();
1108 let mut reader = BufReader::new(&mut stream);
1109 if reader.read_until(b'\0', &mut buffer).await.is_err() {
1110 buffer.clear();
1111 }
1112 let password_prompt = String::from_utf8_lossy(&buffer);
1113 if let Some(password) = delegate
1114 .ask_password(password_prompt.to_string(), &mut cx)
1115 .await
1116 .context("failed to get ssh password")
1117 .and_then(|p| p)
1118 .log_err()
1119 {
1120 stream.write_all(password.as_bytes()).await.log_err();
1121 }
1122 }
1123 }
1124 });
1125
1126 // Create an askpass script that communicates back to this process.
1127 let askpass_script = format!(
1128 "{shebang}\n{print_args} | nc -U {askpass_socket} 2> /dev/null \n",
1129 askpass_socket = askpass_socket.display(),
1130 print_args = "printf '%s\\0' \"$@\"",
1131 shebang = "#!/bin/sh",
1132 );
1133 let askpass_script_path = temp_dir.path().join("askpass.sh");
1134 fs::write(&askpass_script_path, askpass_script).await?;
1135 fs::set_permissions(&askpass_script_path, std::fs::Permissions::from_mode(0o755)).await?;
1136
1137 // Start the master SSH process, which does not do anything except for establish
1138 // the connection and keep it open, allowing other ssh commands to reuse it
1139 // via a control socket.
1140 let socket_path = temp_dir.path().join("ssh.sock");
1141 let mut master_process = process::Command::new("ssh")
1142 .stdin(Stdio::null())
1143 .stdout(Stdio::piped())
1144 .stderr(Stdio::piped())
1145 .env("SSH_ASKPASS_REQUIRE", "force")
1146 .env("SSH_ASKPASS", &askpass_script_path)
1147 .args(["-N", "-o", "ControlMaster=yes", "-o"])
1148 .arg(format!("ControlPath={}", socket_path.display()))
1149 .arg(&url)
1150 .spawn()?;
1151
1152 // Wait for this ssh process to close its stdout, indicating that authentication
1153 // has completed.
1154 let stdout = master_process.stdout.as_mut().unwrap();
1155 let mut output = Vec::new();
1156 let connection_timeout = Duration::from_secs(10);
1157
1158 let result = select_biased! {
1159 _ = askpass_opened_rx.fuse() => {
1160 // If the askpass script has opened, that means the user is typing
1161 // their password, in which case we don't want to timeout anymore,
1162 // since we know a connection has been established.
1163 stdout.read_to_end(&mut output).await?;
1164 Ok(())
1165 }
1166 result = stdout.read_to_end(&mut output).fuse() => {
1167 result?;
1168 Ok(())
1169 }
1170 _ = futures::FutureExt::fuse(smol::Timer::after(connection_timeout)) => {
1171 Err(anyhow!("Exceeded {:?} timeout trying to connect to host", connection_timeout))
1172 }
1173 };
1174
1175 if let Err(e) = result {
1176 let error_message = format!("Failed to connect to host: {}.", e);
1177 delegate.set_error(error_message, cx);
1178 return Err(e);
1179 }
1180
1181 drop(askpass_task);
1182
1183 if master_process.try_status()?.is_some() {
1184 output.clear();
1185 let mut stderr = master_process.stderr.take().unwrap();
1186 stderr.read_to_end(&mut output).await?;
1187
1188 let error_message = format!("failed to connect: {}", String::from_utf8_lossy(&output));
1189 delegate.set_error(error_message.clone(), cx);
1190 Err(anyhow!(error_message))?;
1191 }
1192
1193 Ok(Self {
1194 socket: SshSocket {
1195 connection_options,
1196 socket_path,
1197 },
1198 master_process,
1199 _temp_dir: temp_dir,
1200 })
1201 }
1202
1203 async fn ensure_server_binary(
1204 &self,
1205 delegate: &Arc<dyn SshClientDelegate>,
1206 src_path: &Path,
1207 dst_path: &Path,
1208 version: SemanticVersion,
1209 cx: &mut AsyncAppContext,
1210 ) -> Result<()> {
1211 let mut dst_path_gz = dst_path.to_path_buf();
1212 dst_path_gz.set_extension("gz");
1213
1214 if let Some(parent) = dst_path.parent() {
1215 run_cmd(self.socket.ssh_command("mkdir").arg("-p").arg(parent)).await?;
1216 }
1217
1218 let mut server_binary_exists = false;
1219 if cfg!(not(debug_assertions)) {
1220 if let Ok(installed_version) =
1221 run_cmd(self.socket.ssh_command(dst_path).arg("version")).await
1222 {
1223 if installed_version.trim() == version.to_string() {
1224 server_binary_exists = true;
1225 }
1226 }
1227 }
1228
1229 if server_binary_exists {
1230 log::info!("remote development server already present",);
1231 return Ok(());
1232 }
1233
1234 let src_stat = fs::metadata(src_path).await?;
1235 let size = src_stat.len();
1236 let server_mode = 0o755;
1237
1238 let t0 = Instant::now();
1239 delegate.set_status(Some("uploading remote development server"), cx);
1240 log::info!("uploading remote development server ({}kb)", size / 1024);
1241 self.upload_file(src_path, &dst_path_gz)
1242 .await
1243 .context("failed to upload server binary")?;
1244 log::info!("uploaded remote development server in {:?}", t0.elapsed());
1245
1246 delegate.set_status(Some("extracting remote development server"), cx);
1247 run_cmd(
1248 self.socket
1249 .ssh_command("gunzip")
1250 .arg("--force")
1251 .arg(&dst_path_gz),
1252 )
1253 .await?;
1254
1255 delegate.set_status(Some("unzipping remote development server"), cx);
1256 run_cmd(
1257 self.socket
1258 .ssh_command("chmod")
1259 .arg(format!("{:o}", server_mode))
1260 .arg(dst_path),
1261 )
1262 .await?;
1263
1264 Ok(())
1265 }
1266
1267 async fn query_platform(&self) -> Result<SshPlatform> {
1268 let os = run_cmd(self.socket.ssh_command("uname").arg("-s")).await?;
1269 let arch = run_cmd(self.socket.ssh_command("uname").arg("-m")).await?;
1270
1271 let os = match os.trim() {
1272 "Darwin" => "macos",
1273 "Linux" => "linux",
1274 _ => Err(anyhow!("unknown uname os {os:?}"))?,
1275 };
1276 let arch = if arch.starts_with("arm") || arch.starts_with("aarch64") {
1277 "aarch64"
1278 } else if arch.starts_with("x86") || arch.starts_with("i686") {
1279 "x86_64"
1280 } else {
1281 Err(anyhow!("unknown uname architecture {arch:?}"))?
1282 };
1283
1284 Ok(SshPlatform { os, arch })
1285 }
1286
1287 async fn upload_file(&self, src_path: &Path, dest_path: &Path) -> Result<()> {
1288 let mut command = process::Command::new("scp");
1289 let output = self
1290 .socket
1291 .ssh_options(&mut command)
1292 .args(
1293 self.socket
1294 .connection_options
1295 .port
1296 .map(|port| vec!["-P".to_string(), port.to_string()])
1297 .unwrap_or_default(),
1298 )
1299 .arg(src_path)
1300 .arg(format!(
1301 "{}:{}",
1302 self.socket.connection_options.scp_url(),
1303 dest_path.display()
1304 ))
1305 .output()
1306 .await?;
1307
1308 if output.status.success() {
1309 Ok(())
1310 } else {
1311 Err(anyhow!(
1312 "failed to upload file {} -> {}: {}",
1313 src_path.display(),
1314 dest_path.display(),
1315 String::from_utf8_lossy(&output.stderr)
1316 ))
1317 }
1318 }
1319}
1320
1321type ResponseChannels = Mutex<HashMap<MessageId, oneshot::Sender<(Envelope, oneshot::Sender<()>)>>>;
1322
1323pub struct ChannelClient {
1324 next_message_id: AtomicU32,
1325 outgoing_tx: mpsc::UnboundedSender<Envelope>,
1326 response_channels: ResponseChannels, // Lock
1327 message_handlers: Mutex<ProtoMessageHandlerSet>, // Lock
1328}
1329
1330impl ChannelClient {
1331 pub fn new(
1332 incoming_rx: mpsc::UnboundedReceiver<Envelope>,
1333 outgoing_tx: mpsc::UnboundedSender<Envelope>,
1334 cx: &AppContext,
1335 ) -> Arc<Self> {
1336 let this = Arc::new(Self {
1337 outgoing_tx,
1338 next_message_id: AtomicU32::new(0),
1339 response_channels: ResponseChannels::default(),
1340 message_handlers: Default::default(),
1341 });
1342
1343 Self::start_handling_messages(this.clone(), incoming_rx, cx);
1344
1345 this
1346 }
1347
1348 fn start_handling_messages(
1349 this: Arc<Self>,
1350 mut incoming_rx: mpsc::UnboundedReceiver<Envelope>,
1351 cx: &AppContext,
1352 ) {
1353 cx.spawn(|cx| {
1354 let this = Arc::downgrade(&this);
1355 async move {
1356 let peer_id = PeerId { owner_id: 0, id: 0 };
1357 while let Some(incoming) = incoming_rx.next().await {
1358 let Some(this) = this.upgrade() else {
1359 return anyhow::Ok(());
1360 };
1361
1362 if let Some(request_id) = incoming.responding_to {
1363 let request_id = MessageId(request_id);
1364 let sender = this.response_channels.lock().remove(&request_id);
1365 if let Some(sender) = sender {
1366 let (tx, rx) = oneshot::channel();
1367 if incoming.payload.is_some() {
1368 sender.send((incoming, tx)).ok();
1369 }
1370 rx.await.ok();
1371 }
1372 } else if let Some(envelope) =
1373 build_typed_envelope(peer_id, Instant::now(), incoming)
1374 {
1375 let type_name = envelope.payload_type_name();
1376 if let Some(future) = ProtoMessageHandlerSet::handle_message(
1377 &this.message_handlers,
1378 envelope,
1379 this.clone().into(),
1380 cx.clone(),
1381 ) {
1382 log::debug!("ssh message received. name:{type_name}");
1383 match future.await {
1384 Ok(_) => {
1385 log::debug!("ssh message handled. name:{type_name}");
1386 }
1387 Err(error) => {
1388 log::error!(
1389 "error handling message. type:{type_name}, error:{error}",
1390 );
1391 }
1392 }
1393 } else {
1394 log::error!("unhandled ssh message name:{type_name}");
1395 }
1396 }
1397 }
1398 anyhow::Ok(())
1399 }
1400 })
1401 .detach();
1402 }
1403
1404 pub fn subscribe_to_entity<E: 'static>(&self, remote_id: u64, entity: &Model<E>) {
1405 let id = (TypeId::of::<E>(), remote_id);
1406
1407 let mut message_handlers = self.message_handlers.lock();
1408 if message_handlers
1409 .entities_by_type_and_remote_id
1410 .contains_key(&id)
1411 {
1412 panic!("already subscribed to entity");
1413 }
1414
1415 message_handlers.entities_by_type_and_remote_id.insert(
1416 id,
1417 EntityMessageSubscriber::Entity {
1418 handle: entity.downgrade().into(),
1419 },
1420 );
1421 }
1422
1423 pub fn request<T: RequestMessage>(
1424 &self,
1425 payload: T,
1426 ) -> impl 'static + Future<Output = Result<T::Response>> {
1427 log::debug!("ssh request start. name:{}", T::NAME);
1428 let response = self.request_dynamic(payload.into_envelope(0, None, None), T::NAME);
1429 async move {
1430 let response = response.await?;
1431 log::debug!("ssh request finish. name:{}", T::NAME);
1432 T::Response::from_envelope(response)
1433 .ok_or_else(|| anyhow!("received a response of the wrong type"))
1434 }
1435 }
1436
1437 pub async fn ping(&self, timeout: Duration) -> Result<()> {
1438 smol::future::or(
1439 async {
1440 self.request(proto::Ping {}).await?;
1441 Ok(())
1442 },
1443 async {
1444 smol::Timer::after(timeout).await;
1445 Err(anyhow!("Timeout detected"))
1446 },
1447 )
1448 .await
1449 }
1450
1451 pub fn send<T: EnvelopedMessage>(&self, payload: T) -> Result<()> {
1452 log::debug!("ssh send name:{}", T::NAME);
1453 self.send_dynamic(payload.into_envelope(0, None, None))
1454 }
1455
1456 pub fn request_dynamic(
1457 &self,
1458 mut envelope: proto::Envelope,
1459 type_name: &'static str,
1460 ) -> impl 'static + Future<Output = Result<proto::Envelope>> {
1461 envelope.id = self.next_message_id.fetch_add(1, SeqCst);
1462 let (tx, rx) = oneshot::channel();
1463 let mut response_channels_lock = self.response_channels.lock();
1464 response_channels_lock.insert(MessageId(envelope.id), tx);
1465 drop(response_channels_lock);
1466 let result = self.outgoing_tx.unbounded_send(envelope);
1467 async move {
1468 if let Err(error) = &result {
1469 log::error!("failed to send message: {}", error);
1470 return Err(anyhow!("failed to send message: {}", error));
1471 }
1472
1473 let response = rx.await.context("connection lost")?.0;
1474 if let Some(proto::envelope::Payload::Error(error)) = &response.payload {
1475 return Err(RpcError::from_proto(error, type_name));
1476 }
1477 Ok(response)
1478 }
1479 }
1480
1481 pub fn send_dynamic(&self, mut envelope: proto::Envelope) -> Result<()> {
1482 envelope.id = self.next_message_id.fetch_add(1, SeqCst);
1483 self.outgoing_tx.unbounded_send(envelope)?;
1484 Ok(())
1485 }
1486}
1487
1488impl ProtoClient for ChannelClient {
1489 fn request(
1490 &self,
1491 envelope: proto::Envelope,
1492 request_type: &'static str,
1493 ) -> BoxFuture<'static, Result<proto::Envelope>> {
1494 self.request_dynamic(envelope, request_type).boxed()
1495 }
1496
1497 fn send(&self, envelope: proto::Envelope, _message_type: &'static str) -> Result<()> {
1498 self.send_dynamic(envelope)
1499 }
1500
1501 fn send_response(&self, envelope: Envelope, _message_type: &'static str) -> anyhow::Result<()> {
1502 self.send_dynamic(envelope)
1503 }
1504
1505 fn message_handler_set(&self) -> &Mutex<ProtoMessageHandlerSet> {
1506 &self.message_handlers
1507 }
1508
1509 fn is_via_collab(&self) -> bool {
1510 false
1511 }
1512}