1use crate::{
2 json_log::LogRecord,
3 protocol::{
4 message_len_from_buffer, read_message_with_len, write_message, MessageId, MESSAGE_LEN_SIZE,
5 },
6};
7use anyhow::{anyhow, Context as _, Result};
8use collections::HashMap;
9use futures::{
10 channel::{mpsc, oneshot},
11 future::BoxFuture,
12 select_biased, AsyncReadExt as _, AsyncWriteExt as _, Future, FutureExt as _, StreamExt as _,
13};
14use gpui::{AppContext, AsyncAppContext, Model, SemanticVersion, Task};
15use parking_lot::Mutex;
16use rpc::{
17 proto::{self, build_typed_envelope, Envelope, EnvelopedMessage, PeerId, RequestMessage},
18 EntityMessageSubscriber, ProtoClient, ProtoMessageHandlerSet, RpcError,
19};
20use smol::{
21 fs,
22 process::{self, Stdio},
23};
24use std::{
25 any::TypeId,
26 ffi::OsStr,
27 path::{Path, PathBuf},
28 sync::{
29 atomic::{AtomicU32, Ordering::SeqCst},
30 Arc,
31 },
32 time::Instant,
33};
34use tempfile::TempDir;
35
36#[derive(
37 Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, serde::Serialize, serde::Deserialize,
38)]
39pub struct SshProjectId(pub u64);
40
41#[derive(Clone)]
42pub struct SshSocket {
43 connection_options: SshConnectionOptions,
44 socket_path: PathBuf,
45}
46
47pub struct SshSession {
48 next_message_id: AtomicU32,
49 response_channels: ResponseChannels, // Lock
50 outgoing_tx: mpsc::UnboundedSender<Envelope>,
51 spawn_process_tx: mpsc::UnboundedSender<SpawnRequest>,
52 client_socket: Option<SshSocket>,
53 state: Mutex<ProtoMessageHandlerSet>, // Lock
54 _io_task: Option<Task<Result<()>>>,
55}
56
57struct SshClientState {
58 socket: SshSocket,
59 master_process: process::Child,
60 _temp_dir: TempDir,
61}
62
/// Parameters identifying the remote machine to connect to over SSH.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SshConnectionOptions {
    /// Host name or IP address. IPv6 literals may be given with or without brackets.
    pub host: String,
    /// Remote user name; when `None` no user is included in generated targets.
    pub username: Option<String>,
    /// Remote port; when `None` no port is included in generated targets.
    pub port: Option<u16>,
    /// Optional password. Not used by any of the formatting helpers below.
    pub password: Option<String>,
}

impl SshConnectionOptions {
    /// Returns the `ssh://[user@]host[:port]` URL for this destination.
    ///
    /// IPv6 literal hosts are wrapped in brackets so the port separator stays unambiguous.
    pub fn ssh_url(&self) -> String {
        let mut url = String::from("ssh://");
        url.push_str(&self.user_at_host());
        if let Some(port) = self.port {
            url.push(':');
            url.push_str(&port.to_string());
        }
        url
    }

    /// Returns the `[user@]host` part used as the target prefix of an `scp` destination.
    ///
    /// The port is deliberately omitted: `scp` takes it as a separate `-P` argument.
    fn scp_url(&self) -> String {
        self.user_at_host()
    }

    /// Returns a `[user@]host[:port]` description of this destination (no URL scheme).
    pub fn connection_string(&self) -> String {
        let target = self.user_at_host();
        match self.port {
            Some(port) => format!("{}:{}", target, port),
            None => target,
        }
    }

    /// Returns the host, bracketed when it is an IPv6 literal that is not bracketed yet.
    ///
    /// A ':' can only occur in a host that is an IPv6 literal, so it is used as the marker.
    fn bracketed_host(&self) -> String {
        if self.host.contains(':') && !self.host.starts_with('[') {
            format!("[{}]", self.host)
        } else {
            self.host.clone()
        }
    }

    /// Returns `user@host` when a user name is set and just `host` otherwise.
    fn user_at_host(&self) -> String {
        let host = self.bracketed_host();
        match &self.username {
            Some(username) => format!("{}@{}", username, host),
            None => host,
        }
    }
}
107
108struct SpawnRequest {
109 command: String,
110 process_tx: oneshot::Sender<process::Child>,
111}
112
/// Operating system and CPU architecture of the remote host, as short static names
/// (for example `"linux"` / `"x86_64"`).
#[derive(Clone, Copy, Debug)]
pub struct SshPlatform {
    /// Operating system name.
    pub os: &'static str,
    /// CPU architecture name.
    pub arch: &'static str,
}
118
119pub trait SshClientDelegate {
120 fn ask_password(
121 &self,
122 prompt: String,
123 cx: &mut AsyncAppContext,
124 ) -> oneshot::Receiver<Result<String>>;
125 fn remote_server_binary_path(&self, cx: &mut AsyncAppContext) -> Result<PathBuf>;
126 fn get_server_binary(
127 &self,
128 platform: SshPlatform,
129 cx: &mut AsyncAppContext,
130 ) -> oneshot::Receiver<Result<(PathBuf, SemanticVersion)>>;
131 fn set_status(&self, status: Option<&str>, cx: &mut AsyncAppContext);
132}
133
134type ResponseChannels = Mutex<HashMap<MessageId, oneshot::Sender<(Envelope, oneshot::Sender<()>)>>>;
135
136impl SshSession {
137 pub async fn client(
138 connection_options: SshConnectionOptions,
139 delegate: Arc<dyn SshClientDelegate>,
140 cx: &mut AsyncAppContext,
141 ) -> Result<Arc<Self>> {
142 let client_state = SshClientState::new(connection_options, delegate.clone(), cx).await?;
143
144 let platform = client_state.query_platform().await?;
145 let (local_binary_path, version) = delegate.get_server_binary(platform, cx).await??;
146 let remote_binary_path = delegate.remote_server_binary_path(cx)?;
147 client_state
148 .ensure_server_binary(
149 &delegate,
150 &local_binary_path,
151 &remote_binary_path,
152 version,
153 cx,
154 )
155 .await?;
156
157 let (spawn_process_tx, mut spawn_process_rx) = mpsc::unbounded::<SpawnRequest>();
158 let (outgoing_tx, mut outgoing_rx) = mpsc::unbounded::<Envelope>();
159 let (incoming_tx, incoming_rx) = mpsc::unbounded::<Envelope>();
160
161 let socket = client_state.socket.clone();
162 run_cmd(socket.ssh_command(&remote_binary_path).arg("version")).await?;
163
164 let mut remote_server_child = socket
165 .ssh_command(format!(
166 "RUST_LOG={} RUST_BACKTRACE={} {:?} run",
167 std::env::var("RUST_LOG").unwrap_or_default(),
168 std::env::var("RUST_BACKTRACE").unwrap_or_default(),
169 remote_binary_path,
170 ))
171 .spawn()
172 .context("failed to spawn remote server")?;
173 let mut child_stderr = remote_server_child.stderr.take().unwrap();
174 let mut child_stdout = remote_server_child.stdout.take().unwrap();
175 let mut child_stdin = remote_server_child.stdin.take().unwrap();
176
177 let io_task = cx.background_executor().spawn(async move {
178 let mut stdin_buffer = Vec::new();
179 let mut stdout_buffer = Vec::new();
180 let mut stderr_buffer = Vec::new();
181 let mut stderr_offset = 0;
182
183 loop {
184 stdout_buffer.resize(MESSAGE_LEN_SIZE, 0);
185 stderr_buffer.resize(stderr_offset + 1024, 0);
186
187 select_biased! {
188 outgoing = outgoing_rx.next().fuse() => {
189 let Some(outgoing) = outgoing else {
190 return anyhow::Ok(());
191 };
192
193 write_message(&mut child_stdin, &mut stdin_buffer, outgoing).await?;
194 }
195
196 request = spawn_process_rx.next().fuse() => {
197 let Some(request) = request else {
198 return Ok(());
199 };
200
201 log::info!("spawn process: {:?}", request.command);
202 let child = client_state.socket
203 .ssh_command(&request.command)
204 .spawn()
205 .context("failed to create channel")?;
206 request.process_tx.send(child).ok();
207 }
208
209 result = child_stdout.read(&mut stdout_buffer).fuse() => {
210 match result {
211 Ok(len) => {
212 if len == 0 {
213 child_stdin.close().await?;
214 let status = remote_server_child.status().await?;
215 if !status.success() {
216 log::info!("channel exited with status: {status:?}");
217 }
218 return Ok(());
219 }
220
221 if len < stdout_buffer.len() {
222 child_stdout.read_exact(&mut stdout_buffer[len..]).await?;
223 }
224
225 let message_len = message_len_from_buffer(&stdout_buffer);
226 match read_message_with_len(&mut child_stdout, &mut stdout_buffer, message_len).await {
227 Ok(envelope) => {
228 incoming_tx.unbounded_send(envelope).ok();
229 }
230 Err(error) => {
231 log::error!("error decoding message {error:?}");
232 }
233 }
234 }
235 Err(error) => {
236 Err(anyhow!("error reading stdout: {error:?}"))?;
237 }
238 }
239 }
240
241 result = child_stderr.read(&mut stderr_buffer[stderr_offset..]).fuse() => {
242 match result {
243 Ok(len) => {
244 stderr_offset += len;
245 let mut start_ix = 0;
246 while let Some(ix) = stderr_buffer[start_ix..stderr_offset].iter().position(|b| b == &b'\n') {
247 let line_ix = start_ix + ix;
248 let content = &stderr_buffer[start_ix..line_ix];
249 start_ix = line_ix + 1;
250 if let Ok(mut record) = serde_json::from_slice::<LogRecord>(content) {
251 record.message = format!("(remote) {}", record.message);
252 record.log(log::logger())
253 } else {
254 eprintln!("(remote) {}", String::from_utf8_lossy(content));
255 }
256 }
257 stderr_buffer.drain(0..start_ix);
258 stderr_offset -= start_ix;
259 }
260 Err(error) => {
261 Err(anyhow!("error reading stderr: {error:?}"))?;
262 }
263 }
264 }
265 }
266 }
267 });
268
269 cx.update(|cx| {
270 Self::new(
271 incoming_rx,
272 outgoing_tx,
273 spawn_process_tx,
274 Some(socket),
275 Some(io_task),
276 cx,
277 )
278 })
279 }
280
281 pub fn server(
282 incoming_rx: mpsc::UnboundedReceiver<Envelope>,
283 outgoing_tx: mpsc::UnboundedSender<Envelope>,
284 cx: &AppContext,
285 ) -> Arc<SshSession> {
286 let (tx, _rx) = mpsc::unbounded();
287 Self::new(incoming_rx, outgoing_tx, tx, None, None, cx)
288 }
289
/// Builds a connected `(client, server)` pair of sessions for tests.
///
/// The two sessions are wired back to back with in-memory channels: whatever one sends,
/// the other receives. Neither has an SSH socket or an I/O task, and the receiving half
/// of their shared process-spawn channel is dropped when this function returns.
#[cfg(any(test, feature = "test-support"))]
pub fn fake(
    client_cx: &mut gpui::TestAppContext,
    server_cx: &mut gpui::TestAppContext,
) -> (Arc<Self>, Arc<Self>) {
    let (server_to_client_tx, server_to_client_rx) = mpsc::unbounded();
    let (client_to_server_tx, client_to_server_rx) = mpsc::unbounded();
    let (spawn_process_tx, _spawn_process_rx) = mpsc::unbounded();

    // The client is created first, then the server.
    let client = client_cx.update(|cx| {
        Self::new(
            server_to_client_rx,
            client_to_server_tx,
            spawn_process_tx.clone(),
            None, // TODO: the fake client has no socket yet.
            None,
            cx,
        )
    });
    let server = server_cx.update(|cx| {
        Self::new(
            client_to_server_rx,
            server_to_client_tx,
            spawn_process_tx.clone(),
            None,
            None,
            cx,
        )
    });

    (client, server)
}
321
322 fn new(
323 mut incoming_rx: mpsc::UnboundedReceiver<Envelope>,
324 outgoing_tx: mpsc::UnboundedSender<Envelope>,
325 spawn_process_tx: mpsc::UnboundedSender<SpawnRequest>,
326 client_socket: Option<SshSocket>,
327 io_task: Option<Task<Result<()>>>,
328 cx: &AppContext,
329 ) -> Arc<SshSession> {
330 let this = Arc::new(Self {
331 next_message_id: AtomicU32::new(0),
332 response_channels: ResponseChannels::default(),
333 outgoing_tx,
334 spawn_process_tx,
335 client_socket,
336 state: Default::default(),
337 _io_task: io_task,
338 });
339
340 cx.spawn(|cx| {
341 let this = Arc::downgrade(&this);
342 async move {
343 let peer_id = PeerId { owner_id: 0, id: 0 };
344 while let Some(incoming) = incoming_rx.next().await {
345 let Some(this) = this.upgrade() else {
346 return anyhow::Ok(());
347 };
348
349 if let Some(request_id) = incoming.responding_to {
350 let request_id = MessageId(request_id);
351 let sender = this.response_channels.lock().remove(&request_id);
352 if let Some(sender) = sender {
353 let (tx, rx) = oneshot::channel();
354 if incoming.payload.is_some() {
355 sender.send((incoming, tx)).ok();
356 }
357 rx.await.ok();
358 }
359 } else if let Some(envelope) =
360 build_typed_envelope(peer_id, Instant::now(), incoming)
361 {
362 let type_name = envelope.payload_type_name();
363 if let Some(future) = ProtoMessageHandlerSet::handle_message(
364 &this.state,
365 envelope,
366 this.clone().into(),
367 cx.clone(),
368 ) {
369 log::debug!("ssh message received. name:{type_name}");
370 match future.await {
371 Ok(_) => {
372 log::debug!("ssh message handled. name:{type_name}");
373 }
374 Err(error) => {
375 log::error!(
376 "error handling message. type:{type_name}, error:{error}",
377 );
378 }
379 }
380 } else {
381 log::error!("unhandled ssh message name:{type_name}");
382 }
383 }
384 }
385 anyhow::Ok(())
386 }
387 })
388 .detach();
389
390 this
391 }
392
393 pub fn request<T: RequestMessage>(
394 &self,
395 payload: T,
396 ) -> impl 'static + Future<Output = Result<T::Response>> {
397 log::debug!("ssh request start. name:{}", T::NAME);
398 let response = self.request_dynamic(payload.into_envelope(0, None, None), T::NAME);
399 async move {
400 let response = response.await?;
401 log::debug!("ssh request finish. name:{}", T::NAME);
402 T::Response::from_envelope(response)
403 .ok_or_else(|| anyhow!("received a response of the wrong type"))
404 }
405 }
406
407 pub fn send<T: EnvelopedMessage>(&self, payload: T) -> Result<()> {
408 log::debug!("ssh send name:{}", T::NAME);
409 self.send_dynamic(payload.into_envelope(0, None, None))
410 }
411
412 pub fn request_dynamic(
413 &self,
414 mut envelope: proto::Envelope,
415 type_name: &'static str,
416 ) -> impl 'static + Future<Output = Result<proto::Envelope>> {
417 envelope.id = self.next_message_id.fetch_add(1, SeqCst);
418 let (tx, rx) = oneshot::channel();
419 let mut response_channels_lock = self.response_channels.lock();
420 response_channels_lock.insert(MessageId(envelope.id), tx);
421 drop(response_channels_lock);
422 self.outgoing_tx.unbounded_send(envelope).ok();
423 async move {
424 let response = rx.await.context("connection lost")?.0;
425 if let Some(proto::envelope::Payload::Error(error)) = &response.payload {
426 return Err(RpcError::from_proto(error, type_name));
427 }
428 Ok(response)
429 }
430 }
431
432 pub fn send_dynamic(&self, mut envelope: proto::Envelope) -> Result<()> {
433 envelope.id = self.next_message_id.fetch_add(1, SeqCst);
434 self.outgoing_tx.unbounded_send(envelope)?;
435 Ok(())
436 }
437
438 pub fn subscribe_to_entity<E: 'static>(&self, remote_id: u64, entity: &Model<E>) {
439 let id = (TypeId::of::<E>(), remote_id);
440
441 let mut state = self.state.lock();
442 if state.entities_by_type_and_remote_id.contains_key(&id) {
443 panic!("already subscribed to entity");
444 }
445
446 state.entities_by_type_and_remote_id.insert(
447 id,
448 EntityMessageSubscriber::Entity {
449 handle: entity.downgrade().into(),
450 },
451 );
452 }
453
454 pub async fn spawn_process(&self, command: String) -> process::Child {
455 let (process_tx, process_rx) = oneshot::channel();
456 self.spawn_process_tx
457 .unbounded_send(SpawnRequest {
458 command,
459 process_tx,
460 })
461 .ok();
462 process_rx.await.unwrap()
463 }
464
465 pub fn ssh_args(&self) -> Vec<String> {
466 self.client_socket.as_ref().unwrap().ssh_args()
467 }
468}
469
470impl ProtoClient for SshSession {
471 fn request(
472 &self,
473 envelope: proto::Envelope,
474 request_type: &'static str,
475 ) -> BoxFuture<'static, Result<proto::Envelope>> {
476 self.request_dynamic(envelope, request_type).boxed()
477 }
478
479 fn send(&self, envelope: proto::Envelope, _message_type: &'static str) -> Result<()> {
480 self.send_dynamic(envelope)
481 }
482
483 fn send_response(&self, envelope: Envelope, _message_type: &'static str) -> anyhow::Result<()> {
484 self.send_dynamic(envelope)
485 }
486
487 fn message_handler_set(&self) -> &Mutex<ProtoMessageHandlerSet> {
488 &self.state
489 }
490
491 fn is_via_collab(&self) -> bool {
492 false
493 }
494}
495
496impl SshClientState {
/// Non-unix stand-in for the connection setup: always fails, because SSH connections are
/// only implemented for unix targets.
#[cfg(not(unix))]
async fn new(
    _connection_options: SshConnectionOptions,
    _delegate: Arc<dyn SshClientDelegate>,
    _cx: &mut AsyncAppContext,
) -> Result<Self> {
    let unsupported = anyhow!("ssh is not supported on this platform");
    Err(unsupported)
}
505
506 #[cfg(unix)]
507 async fn new(
508 connection_options: SshConnectionOptions,
509 delegate: Arc<dyn SshClientDelegate>,
510 cx: &mut AsyncAppContext,
511 ) -> Result<Self> {
512 use futures::{io::BufReader, AsyncBufReadExt as _};
513 use smol::{fs::unix::PermissionsExt as _, net::unix::UnixListener};
514 use util::ResultExt as _;
515
516 delegate.set_status(Some("connecting"), cx);
517
518 let url = connection_options.ssh_url();
519 let temp_dir = tempfile::Builder::new()
520 .prefix("zed-ssh-session")
521 .tempdir()?;
522
523 // Create a domain socket listener to handle requests from the askpass program.
524 let askpass_socket = temp_dir.path().join("askpass.sock");
525 let listener =
526 UnixListener::bind(&askpass_socket).context("failed to create askpass socket")?;
527
528 let askpass_task = cx.spawn(|mut cx| async move {
529 while let Ok((mut stream, _)) = listener.accept().await {
530 let mut buffer = Vec::new();
531 let mut reader = BufReader::new(&mut stream);
532 if reader.read_until(b'\0', &mut buffer).await.is_err() {
533 buffer.clear();
534 }
535 let password_prompt = String::from_utf8_lossy(&buffer);
536 if let Some(password) = delegate
537 .ask_password(password_prompt.to_string(), &mut cx)
538 .await
539 .context("failed to get ssh password")
540 .and_then(|p| p)
541 .log_err()
542 {
543 stream.write_all(password.as_bytes()).await.log_err();
544 }
545 }
546 });
547
548 // Create an askpass script that communicates back to this process.
549 let askpass_script = format!(
550 "{shebang}\n{print_args} | nc -U {askpass_socket} 2> /dev/null \n",
551 askpass_socket = askpass_socket.display(),
552 print_args = "printf '%s\\0' \"$@\"",
553 shebang = "#!/bin/sh",
554 );
555 let askpass_script_path = temp_dir.path().join("askpass.sh");
556 fs::write(&askpass_script_path, askpass_script).await?;
557 fs::set_permissions(&askpass_script_path, std::fs::Permissions::from_mode(0o755)).await?;
558
559 // Start the master SSH process, which does not do anything except for establish
560 // the connection and keep it open, allowing other ssh commands to reuse it
561 // via a control socket.
562 let socket_path = temp_dir.path().join("ssh.sock");
563 let mut master_process = process::Command::new("ssh")
564 .stdin(Stdio::null())
565 .stdout(Stdio::piped())
566 .stderr(Stdio::piped())
567 .env("SSH_ASKPASS_REQUIRE", "force")
568 .env("SSH_ASKPASS", &askpass_script_path)
569 .args(["-N", "-o", "ControlMaster=yes", "-o"])
570 .arg(format!("ControlPath={}", socket_path.display()))
571 .arg(&url)
572 .spawn()?;
573
574 // Wait for this ssh process to close its stdout, indicating that authentication
575 // has completed.
576 let stdout = master_process.stdout.as_mut().unwrap();
577 let mut output = Vec::new();
578 stdout.read_to_end(&mut output).await?;
579 drop(askpass_task);
580
581 if master_process.try_status()?.is_some() {
582 output.clear();
583 let mut stderr = master_process.stderr.take().unwrap();
584 stderr.read_to_end(&mut output).await?;
585 Err(anyhow!(
586 "failed to connect: {}",
587 String::from_utf8_lossy(&output)
588 ))?;
589 }
590
591 Ok(Self {
592 socket: SshSocket {
593 connection_options,
594 socket_path,
595 },
596 master_process,
597 _temp_dir: temp_dir,
598 })
599 }
600
601 async fn ensure_server_binary(
602 &self,
603 delegate: &Arc<dyn SshClientDelegate>,
604 src_path: &Path,
605 dst_path: &Path,
606 version: SemanticVersion,
607 cx: &mut AsyncAppContext,
608 ) -> Result<()> {
609 let mut dst_path_gz = dst_path.to_path_buf();
610 dst_path_gz.set_extension("gz");
611
612 if let Some(parent) = dst_path.parent() {
613 run_cmd(self.socket.ssh_command("mkdir").arg("-p").arg(parent)).await?;
614 }
615
616 let mut server_binary_exists = false;
617 if cfg!(not(debug_assertions)) {
618 if let Ok(installed_version) =
619 run_cmd(self.socket.ssh_command(dst_path).arg("version")).await
620 {
621 if installed_version.trim() == version.to_string() {
622 server_binary_exists = true;
623 }
624 }
625 }
626
627 if server_binary_exists {
628 log::info!("remote development server already present",);
629 return Ok(());
630 }
631
632 let src_stat = fs::metadata(src_path).await?;
633 let size = src_stat.len();
634 let server_mode = 0o755;
635
636 let t0 = Instant::now();
637 delegate.set_status(Some("uploading remote development server"), cx);
638 log::info!("uploading remote development server ({}kb)", size / 1024);
639 self.upload_file(src_path, &dst_path_gz)
640 .await
641 .context("failed to upload server binary")?;
642 log::info!("uploaded remote development server in {:?}", t0.elapsed());
643
644 delegate.set_status(Some("extracting remote development server"), cx);
645 run_cmd(
646 self.socket
647 .ssh_command("gunzip")
648 .arg("--force")
649 .arg(&dst_path_gz),
650 )
651 .await?;
652
653 delegate.set_status(Some("unzipping remote development server"), cx);
654 run_cmd(
655 self.socket
656 .ssh_command("chmod")
657 .arg(format!("{:o}", server_mode))
658 .arg(dst_path),
659 )
660 .await?;
661
662 Ok(())
663 }
664
665 async fn query_platform(&self) -> Result<SshPlatform> {
666 let os = run_cmd(self.socket.ssh_command("uname").arg("-s")).await?;
667 let arch = run_cmd(self.socket.ssh_command("uname").arg("-m")).await?;
668
669 let os = match os.trim() {
670 "Darwin" => "macos",
671 "Linux" => "linux",
672 _ => Err(anyhow!("unknown uname os {os:?}"))?,
673 };
674 let arch = if arch.starts_with("arm") || arch.starts_with("aarch64") {
675 "aarch64"
676 } else if arch.starts_with("x86") || arch.starts_with("i686") {
677 "x86_64"
678 } else {
679 Err(anyhow!("unknown uname architecture {arch:?}"))?
680 };
681
682 Ok(SshPlatform { os, arch })
683 }
684
685 async fn upload_file(&self, src_path: &Path, dest_path: &Path) -> Result<()> {
686 let mut command = process::Command::new("scp");
687 let output = self
688 .socket
689 .ssh_options(&mut command)
690 .args(
691 self.socket
692 .connection_options
693 .port
694 .map(|port| vec!["-P".to_string(), port.to_string()])
695 .unwrap_or_default(),
696 )
697 .arg(src_path)
698 .arg(format!(
699 "{}:{}",
700 self.socket.connection_options.scp_url(),
701 dest_path.display()
702 ))
703 .output()
704 .await?;
705
706 if output.status.success() {
707 Ok(())
708 } else {
709 Err(anyhow!(
710 "failed to upload file {} -> {}: {}",
711 src_path.display(),
712 dest_path.display(),
713 String::from_utf8_lossy(&output.stderr)
714 ))
715 }
716 }
717}
718
719impl Drop for SshClientState {
720 fn drop(&mut self) {
721 if let Err(error) = self.master_process.kill() {
722 log::error!("failed to kill SSH master process: {}", error);
723 }
724 }
725}
726
727impl SshSocket {
728 fn ssh_command<S: AsRef<OsStr>>(&self, program: S) -> process::Command {
729 let mut command = process::Command::new("ssh");
730 self.ssh_options(&mut command)
731 .arg(self.connection_options.ssh_url())
732 .arg(program);
733 command
734 }
735
736 fn ssh_options<'a>(&self, command: &'a mut process::Command) -> &'a mut process::Command {
737 command
738 .stdin(Stdio::piped())
739 .stdout(Stdio::piped())
740 .stderr(Stdio::piped())
741 .args(["-o", "ControlMaster=no", "-o"])
742 .arg(format!("ControlPath={}", self.socket_path.display()))
743 }
744
745 fn ssh_args(&self) -> Vec<String> {
746 vec![
747 "-o".to_string(),
748 "ControlMaster=no".to_string(),
749 "-o".to_string(),
750 format!("ControlPath={}", self.socket_path.display()),
751 self.connection_options.ssh_url(),
752 ]
753 }
754}
755
756async fn run_cmd(command: &mut process::Command) -> Result<String> {
757 let output = command.output().await?;
758 if output.status.success() {
759 Ok(String::from_utf8_lossy(&output.stdout).to_string())
760 } else {
761 Err(anyhow!(
762 "failed to run command: {}",
763 String::from_utf8_lossy(&output.stderr)
764 ))
765 }
766}