1use crate::{
2 json_log::LogRecord,
3 protocol::{
4 message_len_from_buffer, read_message_with_len, write_message, MessageId, MESSAGE_LEN_SIZE,
5 },
6};
7use anyhow::{anyhow, Context as _, Result};
8use collections::HashMap;
9use futures::{
10 channel::{mpsc, oneshot},
11 future::{BoxFuture, LocalBoxFuture},
12 select_biased, AsyncReadExt as _, AsyncWriteExt as _, Future, FutureExt as _, StreamExt as _,
13};
14use gpui::{AppContext, AsyncAppContext, Model, SemanticVersion, WeakModel};
15use parking_lot::Mutex;
16use rpc::{
17 proto::{
18 self, build_typed_envelope, AnyTypedEnvelope, Envelope, EnvelopedMessage, PeerId,
19 ProtoClient, RequestMessage,
20 },
21 TypedEnvelope,
22};
23use smol::{
24 fs,
25 process::{self, Stdio},
26};
27use std::{
28 any::TypeId,
29 ffi::OsStr,
30 path::{Path, PathBuf},
31 sync::{
32 atomic::{AtomicU32, Ordering::SeqCst},
33 Arc,
34 },
35 time::Instant,
36};
37use tempfile::TempDir;
38
39#[derive(Clone)]
40pub struct SshSocket {
41 connection_options: SshConnectionOptions,
42 socket_path: PathBuf,
43}
44
45pub struct SshSession {
46 next_message_id: AtomicU32,
47 response_channels: ResponseChannels,
48 outgoing_tx: mpsc::UnboundedSender<Envelope>,
49 spawn_process_tx: mpsc::UnboundedSender<SpawnRequest>,
50 client_socket: Option<SshSocket>,
51 message_handlers: Mutex<
52 HashMap<
53 TypeId,
54 Arc<
55 dyn Send
56 + Sync
57 + Fn(
58 Box<dyn AnyTypedEnvelope>,
59 Arc<SshSession>,
60 AsyncAppContext,
61 ) -> Option<LocalBoxFuture<'static, Result<()>>>,
62 >,
63 >,
64 >,
65}
66
67struct SshClientState {
68 socket: SshSocket,
69 _master_process: process::Child,
70 _temp_dir: TempDir,
71}
72
/// Describes the SSH endpoint to connect to.
///
/// `Debug` is implemented by hand so that the password never ends up in log
/// output or panic messages.
#[derive(Clone, PartialEq, Eq)]
pub struct SshConnectionOptions {
    /// Host name or address of the remote machine.
    pub host: String,
    /// Login name; when `None` no user is put into the generated strings.
    pub username: Option<String>,
    /// TCP port; when `None` no port is put into the generated strings.
    pub port: Option<u16>,
    /// Optional password for the connection.
    pub password: Option<String>,
}

impl std::fmt::Debug for SshConnectionOptions {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Same layout as the derived implementation, but only reveal whether
        // a password is set, never its value.
        f.debug_struct("SshConnectionOptions")
            .field("host", &self.host)
            .field("username", &self.username)
            .field("port", &self.port)
            .field("password", &self.password.as_ref().map(|_| "<redacted>"))
            .finish()
    }
}

impl SshConnectionOptions {
    /// Returns the endpoint as `ssh://[user@]host[:port]`.
    pub fn ssh_url(&self) -> String {
        format!("ssh://{}", self.connection_string())
    }

    /// Returns `[user@]host`, the form used as the host part of an scp target.
    fn scp_url(&self) -> String {
        match &self.username {
            Some(username) => format!("{}@{}", username, self.host),
            None => self.host.clone(),
        }
    }

    /// Returns the endpoint as `[user@]host[:port]`.
    pub fn connection_string(&self) -> String {
        let target = self.scp_url();
        match self.port {
            Some(port) => format!("{}:{}", target, port),
            None => target,
        }
    }
}
117
118struct SpawnRequest {
119 command: String,
120 process_tx: oneshot::Sender<process::Child>,
121}
122
/// Operating system and CPU architecture of the remote machine.
#[derive(Debug, Clone, Copy)]
pub struct SshPlatform {
    /// Operating system name, e.g. `"macos"` or `"linux"`.
    pub os: &'static str,
    /// Architecture name, e.g. `"aarch64"` or `"x86_64"`.
    pub arch: &'static str,
}
128
129pub trait SshClientDelegate {
130 fn ask_password(
131 &self,
132 prompt: String,
133 cx: &mut AsyncAppContext,
134 ) -> oneshot::Receiver<Result<String>>;
135 fn remote_server_binary_path(&self, cx: &mut AsyncAppContext) -> Result<PathBuf>;
136 fn get_server_binary(
137 &self,
138 platform: SshPlatform,
139 cx: &mut AsyncAppContext,
140 ) -> oneshot::Receiver<Result<(PathBuf, SemanticVersion)>>;
141 fn set_status(&self, status: Option<&str>, cx: &mut AsyncAppContext);
142}
143
144type ResponseChannels = Mutex<HashMap<MessageId, oneshot::Sender<(Envelope, oneshot::Sender<()>)>>>;
145
146impl SshSession {
147 pub async fn client(
148 connection_options: SshConnectionOptions,
149 delegate: Arc<dyn SshClientDelegate>,
150 cx: &mut AsyncAppContext,
151 ) -> Result<Arc<Self>> {
152 let client_state = SshClientState::new(connection_options, delegate.clone(), cx).await?;
153
154 let platform = client_state.query_platform().await?;
155 let (local_binary_path, version) = delegate.get_server_binary(platform, cx).await??;
156 let remote_binary_path = delegate.remote_server_binary_path(cx)?;
157 client_state
158 .ensure_server_binary(
159 &delegate,
160 &local_binary_path,
161 &remote_binary_path,
162 version,
163 cx,
164 )
165 .await?;
166
167 let (spawn_process_tx, mut spawn_process_rx) = mpsc::unbounded::<SpawnRequest>();
168 let (outgoing_tx, mut outgoing_rx) = mpsc::unbounded::<Envelope>();
169 let (incoming_tx, incoming_rx) = mpsc::unbounded::<Envelope>();
170
171 let socket = client_state.socket.clone();
172 run_cmd(socket.ssh_command(&remote_binary_path).arg("version")).await?;
173
174 let mut remote_server_child = socket
175 .ssh_command(&format!(
176 "RUST_LOG={} {:?} run",
177 std::env::var("RUST_LOG").unwrap_or(String::new()),
178 remote_binary_path,
179 ))
180 .spawn()
181 .context("failed to spawn remote server")?;
182 let mut child_stderr = remote_server_child.stderr.take().unwrap();
183 let mut child_stdout = remote_server_child.stdout.take().unwrap();
184 let mut child_stdin = remote_server_child.stdin.take().unwrap();
185
186 let executor = cx.background_executor().clone();
187 executor.clone().spawn(async move {
188 let mut stdin_buffer = Vec::new();
189 let mut stdout_buffer = Vec::new();
190 let mut stderr_buffer = Vec::new();
191 let mut stderr_offset = 0;
192
193 loop {
194 stdout_buffer.resize(MESSAGE_LEN_SIZE, 0);
195 stderr_buffer.resize(stderr_offset + 1024, 0);
196
197 select_biased! {
198 outgoing = outgoing_rx.next().fuse() => {
199 let Some(outgoing) = outgoing else {
200 return anyhow::Ok(());
201 };
202
203 write_message(&mut child_stdin, &mut stdin_buffer, outgoing).await?;
204 }
205
206 request = spawn_process_rx.next().fuse() => {
207 let Some(request) = request else {
208 return Ok(());
209 };
210
211 log::info!("spawn process: {:?}", request.command);
212 let child = client_state.socket
213 .ssh_command(&request.command)
214 .spawn()
215 .context("failed to create channel")?;
216 request.process_tx.send(child).ok();
217 }
218
219 result = child_stdout.read(&mut stdout_buffer).fuse() => {
220 match result {
221 Ok(len) => {
222 if len == 0 {
223 child_stdin.close().await?;
224 let status = remote_server_child.status().await?;
225 if !status.success() {
226 log::info!("channel exited with status: {status:?}");
227 }
228 return Ok(());
229 }
230
231 if len < stdout_buffer.len() {
232 child_stdout.read_exact(&mut stdout_buffer[len..]).await?;
233 }
234
235 let message_len = message_len_from_buffer(&stdout_buffer);
236 match read_message_with_len(&mut child_stdout, &mut stdout_buffer, message_len).await {
237 Ok(envelope) => {
238 incoming_tx.unbounded_send(envelope).ok();
239 }
240 Err(error) => {
241 log::error!("error decoding message {error:?}");
242 }
243 }
244 }
245 Err(error) => {
246 Err(anyhow!("error reading stdout: {error:?}"))?;
247 }
248 }
249 }
250
251 result = child_stderr.read(&mut stderr_buffer[stderr_offset..]).fuse() => {
252 match result {
253 Ok(len) => {
254 stderr_offset += len;
255 let mut start_ix = 0;
256 while let Some(ix) = stderr_buffer[start_ix..stderr_offset].iter().position(|b| b == &b'\n') {
257 let line_ix = start_ix + ix;
258 let content = &stderr_buffer[start_ix..line_ix];
259 start_ix = line_ix + 1;
260 if let Ok(record) = serde_json::from_slice::<LogRecord>(&content) {
261 record.log(log::logger())
262 } else {
263 eprintln!("(remote) {}", String::from_utf8_lossy(content));
264 }
265 }
266 stderr_buffer.drain(0..start_ix);
267 stderr_offset -= start_ix;
268 }
269 Err(error) => {
270 Err(anyhow!("error reading stderr: {error:?}"))?;
271 }
272 }
273 }
274 }
275 }
276 }).detach();
277
278 cx.update(|cx| Self::new(incoming_rx, outgoing_tx, spawn_process_tx, Some(socket), cx))
279 }
280
281 pub fn server(
282 incoming_rx: mpsc::UnboundedReceiver<Envelope>,
283 outgoing_tx: mpsc::UnboundedSender<Envelope>,
284 cx: &AppContext,
285 ) -> Arc<SshSession> {
286 let (tx, _rx) = mpsc::unbounded();
287 Self::new(incoming_rx, outgoing_tx, tx, None, cx)
288 }
289
/// Creates two in-process sessions wired to each other, for tests.
///
/// Returns `(client, server)`: whatever one session sends arrives as
/// incoming on the other. Neither session has a control socket.
#[cfg(any(test, feature = "test-support"))]
pub fn fake(
    client_cx: &mut gpui::TestAppContext,
    server_cx: &mut gpui::TestAppContext,
) -> (Arc<Self>, Arc<Self>) {
    let (server_to_client_tx, server_to_client_rx) = mpsc::unbounded();
    let (client_to_server_tx, client_to_server_rx) = mpsc::unbounded();
    let (spawn_tx, _spawn_rx) = mpsc::unbounded();

    // TODO: the fake client is created without a socket.
    let client_spawn_tx = spawn_tx.clone();
    let client = client_cx.update(|cx| {
        Self::new(
            server_to_client_rx,
            client_to_server_tx,
            client_spawn_tx,
            None,
            cx,
        )
    });

    let server_spawn_tx = spawn_tx.clone();
    let server = server_cx.update(|cx| {
        Self::new(
            client_to_server_rx,
            server_to_client_tx,
            server_spawn_tx,
            None,
            cx,
        )
    });

    (client, server)
}
319
320 fn new(
321 mut incoming_rx: mpsc::UnboundedReceiver<Envelope>,
322 outgoing_tx: mpsc::UnboundedSender<Envelope>,
323 spawn_process_tx: mpsc::UnboundedSender<SpawnRequest>,
324 client_socket: Option<SshSocket>,
325 cx: &AppContext,
326 ) -> Arc<SshSession> {
327 let this = Arc::new(Self {
328 next_message_id: AtomicU32::new(0),
329 response_channels: ResponseChannels::default(),
330 outgoing_tx,
331 spawn_process_tx,
332 client_socket,
333 message_handlers: Default::default(),
334 });
335
336 cx.spawn(|cx| {
337 let this = this.clone();
338 async move {
339 let peer_id = PeerId { owner_id: 0, id: 0 };
340 while let Some(incoming) = incoming_rx.next().await {
341 if let Some(request_id) = incoming.responding_to {
342 let request_id = MessageId(request_id);
343 let sender = this.response_channels.lock().remove(&request_id);
344 if let Some(sender) = sender {
345 let (tx, rx) = oneshot::channel();
346 if incoming.payload.is_some() {
347 sender.send((incoming, tx)).ok();
348 }
349 rx.await.ok();
350 }
351 } else if let Some(envelope) =
352 build_typed_envelope(peer_id, Instant::now(), incoming)
353 {
354 log::debug!(
355 "ssh message received. name:{}",
356 envelope.payload_type_name()
357 );
358 let type_id = envelope.payload_type_id();
359 let handler = this.message_handlers.lock().get(&type_id).cloned();
360 if let Some(handler) = handler {
361 if let Some(future) = handler(envelope, this.clone(), cx.clone()) {
362 future.await.ok();
363 } else {
364 this.message_handlers.lock().remove(&type_id);
365 }
366 }
367 }
368 }
369 anyhow::Ok(())
370 }
371 })
372 .detach();
373
374 this
375 }
376
377 pub fn request<T: RequestMessage>(
378 &self,
379 payload: T,
380 ) -> impl 'static + Future<Output = Result<T::Response>> {
381 log::debug!("ssh request start. name:{}", T::NAME);
382 let response = self.request_dynamic(payload.into_envelope(0, None, None), "");
383 async move {
384 let response = response.await?;
385 log::debug!("ssh request finish. name:{}", T::NAME);
386 T::Response::from_envelope(response)
387 .ok_or_else(|| anyhow!("received a response of the wrong type"))
388 }
389 }
390
391 pub fn send<T: EnvelopedMessage>(&self, payload: T) -> Result<()> {
392 self.send_dynamic(payload.into_envelope(0, None, None))
393 }
394
395 pub fn request_dynamic(
396 &self,
397 mut envelope: proto::Envelope,
398 _request_type: &'static str,
399 ) -> impl 'static + Future<Output = Result<proto::Envelope>> {
400 envelope.id = self.next_message_id.fetch_add(1, SeqCst);
401 let (tx, rx) = oneshot::channel();
402 self.response_channels
403 .lock()
404 .insert(MessageId(envelope.id), tx);
405 self.outgoing_tx.unbounded_send(envelope).ok();
406 async move { Ok(rx.await.context("connection lost")?.0) }
407 }
408
409 pub fn send_dynamic(&self, mut envelope: proto::Envelope) -> Result<()> {
410 envelope.id = self.next_message_id.fetch_add(1, SeqCst);
411 self.outgoing_tx.unbounded_send(envelope)?;
412 Ok(())
413 }
414
415 pub async fn spawn_process(&self, command: String) -> process::Child {
416 let (process_tx, process_rx) = oneshot::channel();
417 self.spawn_process_tx
418 .unbounded_send(SpawnRequest {
419 command,
420 process_tx,
421 })
422 .ok();
423 process_rx.await.unwrap()
424 }
425
426 pub fn ssh_args(&self) -> Vec<String> {
427 self.client_socket.as_ref().unwrap().ssh_args()
428 }
429
430 pub fn add_message_handler<M, E, H, F>(&self, entity: WeakModel<E>, handler: H)
431 where
432 M: EnvelopedMessage,
433 E: 'static,
434 H: 'static + Sync + Send + Fn(Model<E>, TypedEnvelope<M>, AsyncAppContext) -> F,
435 F: 'static + Future<Output = Result<()>>,
436 {
437 let message_type_id = TypeId::of::<M>();
438 self.message_handlers.lock().insert(
439 message_type_id,
440 Arc::new(move |envelope, _, cx| {
441 let entity = entity.upgrade()?;
442 let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
443 Some(handler(entity, *envelope, cx).boxed_local())
444 }),
445 );
446 }
447
448 pub fn add_request_handler<M, E, H, F>(&self, entity: WeakModel<E>, handler: H)
449 where
450 M: EnvelopedMessage + RequestMessage,
451 E: 'static,
452 H: 'static + Sync + Send + Fn(Model<E>, TypedEnvelope<M>, AsyncAppContext) -> F,
453 F: 'static + Future<Output = Result<M::Response>>,
454 {
455 let message_type_id = TypeId::of::<M>();
456 self.message_handlers.lock().insert(
457 message_type_id,
458 Arc::new(move |envelope, this, cx| {
459 let entity = entity.upgrade()?;
460 let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
461 let request_id = envelope.message_id();
462 Some(
463 handler(entity, *envelope, cx)
464 .then(move |result| async move {
465 this.outgoing_tx.unbounded_send(result?.into_envelope(
466 this.next_message_id.fetch_add(1, SeqCst),
467 Some(request_id),
468 None,
469 ))?;
470 Ok(())
471 })
472 .boxed_local(),
473 )
474 }),
475 );
476 }
477}
478
479impl ProtoClient for SshSession {
480 fn request(
481 &self,
482 envelope: proto::Envelope,
483 request_type: &'static str,
484 ) -> BoxFuture<'static, Result<proto::Envelope>> {
485 self.request_dynamic(envelope, request_type).boxed()
486 }
487
488 fn send(&self, envelope: proto::Envelope) -> Result<()> {
489 self.send_dynamic(envelope)
490 }
491}
492
493impl SshClientState {
/// Fallback for non-unix targets, where no connection can be established.
///
/// # Errors
///
/// Always fails.
#[cfg(not(unix))]
async fn new(
    _connection_options: SshConnectionOptions,
    _delegate: Arc<dyn SshClientDelegate>,
    _cx: &mut AsyncAppContext,
) -> Result<Self> {
    let unsupported = anyhow!("ssh is not supported on this platform");
    Err(unsupported)
}
502
503 #[cfg(unix)]
504 async fn new(
505 connection_options: SshConnectionOptions,
506 delegate: Arc<dyn SshClientDelegate>,
507 cx: &mut AsyncAppContext,
508 ) -> Result<Self> {
509 use futures::{io::BufReader, AsyncBufReadExt as _};
510 use smol::{fs::unix::PermissionsExt as _, net::unix::UnixListener};
511 use util::ResultExt as _;
512
513 delegate.set_status(Some("connecting"), cx);
514
515 let url = connection_options.ssh_url();
516 let temp_dir = tempfile::Builder::new()
517 .prefix("zed-ssh-session")
518 .tempdir()?;
519
520 // Create a domain socket listener to handle requests from the askpass program.
521 let askpass_socket = temp_dir.path().join("askpass.sock");
522 let listener =
523 UnixListener::bind(&askpass_socket).context("failed to create askpass socket")?;
524
525 let askpass_task = cx.spawn(|mut cx| async move {
526 while let Ok((mut stream, _)) = listener.accept().await {
527 let mut buffer = Vec::new();
528 let mut reader = BufReader::new(&mut stream);
529 if reader.read_until(b'\0', &mut buffer).await.is_err() {
530 buffer.clear();
531 }
532 let password_prompt = String::from_utf8_lossy(&buffer);
533 if let Some(password) = delegate
534 .ask_password(password_prompt.to_string(), &mut cx)
535 .await
536 .context("failed to get ssh password")
537 .and_then(|p| p)
538 .log_err()
539 {
540 stream.write_all(password.as_bytes()).await.log_err();
541 }
542 }
543 });
544
545 // Create an askpass script that communicates back to this process.
546 let askpass_script = format!(
547 "{shebang}\n{print_args} | nc -U {askpass_socket} 2> /dev/null \n",
548 askpass_socket = askpass_socket.display(),
549 print_args = "printf '%s\\0' \"$@\"",
550 shebang = "#!/bin/sh",
551 );
552 let askpass_script_path = temp_dir.path().join("askpass.sh");
553 fs::write(&askpass_script_path, askpass_script).await?;
554 fs::set_permissions(&askpass_script_path, std::fs::Permissions::from_mode(0o755)).await?;
555
556 // Start the master SSH process, which does not do anything except for establish
557 // the connection and keep it open, allowing other ssh commands to reuse it
558 // via a control socket.
559 let socket_path = temp_dir.path().join("ssh.sock");
560 let mut master_process = process::Command::new("ssh")
561 .stdin(Stdio::null())
562 .stdout(Stdio::piped())
563 .stderr(Stdio::piped())
564 .env("SSH_ASKPASS_REQUIRE", "force")
565 .env("SSH_ASKPASS", &askpass_script_path)
566 .args(["-N", "-o", "ControlMaster=yes", "-o"])
567 .arg(format!("ControlPath={}", socket_path.display()))
568 .arg(&url)
569 .spawn()?;
570
571 // Wait for this ssh process to close its stdout, indicating that authentication
572 // has completed.
573 let stdout = master_process.stdout.as_mut().unwrap();
574 let mut output = Vec::new();
575 stdout.read_to_end(&mut output).await?;
576 drop(askpass_task);
577
578 if master_process.try_status()?.is_some() {
579 output.clear();
580 let mut stderr = master_process.stderr.take().unwrap();
581 stderr.read_to_end(&mut output).await?;
582 Err(anyhow!(
583 "failed to connect: {}",
584 String::from_utf8_lossy(&output)
585 ))?;
586 }
587
588 Ok(Self {
589 socket: SshSocket {
590 connection_options,
591 socket_path,
592 },
593 _master_process: master_process,
594 _temp_dir: temp_dir,
595 })
596 }
597
598 async fn ensure_server_binary(
599 &self,
600 delegate: &Arc<dyn SshClientDelegate>,
601 src_path: &Path,
602 dst_path: &Path,
603 version: SemanticVersion,
604 cx: &mut AsyncAppContext,
605 ) -> Result<()> {
606 let mut dst_path_gz = dst_path.to_path_buf();
607 dst_path_gz.set_extension("gz");
608
609 if let Some(parent) = dst_path.parent() {
610 run_cmd(self.socket.ssh_command("mkdir").arg("-p").arg(parent)).await?;
611 }
612
613 let mut server_binary_exists = false;
614 if cfg!(not(debug_assertions)) {
615 if let Ok(installed_version) =
616 run_cmd(self.socket.ssh_command(&dst_path).arg("version")).await
617 {
618 if installed_version.trim() == version.to_string() {
619 server_binary_exists = true;
620 }
621 }
622 }
623
624 if server_binary_exists {
625 log::info!("remote development server already present",);
626 return Ok(());
627 }
628
629 let src_stat = fs::metadata(src_path).await?;
630 let size = src_stat.len();
631 let server_mode = 0o755;
632
633 let t0 = Instant::now();
634 delegate.set_status(Some("uploading remote development server"), cx);
635 log::info!("uploading remote development server ({}kb)", size / 1024);
636 self.upload_file(src_path, &dst_path_gz)
637 .await
638 .context("failed to upload server binary")?;
639 log::info!("uploaded remote development server in {:?}", t0.elapsed());
640
641 delegate.set_status(Some("extracting remote development server"), cx);
642 run_cmd(
643 self.socket
644 .ssh_command("gunzip")
645 .arg("--force")
646 .arg(&dst_path_gz),
647 )
648 .await?;
649
650 delegate.set_status(Some("unzipping remote development server"), cx);
651 run_cmd(
652 self.socket
653 .ssh_command("chmod")
654 .arg(format!("{:o}", server_mode))
655 .arg(&dst_path),
656 )
657 .await?;
658
659 Ok(())
660 }
661
662 async fn query_platform(&self) -> Result<SshPlatform> {
663 let os = run_cmd(self.socket.ssh_command("uname").arg("-s")).await?;
664 let arch = run_cmd(self.socket.ssh_command("uname").arg("-m")).await?;
665
666 let os = match os.trim() {
667 "Darwin" => "macos",
668 "Linux" => "linux",
669 _ => Err(anyhow!("unknown uname os {os:?}"))?,
670 };
671 let arch = if arch.starts_with("arm") || arch.starts_with("aarch64") {
672 "aarch64"
673 } else if arch.starts_with("x86") || arch.starts_with("i686") {
674 "x86_64"
675 } else {
676 Err(anyhow!("unknown uname architecture {arch:?}"))?
677 };
678
679 Ok(SshPlatform { os, arch })
680 }
681
682 async fn upload_file(&self, src_path: &Path, dest_path: &Path) -> Result<()> {
683 let mut command = process::Command::new("scp");
684 let output = self
685 .socket
686 .ssh_options(&mut command)
687 .args(
688 self.socket
689 .connection_options
690 .port
691 .map(|port| vec!["-P".to_string(), port.to_string()])
692 .unwrap_or_default(),
693 )
694 .arg(&src_path)
695 .arg(&format!(
696 "{}:{}",
697 self.socket.connection_options.scp_url(),
698 dest_path.display()
699 ))
700 .output()
701 .await?;
702
703 if output.status.success() {
704 Ok(())
705 } else {
706 Err(anyhow!(
707 "failed to upload file {} -> {}: {}",
708 src_path.display(),
709 dest_path.display(),
710 String::from_utf8_lossy(&output.stderr)
711 ))
712 }
713 }
714}
715
716impl SshSocket {
717 fn ssh_command<S: AsRef<OsStr>>(&self, program: S) -> process::Command {
718 let mut command = process::Command::new("ssh");
719 self.ssh_options(&mut command)
720 .arg(self.connection_options.ssh_url())
721 .arg(program);
722 command
723 }
724
725 fn ssh_options<'a>(&self, command: &'a mut process::Command) -> &'a mut process::Command {
726 command
727 .stdin(Stdio::piped())
728 .stdout(Stdio::piped())
729 .stderr(Stdio::piped())
730 .args(["-o", "ControlMaster=no", "-o"])
731 .arg(format!("ControlPath={}", self.socket_path.display()))
732 }
733
734 fn ssh_args(&self) -> Vec<String> {
735 vec![
736 "-o".to_string(),
737 "ControlMaster=no".to_string(),
738 "-o".to_string(),
739 format!("ControlPath={}", self.socket_path.display()),
740 self.connection_options.ssh_url(),
741 ]
742 }
743}
744
745async fn run_cmd(command: &mut process::Command) -> Result<String> {
746 let output = command.output().await?;
747 if output.status.success() {
748 Ok(String::from_utf8_lossy(&output.stdout).to_string())
749 } else {
750 Err(anyhow!(
751 "failed to run command: {}",
752 String::from_utf8_lossy(&output.stderr)
753 ))
754 }
755}