ssh_session.rs

  1use crate::{
  2    json_log::LogRecord,
  3    protocol::{
  4        message_len_from_buffer, read_message_with_len, write_message, MessageId, MESSAGE_LEN_SIZE,
  5    },
  6};
  7use anyhow::{anyhow, Context as _, Result};
  8use collections::HashMap;
  9use futures::{
 10    channel::{mpsc, oneshot},
 11    future::{BoxFuture, LocalBoxFuture},
 12    select_biased, AsyncReadExt as _, AsyncWriteExt as _, Future, FutureExt as _, StreamExt as _,
 13};
 14use gpui::{AppContext, AsyncAppContext, Model, SemanticVersion, WeakModel};
 15use parking_lot::Mutex;
 16use rpc::{
 17    proto::{
 18        self, build_typed_envelope, AnyTypedEnvelope, Envelope, EnvelopedMessage, PeerId,
 19        ProtoClient, RequestMessage,
 20    },
 21    TypedEnvelope,
 22};
 23use smol::{
 24    fs,
 25    process::{self, Stdio},
 26};
 27use std::{
 28    any::TypeId,
 29    ffi::OsStr,
 30    path::{Path, PathBuf},
 31    sync::{
 32        atomic::{AtomicU32, Ordering::SeqCst},
 33        Arc,
 34    },
 35    time::Instant,
 36};
 37use tempfile::TempDir;
 38
 39#[derive(Clone)]
 40pub struct SshSocket {
 41    connection_options: SshConnectionOptions,
 42    socket_path: PathBuf,
 43}
 44
 45pub struct SshSession {
 46    next_message_id: AtomicU32,
 47    response_channels: ResponseChannels,
 48    outgoing_tx: mpsc::UnboundedSender<Envelope>,
 49    spawn_process_tx: mpsc::UnboundedSender<SpawnRequest>,
 50    client_socket: Option<SshSocket>,
 51    message_handlers: Mutex<
 52        HashMap<
 53            TypeId,
 54            Arc<
 55                dyn Send
 56                    + Sync
 57                    + Fn(
 58                        Box<dyn AnyTypedEnvelope>,
 59                        Arc<SshSession>,
 60                        AsyncAppContext,
 61                    ) -> Option<LocalBoxFuture<'static, Result<()>>>,
 62            >,
 63        >,
 64    >,
 65}
 66
 67struct SshClientState {
 68    socket: SshSocket,
 69    _master_process: process::Child,
 70    _temp_dir: TempDir,
 71}
 72
 73#[derive(Debug, Clone, PartialEq, Eq)]
 74pub struct SshConnectionOptions {
 75    pub host: String,
 76    pub username: Option<String>,
 77    pub port: Option<u16>,
 78    pub password: Option<String>,
 79}
 80
 81impl SshConnectionOptions {
 82    pub fn ssh_url(&self) -> String {
 83        let mut result = String::from("ssh://");
 84        if let Some(username) = &self.username {
 85            result.push_str(username);
 86            result.push('@');
 87        }
 88        result.push_str(&self.host);
 89        if let Some(port) = self.port {
 90            result.push(':');
 91            result.push_str(&port.to_string());
 92        }
 93        result
 94    }
 95
 96    fn scp_url(&self) -> String {
 97        if let Some(username) = &self.username {
 98            format!("{}@{}", username, self.host)
 99        } else {
100            self.host.clone()
101        }
102    }
103
104    pub fn connection_string(&self) -> String {
105        let host = if let Some(username) = &self.username {
106            format!("{}@{}", username, self.host)
107        } else {
108            self.host.clone()
109        };
110        if let Some(port) = &self.port {
111            format!("{}:{}", host, port)
112        } else {
113            host
114        }
115    }
116}
117
118struct SpawnRequest {
119    command: String,
120    process_tx: oneshot::Sender<process::Child>,
121}
122
123#[derive(Copy, Clone, Debug)]
124pub struct SshPlatform {
125    pub os: &'static str,
126    pub arch: &'static str,
127}
128
129pub trait SshClientDelegate {
130    fn ask_password(
131        &self,
132        prompt: String,
133        cx: &mut AsyncAppContext,
134    ) -> oneshot::Receiver<Result<String>>;
135    fn remote_server_binary_path(&self, cx: &mut AsyncAppContext) -> Result<PathBuf>;
136    fn get_server_binary(
137        &self,
138        platform: SshPlatform,
139        cx: &mut AsyncAppContext,
140    ) -> oneshot::Receiver<Result<(PathBuf, SemanticVersion)>>;
141    fn set_status(&self, status: Option<&str>, cx: &mut AsyncAppContext);
142}
143
144type ResponseChannels = Mutex<HashMap<MessageId, oneshot::Sender<(Envelope, oneshot::Sender<()>)>>>;
145
146impl SshSession {
147    pub async fn client(
148        connection_options: SshConnectionOptions,
149        delegate: Arc<dyn SshClientDelegate>,
150        cx: &mut AsyncAppContext,
151    ) -> Result<Arc<Self>> {
152        let client_state = SshClientState::new(connection_options, delegate.clone(), cx).await?;
153
154        let platform = client_state.query_platform().await?;
155        let (local_binary_path, version) = delegate.get_server_binary(platform, cx).await??;
156        let remote_binary_path = delegate.remote_server_binary_path(cx)?;
157        client_state
158            .ensure_server_binary(
159                &delegate,
160                &local_binary_path,
161                &remote_binary_path,
162                version,
163                cx,
164            )
165            .await?;
166
167        let (spawn_process_tx, mut spawn_process_rx) = mpsc::unbounded::<SpawnRequest>();
168        let (outgoing_tx, mut outgoing_rx) = mpsc::unbounded::<Envelope>();
169        let (incoming_tx, incoming_rx) = mpsc::unbounded::<Envelope>();
170
171        let socket = client_state.socket.clone();
172        run_cmd(socket.ssh_command(&remote_binary_path).arg("version")).await?;
173
174        let mut remote_server_child = socket
175            .ssh_command(&format!(
176                "RUST_LOG={} {:?} run",
177                std::env::var("RUST_LOG").unwrap_or(String::new()),
178                remote_binary_path,
179            ))
180            .spawn()
181            .context("failed to spawn remote server")?;
182        let mut child_stderr = remote_server_child.stderr.take().unwrap();
183        let mut child_stdout = remote_server_child.stdout.take().unwrap();
184        let mut child_stdin = remote_server_child.stdin.take().unwrap();
185
186        let executor = cx.background_executor().clone();
187        executor.clone().spawn(async move {
188            let mut stdin_buffer = Vec::new();
189            let mut stdout_buffer = Vec::new();
190            let mut stderr_buffer = Vec::new();
191            let mut stderr_offset = 0;
192
193            loop {
194                stdout_buffer.resize(MESSAGE_LEN_SIZE, 0);
195                stderr_buffer.resize(stderr_offset + 1024, 0);
196
197                select_biased! {
198                    outgoing = outgoing_rx.next().fuse() => {
199                        let Some(outgoing) = outgoing else {
200                            return anyhow::Ok(());
201                        };
202
203                        write_message(&mut child_stdin, &mut stdin_buffer, outgoing).await?;
204                    }
205
206                    request = spawn_process_rx.next().fuse() => {
207                        let Some(request) = request else {
208                            return Ok(());
209                        };
210
211                        log::info!("spawn process: {:?}", request.command);
212                        let child = client_state.socket
213                            .ssh_command(&request.command)
214                            .spawn()
215                            .context("failed to create channel")?;
216                        request.process_tx.send(child).ok();
217                    }
218
219                    result = child_stdout.read(&mut stdout_buffer).fuse() => {
220                        match result {
221                            Ok(len) => {
222                                if len == 0 {
223                                    child_stdin.close().await?;
224                                    let status = remote_server_child.status().await?;
225                                    if !status.success() {
226                                        log::info!("channel exited with status: {status:?}");
227                                    }
228                                    return Ok(());
229                                }
230
231                                if len < stdout_buffer.len() {
232                                    child_stdout.read_exact(&mut stdout_buffer[len..]).await?;
233                                }
234
235                                let message_len = message_len_from_buffer(&stdout_buffer);
236                                match read_message_with_len(&mut child_stdout, &mut stdout_buffer, message_len).await {
237                                    Ok(envelope) => {
238                                        incoming_tx.unbounded_send(envelope).ok();
239                                    }
240                                    Err(error) => {
241                                        log::error!("error decoding message {error:?}");
242                                    }
243                                }
244                            }
245                            Err(error) => {
246                                Err(anyhow!("error reading stdout: {error:?}"))?;
247                            }
248                        }
249                    }
250
251                    result = child_stderr.read(&mut stderr_buffer[stderr_offset..]).fuse() => {
252                        match result {
253                            Ok(len) => {
254                                stderr_offset += len;
255                                let mut start_ix = 0;
256                                while let Some(ix) = stderr_buffer[start_ix..stderr_offset].iter().position(|b| b == &b'\n') {
257                                    let line_ix = start_ix + ix;
258                                    let content = &stderr_buffer[start_ix..line_ix];
259                                    start_ix = line_ix + 1;
260                                    if let Ok(record) = serde_json::from_slice::<LogRecord>(&content) {
261                                        record.log(log::logger())
262                                    } else {
263                                        eprintln!("(remote) {}", String::from_utf8_lossy(content));
264                                    }
265                                }
266                                stderr_buffer.drain(0..start_ix);
267                                stderr_offset -= start_ix;
268                            }
269                            Err(error) => {
270                                Err(anyhow!("error reading stderr: {error:?}"))?;
271                            }
272                        }
273                    }
274                }
275            }
276        }).detach();
277
278        cx.update(|cx| Self::new(incoming_rx, outgoing_tx, spawn_process_tx, Some(socket), cx))
279    }
280
281    pub fn server(
282        incoming_rx: mpsc::UnboundedReceiver<Envelope>,
283        outgoing_tx: mpsc::UnboundedSender<Envelope>,
284        cx: &AppContext,
285    ) -> Arc<SshSession> {
286        let (tx, _rx) = mpsc::unbounded();
287        Self::new(incoming_rx, outgoing_tx, tx, None, cx)
288    }
289
290    #[cfg(any(test, feature = "test-support"))]
291    pub fn fake(
292        client_cx: &mut gpui::TestAppContext,
293        server_cx: &mut gpui::TestAppContext,
294    ) -> (Arc<Self>, Arc<Self>) {
295        let (server_to_client_tx, server_to_client_rx) = mpsc::unbounded();
296        let (client_to_server_tx, client_to_server_rx) = mpsc::unbounded();
297        let (tx, _rx) = mpsc::unbounded();
298        (
299            client_cx.update(|cx| {
300                Self::new(
301                    server_to_client_rx,
302                    client_to_server_tx,
303                    tx.clone(),
304                    None, // todo()
305                    cx,
306                )
307            }),
308            server_cx.update(|cx| {
309                Self::new(
310                    client_to_server_rx,
311                    server_to_client_tx,
312                    tx.clone(),
313                    None,
314                    cx,
315                )
316            }),
317        )
318    }
319
320    fn new(
321        mut incoming_rx: mpsc::UnboundedReceiver<Envelope>,
322        outgoing_tx: mpsc::UnboundedSender<Envelope>,
323        spawn_process_tx: mpsc::UnboundedSender<SpawnRequest>,
324        client_socket: Option<SshSocket>,
325        cx: &AppContext,
326    ) -> Arc<SshSession> {
327        let this = Arc::new(Self {
328            next_message_id: AtomicU32::new(0),
329            response_channels: ResponseChannels::default(),
330            outgoing_tx,
331            spawn_process_tx,
332            client_socket,
333            message_handlers: Default::default(),
334        });
335
336        cx.spawn(|cx| {
337            let this = this.clone();
338            async move {
339                let peer_id = PeerId { owner_id: 0, id: 0 };
340                while let Some(incoming) = incoming_rx.next().await {
341                    if let Some(request_id) = incoming.responding_to {
342                        let request_id = MessageId(request_id);
343                        let sender = this.response_channels.lock().remove(&request_id);
344                        if let Some(sender) = sender {
345                            let (tx, rx) = oneshot::channel();
346                            if incoming.payload.is_some() {
347                                sender.send((incoming, tx)).ok();
348                            }
349                            rx.await.ok();
350                        }
351                    } else if let Some(envelope) =
352                        build_typed_envelope(peer_id, Instant::now(), incoming)
353                    {
354                        log::debug!(
355                            "ssh message received. name:{}",
356                            envelope.payload_type_name()
357                        );
358                        let type_id = envelope.payload_type_id();
359                        let handler = this.message_handlers.lock().get(&type_id).cloned();
360                        if let Some(handler) = handler {
361                            if let Some(future) = handler(envelope, this.clone(), cx.clone()) {
362                                future.await.ok();
363                            } else {
364                                this.message_handlers.lock().remove(&type_id);
365                            }
366                        }
367                    }
368                }
369                anyhow::Ok(())
370            }
371        })
372        .detach();
373
374        this
375    }
376
377    pub fn request<T: RequestMessage>(
378        &self,
379        payload: T,
380    ) -> impl 'static + Future<Output = Result<T::Response>> {
381        log::debug!("ssh request start. name:{}", T::NAME);
382        let response = self.request_dynamic(payload.into_envelope(0, None, None), "");
383        async move {
384            let response = response.await?;
385            log::debug!("ssh request finish. name:{}", T::NAME);
386            T::Response::from_envelope(response)
387                .ok_or_else(|| anyhow!("received a response of the wrong type"))
388        }
389    }
390
391    pub fn send<T: EnvelopedMessage>(&self, payload: T) -> Result<()> {
392        self.send_dynamic(payload.into_envelope(0, None, None))
393    }
394
395    pub fn request_dynamic(
396        &self,
397        mut envelope: proto::Envelope,
398        _request_type: &'static str,
399    ) -> impl 'static + Future<Output = Result<proto::Envelope>> {
400        envelope.id = self.next_message_id.fetch_add(1, SeqCst);
401        let (tx, rx) = oneshot::channel();
402        self.response_channels
403            .lock()
404            .insert(MessageId(envelope.id), tx);
405        self.outgoing_tx.unbounded_send(envelope).ok();
406        async move { Ok(rx.await.context("connection lost")?.0) }
407    }
408
409    pub fn send_dynamic(&self, mut envelope: proto::Envelope) -> Result<()> {
410        envelope.id = self.next_message_id.fetch_add(1, SeqCst);
411        self.outgoing_tx.unbounded_send(envelope)?;
412        Ok(())
413    }
414
415    pub async fn spawn_process(&self, command: String) -> process::Child {
416        let (process_tx, process_rx) = oneshot::channel();
417        self.spawn_process_tx
418            .unbounded_send(SpawnRequest {
419                command,
420                process_tx,
421            })
422            .ok();
423        process_rx.await.unwrap()
424    }
425
426    pub fn ssh_args(&self) -> Vec<String> {
427        self.client_socket.as_ref().unwrap().ssh_args()
428    }
429
430    pub fn add_message_handler<M, E, H, F>(&self, entity: WeakModel<E>, handler: H)
431    where
432        M: EnvelopedMessage,
433        E: 'static,
434        H: 'static + Sync + Send + Fn(Model<E>, TypedEnvelope<M>, AsyncAppContext) -> F,
435        F: 'static + Future<Output = Result<()>>,
436    {
437        let message_type_id = TypeId::of::<M>();
438        self.message_handlers.lock().insert(
439            message_type_id,
440            Arc::new(move |envelope, _, cx| {
441                let entity = entity.upgrade()?;
442                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
443                Some(handler(entity, *envelope, cx).boxed_local())
444            }),
445        );
446    }
447
448    pub fn add_request_handler<M, E, H, F>(&self, entity: WeakModel<E>, handler: H)
449    where
450        M: EnvelopedMessage + RequestMessage,
451        E: 'static,
452        H: 'static + Sync + Send + Fn(Model<E>, TypedEnvelope<M>, AsyncAppContext) -> F,
453        F: 'static + Future<Output = Result<M::Response>>,
454    {
455        let message_type_id = TypeId::of::<M>();
456        self.message_handlers.lock().insert(
457            message_type_id,
458            Arc::new(move |envelope, this, cx| {
459                let entity = entity.upgrade()?;
460                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
461                let request_id = envelope.message_id();
462                Some(
463                    handler(entity, *envelope, cx)
464                        .then(move |result| async move {
465                            this.outgoing_tx.unbounded_send(result?.into_envelope(
466                                this.next_message_id.fetch_add(1, SeqCst),
467                                Some(request_id),
468                                None,
469                            ))?;
470                            Ok(())
471                        })
472                        .boxed_local(),
473                )
474            }),
475        );
476    }
477}
478
479impl ProtoClient for SshSession {
480    fn request(
481        &self,
482        envelope: proto::Envelope,
483        request_type: &'static str,
484    ) -> BoxFuture<'static, Result<proto::Envelope>> {
485        self.request_dynamic(envelope, request_type).boxed()
486    }
487
488    fn send(&self, envelope: proto::Envelope) -> Result<()> {
489        self.send_dynamic(envelope)
490    }
491}
492
493impl SshClientState {
494    #[cfg(not(unix))]
495    async fn new(
496        _connection_options: SshConnectionOptions,
497        _delegate: Arc<dyn SshClientDelegate>,
498        _cx: &mut AsyncAppContext,
499    ) -> Result<Self> {
500        Err(anyhow!("ssh is not supported on this platform"))
501    }
502
503    #[cfg(unix)]
504    async fn new(
505        connection_options: SshConnectionOptions,
506        delegate: Arc<dyn SshClientDelegate>,
507        cx: &mut AsyncAppContext,
508    ) -> Result<Self> {
509        use futures::{io::BufReader, AsyncBufReadExt as _};
510        use smol::{fs::unix::PermissionsExt as _, net::unix::UnixListener};
511        use util::ResultExt as _;
512
513        delegate.set_status(Some("connecting"), cx);
514
515        let url = connection_options.ssh_url();
516        let temp_dir = tempfile::Builder::new()
517            .prefix("zed-ssh-session")
518            .tempdir()?;
519
520        // Create a domain socket listener to handle requests from the askpass program.
521        let askpass_socket = temp_dir.path().join("askpass.sock");
522        let listener =
523            UnixListener::bind(&askpass_socket).context("failed to create askpass socket")?;
524
525        let askpass_task = cx.spawn(|mut cx| async move {
526            while let Ok((mut stream, _)) = listener.accept().await {
527                let mut buffer = Vec::new();
528                let mut reader = BufReader::new(&mut stream);
529                if reader.read_until(b'\0', &mut buffer).await.is_err() {
530                    buffer.clear();
531                }
532                let password_prompt = String::from_utf8_lossy(&buffer);
533                if let Some(password) = delegate
534                    .ask_password(password_prompt.to_string(), &mut cx)
535                    .await
536                    .context("failed to get ssh password")
537                    .and_then(|p| p)
538                    .log_err()
539                {
540                    stream.write_all(password.as_bytes()).await.log_err();
541                }
542            }
543        });
544
545        // Create an askpass script that communicates back to this process.
546        let askpass_script = format!(
547            "{shebang}\n{print_args} | nc -U {askpass_socket} 2> /dev/null \n",
548            askpass_socket = askpass_socket.display(),
549            print_args = "printf '%s\\0' \"$@\"",
550            shebang = "#!/bin/sh",
551        );
552        let askpass_script_path = temp_dir.path().join("askpass.sh");
553        fs::write(&askpass_script_path, askpass_script).await?;
554        fs::set_permissions(&askpass_script_path, std::fs::Permissions::from_mode(0o755)).await?;
555
556        // Start the master SSH process, which does not do anything except for establish
557        // the connection and keep it open, allowing other ssh commands to reuse it
558        // via a control socket.
559        let socket_path = temp_dir.path().join("ssh.sock");
560        let mut master_process = process::Command::new("ssh")
561            .stdin(Stdio::null())
562            .stdout(Stdio::piped())
563            .stderr(Stdio::piped())
564            .env("SSH_ASKPASS_REQUIRE", "force")
565            .env("SSH_ASKPASS", &askpass_script_path)
566            .args(["-N", "-o", "ControlMaster=yes", "-o"])
567            .arg(format!("ControlPath={}", socket_path.display()))
568            .arg(&url)
569            .spawn()?;
570
571        // Wait for this ssh process to close its stdout, indicating that authentication
572        // has completed.
573        let stdout = master_process.stdout.as_mut().unwrap();
574        let mut output = Vec::new();
575        stdout.read_to_end(&mut output).await?;
576        drop(askpass_task);
577
578        if master_process.try_status()?.is_some() {
579            output.clear();
580            let mut stderr = master_process.stderr.take().unwrap();
581            stderr.read_to_end(&mut output).await?;
582            Err(anyhow!(
583                "failed to connect: {}",
584                String::from_utf8_lossy(&output)
585            ))?;
586        }
587
588        Ok(Self {
589            socket: SshSocket {
590                connection_options,
591                socket_path,
592            },
593            _master_process: master_process,
594            _temp_dir: temp_dir,
595        })
596    }
597
598    async fn ensure_server_binary(
599        &self,
600        delegate: &Arc<dyn SshClientDelegate>,
601        src_path: &Path,
602        dst_path: &Path,
603        version: SemanticVersion,
604        cx: &mut AsyncAppContext,
605    ) -> Result<()> {
606        let mut dst_path_gz = dst_path.to_path_buf();
607        dst_path_gz.set_extension("gz");
608
609        if let Some(parent) = dst_path.parent() {
610            run_cmd(self.socket.ssh_command("mkdir").arg("-p").arg(parent)).await?;
611        }
612
613        let mut server_binary_exists = false;
614        if cfg!(not(debug_assertions)) {
615            if let Ok(installed_version) =
616                run_cmd(self.socket.ssh_command(&dst_path).arg("version")).await
617            {
618                if installed_version.trim() == version.to_string() {
619                    server_binary_exists = true;
620                }
621            }
622        }
623
624        if server_binary_exists {
625            log::info!("remote development server already present",);
626            return Ok(());
627        }
628
629        let src_stat = fs::metadata(src_path).await?;
630        let size = src_stat.len();
631        let server_mode = 0o755;
632
633        let t0 = Instant::now();
634        delegate.set_status(Some("uploading remote development server"), cx);
635        log::info!("uploading remote development server ({}kb)", size / 1024);
636        self.upload_file(src_path, &dst_path_gz)
637            .await
638            .context("failed to upload server binary")?;
639        log::info!("uploaded remote development server in {:?}", t0.elapsed());
640
641        delegate.set_status(Some("extracting remote development server"), cx);
642        run_cmd(
643            self.socket
644                .ssh_command("gunzip")
645                .arg("--force")
646                .arg(&dst_path_gz),
647        )
648        .await?;
649
650        delegate.set_status(Some("unzipping remote development server"), cx);
651        run_cmd(
652            self.socket
653                .ssh_command("chmod")
654                .arg(format!("{:o}", server_mode))
655                .arg(&dst_path),
656        )
657        .await?;
658
659        Ok(())
660    }
661
662    async fn query_platform(&self) -> Result<SshPlatform> {
663        let os = run_cmd(self.socket.ssh_command("uname").arg("-s")).await?;
664        let arch = run_cmd(self.socket.ssh_command("uname").arg("-m")).await?;
665
666        let os = match os.trim() {
667            "Darwin" => "macos",
668            "Linux" => "linux",
669            _ => Err(anyhow!("unknown uname os {os:?}"))?,
670        };
671        let arch = if arch.starts_with("arm") || arch.starts_with("aarch64") {
672            "aarch64"
673        } else if arch.starts_with("x86") || arch.starts_with("i686") {
674            "x86_64"
675        } else {
676            Err(anyhow!("unknown uname architecture {arch:?}"))?
677        };
678
679        Ok(SshPlatform { os, arch })
680    }
681
682    async fn upload_file(&self, src_path: &Path, dest_path: &Path) -> Result<()> {
683        let mut command = process::Command::new("scp");
684        let output = self
685            .socket
686            .ssh_options(&mut command)
687            .args(
688                self.socket
689                    .connection_options
690                    .port
691                    .map(|port| vec!["-P".to_string(), port.to_string()])
692                    .unwrap_or_default(),
693            )
694            .arg(&src_path)
695            .arg(&format!(
696                "{}:{}",
697                self.socket.connection_options.scp_url(),
698                dest_path.display()
699            ))
700            .output()
701            .await?;
702
703        if output.status.success() {
704            Ok(())
705        } else {
706            Err(anyhow!(
707                "failed to upload file {} -> {}: {}",
708                src_path.display(),
709                dest_path.display(),
710                String::from_utf8_lossy(&output.stderr)
711            ))
712        }
713    }
714}
715
716impl SshSocket {
717    fn ssh_command<S: AsRef<OsStr>>(&self, program: S) -> process::Command {
718        let mut command = process::Command::new("ssh");
719        self.ssh_options(&mut command)
720            .arg(self.connection_options.ssh_url())
721            .arg(program);
722        command
723    }
724
725    fn ssh_options<'a>(&self, command: &'a mut process::Command) -> &'a mut process::Command {
726        command
727            .stdin(Stdio::piped())
728            .stdout(Stdio::piped())
729            .stderr(Stdio::piped())
730            .args(["-o", "ControlMaster=no", "-o"])
731            .arg(format!("ControlPath={}", self.socket_path.display()))
732    }
733
734    fn ssh_args(&self) -> Vec<String> {
735        vec![
736            "-o".to_string(),
737            "ControlMaster=no".to_string(),
738            "-o".to_string(),
739            format!("ControlPath={}", self.socket_path.display()),
740            self.connection_options.ssh_url(),
741        ]
742    }
743}
744
745async fn run_cmd(command: &mut process::Command) -> Result<String> {
746    let output = command.output().await?;
747    if output.status.success() {
748        Ok(String::from_utf8_lossy(&output.stdout).to_string())
749    } else {
750        Err(anyhow!(
751            "failed to run command: {}",
752            String::from_utf8_lossy(&output.stderr)
753        ))
754    }
755}