transport.rs

  1use crate::{
  2    RemoteArch, RemoteOs, RemotePlatform,
  3    json_log::LogRecord,
  4    protocol::{MESSAGE_LEN_SIZE, message_len_from_buffer, read_message_with_len, write_message},
  5};
  6use anyhow::{Context as _, Result};
  7use futures::{
  8    AsyncReadExt as _, FutureExt as _, StreamExt as _,
  9    channel::mpsc::{Sender, UnboundedReceiver, UnboundedSender},
 10};
 11use gpui::{AppContext as _, AsyncApp, Task};
 12use rpc::proto::Envelope;
 13use smol::process::Child;
 14
 15pub mod docker;
 16#[cfg(any(test, feature = "test-support"))]
 17pub mod mock;
 18pub mod ssh;
 19pub mod wsl;
 20
 21/// Parses the output of `uname -sm` to determine the remote platform.
 22/// Takes the last line to skip possible shell initialization output.
 23fn parse_platform(output: &str) -> Result<RemotePlatform> {
 24    let output = output.trim();
 25    let uname = output.rsplit_once('\n').map_or(output, |(_, last)| last);
 26    let Some((os, arch)) = uname.split_once(" ") else {
 27        anyhow::bail!("unknown uname: {uname:?}")
 28    };
 29
 30    let os = match os {
 31        "Darwin" => RemoteOs::MacOs,
 32        "Linux" => RemoteOs::Linux,
 33        _ => anyhow::bail!(
 34            "Prebuilt remote servers are not yet available for {os:?}. See https://zed.dev/docs/remote-development"
 35        ),
 36    };
 37
 38    // exclude armv5,6,7 as they are 32-bit.
 39    let arch = if arch.starts_with("armv8")
 40        || arch.starts_with("armv9")
 41        || arch.starts_with("arm64")
 42        || arch.starts_with("aarch64")
 43    {
 44        RemoteArch::Aarch64
 45    } else if arch.starts_with("x86") {
 46        RemoteArch::X86_64
 47    } else {
 48        anyhow::bail!(
 49            "Prebuilt remote servers are not yet available for {arch:?}. See https://zed.dev/docs/remote-development"
 50        )
 51    };
 52
 53    Ok(RemotePlatform { os, arch })
 54}
 55
 56/// Parses the output of `echo $SHELL` to determine the remote shell.
 57/// Takes the last line to skip possible shell initialization output.
 58fn parse_shell(output: &str, fallback_shell: &str) -> String {
 59    let output = output.trim();
 60    let shell = output.rsplit_once('\n').map_or(output, |(_, last)| last);
 61    if shell.is_empty() {
 62        log::error!("$SHELL is not set, falling back to {fallback_shell}");
 63        fallback_shell.to_owned()
 64    } else {
 65        shell.to_owned()
 66    }
 67}
 68
 69fn handle_rpc_messages_over_child_process_stdio(
 70    mut remote_proxy_process: Child,
 71    incoming_tx: UnboundedSender<Envelope>,
 72    mut outgoing_rx: UnboundedReceiver<Envelope>,
 73    mut connection_activity_tx: Sender<()>,
 74    cx: &AsyncApp,
 75) -> Task<Result<i32>> {
 76    let mut child_stderr = remote_proxy_process.stderr.take().unwrap();
 77    let mut child_stdout = remote_proxy_process.stdout.take().unwrap();
 78    let mut child_stdin = remote_proxy_process.stdin.take().unwrap();
 79
 80    let mut stdin_buffer = Vec::new();
 81    let mut stdout_buffer = Vec::new();
 82    let mut stderr_buffer = Vec::new();
 83    let mut stderr_offset = 0;
 84
 85    let stdin_task = cx.background_spawn(async move {
 86        while let Some(outgoing) = outgoing_rx.next().await {
 87            write_message(&mut child_stdin, &mut stdin_buffer, outgoing).await?;
 88        }
 89        anyhow::Ok(())
 90    });
 91
 92    let stdout_task = cx.background_spawn({
 93        let mut connection_activity_tx = connection_activity_tx.clone();
 94        async move {
 95            loop {
 96                stdout_buffer.resize(MESSAGE_LEN_SIZE, 0);
 97                let len = child_stdout.read(&mut stdout_buffer).await?;
 98
 99                if len == 0 {
100                    return anyhow::Ok(());
101                }
102
103                if len < MESSAGE_LEN_SIZE {
104                    child_stdout.read_exact(&mut stdout_buffer[len..]).await?;
105                }
106
107                let message_len = message_len_from_buffer(&stdout_buffer);
108                let envelope =
109                    read_message_with_len(&mut child_stdout, &mut stdout_buffer, message_len)
110                        .await?;
111                connection_activity_tx.try_send(()).ok();
112                incoming_tx.unbounded_send(envelope).ok();
113            }
114        }
115    });
116
117    let stderr_task: Task<anyhow::Result<()>> = cx.background_spawn(async move {
118        loop {
119            stderr_buffer.resize(stderr_offset + 1024, 0);
120
121            let len = child_stderr
122                .read(&mut stderr_buffer[stderr_offset..])
123                .await?;
124            if len == 0 {
125                return anyhow::Ok(());
126            }
127
128            stderr_offset += len;
129            let mut start_ix = 0;
130            while let Some(ix) = stderr_buffer[start_ix..stderr_offset]
131                .iter()
132                .position(|b| b == &b'\n')
133            {
134                let line_ix = start_ix + ix;
135                let content = &stderr_buffer[start_ix..line_ix];
136                start_ix = line_ix + 1;
137                if let Ok(record) = serde_json::from_slice::<LogRecord>(content) {
138                    record.log(log::logger())
139                } else {
140                    eprintln!("(remote) {}", String::from_utf8_lossy(content));
141                }
142            }
143            stderr_buffer.drain(0..start_ix);
144            stderr_offset -= start_ix;
145
146            connection_activity_tx.try_send(()).ok();
147        }
148    });
149
150    cx.background_spawn(async move {
151        let result = futures::select! {
152            result = stdin_task.fuse() => {
153                result.context("stdin")
154            }
155            result = stdout_task.fuse() => {
156                result.context("stdout")
157            }
158            result = stderr_task.fuse() => {
159                result.context("stderr")
160            }
161        };
162        let exit_status = remote_proxy_process.status().await?;
163        let status = exit_status.code().unwrap_or_else(|| {
164            #[cfg(unix)]
165            let status = std::os::unix::process::ExitStatusExt::signal(&exit_status).unwrap_or(1);
166            #[cfg(not(unix))]
167            let status = 1;
168            status
169        });
170        match result {
171            Ok(_) => Ok(status),
172            Err(error) => Err(error),
173        }
174    })
175}
176
177#[cfg(any(debug_assertions, feature = "build-remote-server-binary"))]
178async fn build_remote_server_from_source(
179    platform: &crate::RemotePlatform,
180    delegate: &dyn crate::RemoteClientDelegate,
181    binary_exists_on_server: bool,
182    cx: &mut AsyncApp,
183) -> Result<Option<std::path::PathBuf>> {
184    use smol::process::{Command, Stdio};
185    use std::env::VarError;
186    use std::path::Path;
187    use util::command::new_smol_command;
188
189    if let Ok(path) = std::env::var("ZED_COPY_REMOTE_SERVER") {
190        let path = std::path::PathBuf::from(path);
191        if path.exists() {
192            return Ok(Some(path));
193        } else {
194            log::warn!(
195                "ZED_COPY_REMOTE_SERVER path does not exist, falling back to ZED_BUILD_REMOTE_SERVER: {}",
196                path.display()
197            );
198        }
199    }
200
201    // By default, we make building remote server from source opt-out and we do not force artifact compression
202    // for quicker builds.
203    let build_remote_server =
204        std::env::var("ZED_BUILD_REMOTE_SERVER").unwrap_or("nocompress".into());
205
206    if let "never" = &*build_remote_server {
207        return Ok(None);
208    } else if let "false" | "no" | "off" | "0" = &*build_remote_server {
209        if binary_exists_on_server {
210            return Ok(None);
211        }
212        log::warn!("ZED_BUILD_REMOTE_SERVER is disabled, but no server binary exists on the server")
213    }
214
215    async fn run_cmd(command: &mut Command) -> Result<()> {
216        let output = command
217            .kill_on_drop(true)
218            .stdout(Stdio::inherit())
219            .output()
220            .await?;
221        anyhow::ensure!(
222            output.status.success(),
223            "Failed to run command: {command:?}: output: {}",
224            String::from_utf8_lossy(&output.stderr)
225        );
226        Ok(())
227    }
228
229    let use_musl = !build_remote_server.contains("nomusl");
230    let triple = format!(
231        "{}-{}",
232        platform.arch,
233        match platform.os {
234            RemoteOs::Linux =>
235                if use_musl {
236                    "unknown-linux-musl"
237                } else {
238                    "unknown-linux-gnu"
239                },
240            RemoteOs::MacOs => "apple-darwin",
241            RemoteOs::Windows if cfg!(windows) => "pc-windows-msvc",
242            RemoteOs::Windows => "pc-windows-gnu",
243        }
244    );
245    let mut rust_flags = match std::env::var("RUSTFLAGS") {
246        Ok(val) => val,
247        Err(VarError::NotPresent) => String::new(),
248        Err(e) => {
249            log::error!("Failed to get env var `RUSTFLAGS` value: {e}");
250            String::new()
251        }
252    };
253    if platform.os == RemoteOs::Linux && use_musl {
254        rust_flags.push_str(" -C target-feature=+crt-static");
255
256        if let Ok(path) = std::env::var("ZED_ZSTD_MUSL_LIB") {
257            rust_flags.push_str(&format!(" -C link-arg=-L{path}"));
258        }
259    }
260    if build_remote_server.contains("mold") {
261        rust_flags.push_str(" -C link-arg=-fuse-ld=mold");
262    }
263
264    if platform.arch.as_str() == std::env::consts::ARCH
265        && platform.os.as_str() == std::env::consts::OS
266    {
267        delegate.set_status(Some("Building remote server binary from source"), cx);
268        log::info!("building remote server binary from source");
269        run_cmd(
270            new_smol_command("cargo")
271                .current_dir(concat!(env!("CARGO_MANIFEST_DIR"), "/../.."))
272                .args([
273                    "build",
274                    "--package",
275                    "remote_server",
276                    "--features",
277                    "debug-embed",
278                    "--target-dir",
279                    "target/remote_server",
280                    "--target",
281                    &triple,
282                ])
283                .env("RUSTFLAGS", &rust_flags),
284        )
285        .await?;
286    } else {
287        if which("zig", cx).await?.is_none() {
288            anyhow::bail!(if cfg!(not(windows)) {
289                "zig not found on $PATH, install zig (see https://ziglang.org/learn/getting-started or use zigup)"
290            } else {
291                "zig not found on $PATH, install zig (use `winget install -e --id zig.zig` or see https://ziglang.org/learn/getting-started or use zigup)"
292            });
293        }
294
295        let rustup = which("rustup", cx)
296            .await?
297            .context("rustup not found on $PATH, install rustup (see https://rustup.rs/)")?;
298        delegate.set_status(Some("Adding rustup target for cross-compilation"), cx);
299        log::info!("adding rustup target");
300        run_cmd(
301            new_smol_command(rustup)
302                .args(["target", "add"])
303                .arg(&triple),
304        )
305        .await?;
306
307        if which("cargo-zigbuild", cx).await?.is_none() {
308            delegate.set_status(Some("Installing cargo-zigbuild for cross-compilation"), cx);
309            log::info!("installing cargo-zigbuild");
310            run_cmd(new_smol_command("cargo").args(["install", "--locked", "cargo-zigbuild"]))
311                .await?;
312        }
313
314        delegate.set_status(
315            Some(&format!(
316                "Building remote binary from source for {triple} with Zig"
317            )),
318            cx,
319        );
320        log::info!("building remote binary from source for {triple} with Zig");
321        run_cmd(
322            new_smol_command("cargo")
323                .args([
324                    "zigbuild",
325                    "--package",
326                    "remote_server",
327                    "--features",
328                    "debug-embed",
329                    "--target-dir",
330                    "target/remote_server",
331                    "--target",
332                    &triple,
333                ])
334                .env("RUSTFLAGS", &rust_flags),
335        )
336        .await?;
337    };
338    let bin_path = Path::new("target")
339        .join("remote_server")
340        .join(&triple)
341        .join("debug")
342        .join("remote_server")
343        .with_extension(if platform.os.is_windows() { "exe" } else { "" });
344
345    let path = if !build_remote_server.contains("nocompress") {
346        delegate.set_status(Some("Compressing binary"), cx);
347
348        #[cfg(not(target_os = "windows"))]
349        {
350            run_cmd(new_smol_command("gzip").args(["-f", &bin_path.to_string_lossy()])).await?;
351        }
352
353        #[cfg(target_os = "windows")]
354        {
355            // On Windows, we use 7z to compress the binary
356
357            let seven_zip = which("7z.exe",cx)
358                .await?
359                .context("7z.exe not found on $PATH, install it (e.g. with `winget install -e --id 7zip.7zip`) or, if you don't want this behaviour, set $env:ZED_BUILD_REMOTE_SERVER=\"nocompress\"")?;
360            let gz_path = format!("target/remote_server/{}/debug/remote_server.gz", triple);
361            if smol::fs::metadata(&gz_path).await.is_ok() {
362                smol::fs::remove_file(&gz_path).await?;
363            }
364            run_cmd(new_smol_command(seven_zip).args([
365                "a",
366                "-tgzip",
367                &gz_path,
368                &bin_path.to_string_lossy(),
369            ]))
370            .await?;
371        }
372
373        let mut archive_path = bin_path;
374        archive_path.set_extension("gz");
375        std::env::current_dir()?.join(archive_path)
376    } else {
377        bin_path
378    };
379
380    Ok(Some(path))
381}
382
383#[cfg(any(debug_assertions, feature = "build-remote-server-binary"))]
384async fn which(
385    binary_name: impl AsRef<str>,
386    cx: &mut AsyncApp,
387) -> Result<Option<std::path::PathBuf>> {
388    let binary_name = binary_name.as_ref().to_string();
389    let binary_name_cloned = binary_name.clone();
390    let res = cx
391        .background_spawn(async move { which::which(binary_name_cloned) })
392        .await;
393    match res {
394        Ok(path) => Ok(Some(path)),
395        Err(which::Error::CannotFindBinaryPath) => Ok(None),
396        Err(err) => Err(anyhow::anyhow!(
397            "Failed to run 'which' to find the binary '{binary_name}': {err}"
398        )),
399    }
400}
401
402#[cfg(test)]
403mod tests {
404    use super::*;
405
406    #[test]
407    fn test_parse_platform() {
408        let result = parse_platform("Linux x86_64\n").unwrap();
409        assert_eq!(result.os, RemoteOs::Linux);
410        assert_eq!(result.arch, RemoteArch::X86_64);
411
412        let result = parse_platform("Darwin arm64\n").unwrap();
413        assert_eq!(result.os, RemoteOs::MacOs);
414        assert_eq!(result.arch, RemoteArch::Aarch64);
415
416        let result = parse_platform("Linux x86_64").unwrap();
417        assert_eq!(result.os, RemoteOs::Linux);
418        assert_eq!(result.arch, RemoteArch::X86_64);
419
420        let result = parse_platform("some shell init output\nLinux aarch64\n").unwrap();
421        assert_eq!(result.os, RemoteOs::Linux);
422        assert_eq!(result.arch, RemoteArch::Aarch64);
423
424        let result = parse_platform("some shell init output\nLinux aarch64").unwrap();
425        assert_eq!(result.os, RemoteOs::Linux);
426        assert_eq!(result.arch, RemoteArch::Aarch64);
427
428        assert_eq!(
429            parse_platform("Linux armv8l\n").unwrap().arch,
430            RemoteArch::Aarch64
431        );
432        assert_eq!(
433            parse_platform("Linux aarch64\n").unwrap().arch,
434            RemoteArch::Aarch64
435        );
436        assert_eq!(
437            parse_platform("Linux x86_64\n").unwrap().arch,
438            RemoteArch::X86_64
439        );
440
441        let result = parse_platform(
442            r#"Linux x86_64 - What you're referring to as Linux, is in fact, GNU/Linux...\n"#,
443        )
444        .unwrap();
445        assert_eq!(result.os, RemoteOs::Linux);
446        assert_eq!(result.arch, RemoteArch::X86_64);
447
448        assert!(parse_platform("Windows x86_64\n").is_err());
449        assert!(parse_platform("Linux armv7l\n").is_err());
450    }
451
452    #[test]
453    fn test_parse_shell() {
454        assert_eq!(parse_shell("/bin/bash\n", "sh"), "/bin/bash");
455        assert_eq!(parse_shell("/bin/zsh\n", "sh"), "/bin/zsh");
456
457        assert_eq!(parse_shell("/bin/bash", "sh"), "/bin/bash");
458        assert_eq!(
459            parse_shell("some shell init output\n/bin/bash\n", "sh"),
460            "/bin/bash"
461        );
462        assert_eq!(
463            parse_shell("some shell init output\n/bin/bash", "sh"),
464            "/bin/bash"
465        );
466        assert_eq!(parse_shell("", "sh"), "sh");
467        assert_eq!(parse_shell("\n", "sh"), "sh");
468    }
469}