transport.rs

  1use crate::{
  2    RemoteArch, RemoteOs, RemotePlatform,
  3    json_log::LogRecord,
  4    protocol::{MESSAGE_LEN_SIZE, message_len_from_buffer, read_message_with_len, write_message},
  5};
  6use anyhow::{Context as _, Result};
  7use futures::{
  8    AsyncReadExt as _, FutureExt as _, StreamExt as _,
  9    channel::mpsc::{Sender, UnboundedReceiver, UnboundedSender},
 10};
 11use gpui::{AppContext as _, AsyncApp, Task};
 12use rpc::proto::Envelope;
 13use smol::process::Child;
 14
 15pub mod docker;
 16pub mod ssh;
 17pub mod wsl;
 18
 19/// Parses the output of `uname -sm` to determine the remote platform.
 20/// Takes the last line to skip possible shell initialization output.
 21fn parse_platform(output: &str) -> Result<RemotePlatform> {
 22    let output = output.trim();
 23    let uname = output.rsplit_once('\n').map_or(output, |(_, last)| last);
 24    let Some((os, arch)) = uname.split_once(" ") else {
 25        anyhow::bail!("unknown uname: {uname:?}")
 26    };
 27
 28    let os = match os {
 29        "Darwin" => RemoteOs::MacOs,
 30        "Linux" => RemoteOs::Linux,
 31        _ => anyhow::bail!(
 32            "Prebuilt remote servers are not yet available for {os:?}. See https://zed.dev/docs/remote-development"
 33        ),
 34    };
 35
 36    // exclude armv5,6,7 as they are 32-bit.
 37    let arch = if arch.starts_with("armv8")
 38        || arch.starts_with("armv9")
 39        || arch.starts_with("arm64")
 40        || arch.starts_with("aarch64")
 41    {
 42        RemoteArch::Aarch64
 43    } else if arch.starts_with("x86") {
 44        RemoteArch::X86_64
 45    } else {
 46        anyhow::bail!(
 47            "Prebuilt remote servers are not yet available for {arch:?}. See https://zed.dev/docs/remote-development"
 48        )
 49    };
 50
 51    Ok(RemotePlatform { os, arch })
 52}
 53
 54/// Parses the output of `echo $SHELL` to determine the remote shell.
 55/// Takes the last line to skip possible shell initialization output.
 56fn parse_shell(output: &str, fallback_shell: &str) -> String {
 57    let output = output.trim();
 58    let shell = output.rsplit_once('\n').map_or(output, |(_, last)| last);
 59    if shell.is_empty() {
 60        log::error!("$SHELL is not set, falling back to {fallback_shell}");
 61        fallback_shell.to_owned()
 62    } else {
 63        shell.to_owned()
 64    }
 65}
 66
 67fn handle_rpc_messages_over_child_process_stdio(
 68    mut remote_proxy_process: Child,
 69    incoming_tx: UnboundedSender<Envelope>,
 70    mut outgoing_rx: UnboundedReceiver<Envelope>,
 71    mut connection_activity_tx: Sender<()>,
 72    cx: &AsyncApp,
 73) -> Task<Result<i32>> {
 74    let mut child_stderr = remote_proxy_process.stderr.take().unwrap();
 75    let mut child_stdout = remote_proxy_process.stdout.take().unwrap();
 76    let mut child_stdin = remote_proxy_process.stdin.take().unwrap();
 77
 78    let mut stdin_buffer = Vec::new();
 79    let mut stdout_buffer = Vec::new();
 80    let mut stderr_buffer = Vec::new();
 81    let mut stderr_offset = 0;
 82
 83    let stdin_task = cx.background_spawn(async move {
 84        while let Some(outgoing) = outgoing_rx.next().await {
 85            write_message(&mut child_stdin, &mut stdin_buffer, outgoing).await?;
 86        }
 87        anyhow::Ok(())
 88    });
 89
 90    let stdout_task = cx.background_spawn({
 91        let mut connection_activity_tx = connection_activity_tx.clone();
 92        async move {
 93            loop {
 94                stdout_buffer.resize(MESSAGE_LEN_SIZE, 0);
 95                let len = child_stdout.read(&mut stdout_buffer).await?;
 96
 97                if len == 0 {
 98                    return anyhow::Ok(());
 99                }
100
101                if len < MESSAGE_LEN_SIZE {
102                    child_stdout.read_exact(&mut stdout_buffer[len..]).await?;
103                }
104
105                let message_len = message_len_from_buffer(&stdout_buffer);
106                let envelope =
107                    read_message_with_len(&mut child_stdout, &mut stdout_buffer, message_len)
108                        .await?;
109                connection_activity_tx.try_send(()).ok();
110                incoming_tx.unbounded_send(envelope).ok();
111            }
112        }
113    });
114
115    let stderr_task: Task<anyhow::Result<()>> = cx.background_spawn(async move {
116        loop {
117            stderr_buffer.resize(stderr_offset + 1024, 0);
118
119            let len = child_stderr
120                .read(&mut stderr_buffer[stderr_offset..])
121                .await?;
122            if len == 0 {
123                return anyhow::Ok(());
124            }
125
126            stderr_offset += len;
127            let mut start_ix = 0;
128            while let Some(ix) = stderr_buffer[start_ix..stderr_offset]
129                .iter()
130                .position(|b| b == &b'\n')
131            {
132                let line_ix = start_ix + ix;
133                let content = &stderr_buffer[start_ix..line_ix];
134                start_ix = line_ix + 1;
135                if let Ok(record) = serde_json::from_slice::<LogRecord>(content) {
136                    record.log(log::logger())
137                } else {
138                    eprintln!("(remote) {}", String::from_utf8_lossy(content));
139                }
140            }
141            stderr_buffer.drain(0..start_ix);
142            stderr_offset -= start_ix;
143
144            connection_activity_tx.try_send(()).ok();
145        }
146    });
147
148    cx.background_spawn(async move {
149        let result = futures::select! {
150            result = stdin_task.fuse() => {
151                result.context("stdin")
152            }
153            result = stdout_task.fuse() => {
154                result.context("stdout")
155            }
156            result = stderr_task.fuse() => {
157                result.context("stderr")
158            }
159        };
160        let status = remote_proxy_process.status().await?.code().unwrap_or(1);
161        match result {
162            Ok(_) => Ok(status),
163            Err(error) => Err(error),
164        }
165    })
166}
167
168#[cfg(debug_assertions)]
169async fn build_remote_server_from_source(
170    platform: &crate::RemotePlatform,
171    delegate: &dyn crate::RemoteClientDelegate,
172    cx: &mut AsyncApp,
173) -> Result<Option<std::path::PathBuf>> {
174    use smol::process::{Command, Stdio};
175    use std::env::VarError;
176    use std::path::Path;
177    use util::command::new_smol_command;
178
179    // By default, we make building remote server from source opt-out and we do not force artifact compression
180    // for quicker builds.
181    let build_remote_server =
182        std::env::var("ZED_BUILD_REMOTE_SERVER").unwrap_or("nocompress".into());
183
184    if let "false" | "no" | "off" | "0" = &*build_remote_server {
185        return Ok(None);
186    }
187
188    async fn run_cmd(command: &mut Command) -> Result<()> {
189        let output = command
190            .kill_on_drop(true)
191            .stderr(Stdio::inherit())
192            .output()
193            .await?;
194        anyhow::ensure!(
195            output.status.success(),
196            "Failed to run command: {command:?}: output: {}",
197            String::from_utf8_lossy(&output.stderr)
198        );
199        Ok(())
200    }
201
202    let use_musl = !build_remote_server.contains("nomusl");
203    let triple = format!(
204        "{}-{}",
205        platform.arch,
206        match platform.os {
207            RemoteOs::Linux =>
208                if use_musl {
209                    "unknown-linux-musl"
210                } else {
211                    "unknown-linux-gnu"
212                },
213            RemoteOs::MacOs => "apple-darwin",
214            RemoteOs::Windows if cfg!(windows) => "pc-windows-msvc",
215            RemoteOs::Windows => "pc-windows-gnu",
216        }
217    );
218    let mut rust_flags = match std::env::var("RUSTFLAGS") {
219        Ok(val) => val,
220        Err(VarError::NotPresent) => String::new(),
221        Err(e) => {
222            log::error!("Failed to get env var `RUSTFLAGS` value: {e}");
223            String::new()
224        }
225    };
226    if platform.os == RemoteOs::Linux && use_musl {
227        rust_flags.push_str(" -C target-feature=+crt-static");
228
229        if let Ok(path) = std::env::var("ZED_ZSTD_MUSL_LIB") {
230            rust_flags.push_str(&format!(" -C link-arg=-L{path}"));
231        }
232    }
233    if build_remote_server.contains("mold") {
234        rust_flags.push_str(" -C link-arg=-fuse-ld=mold");
235    }
236
237    if platform.arch.as_str() == std::env::consts::ARCH
238        && platform.os.as_str() == std::env::consts::OS
239    {
240        delegate.set_status(Some("Building remote server binary from source"), cx);
241        log::info!("building remote server binary from source");
242        run_cmd(
243            new_smol_command("cargo")
244                .current_dir(concat!(env!("CARGO_MANIFEST_DIR"), "/../.."))
245                .args([
246                    "build",
247                    "--package",
248                    "remote_server",
249                    "--features",
250                    "debug-embed",
251                    "--target-dir",
252                    "target/remote_server",
253                    "--target",
254                    &triple,
255                ])
256                .env("RUSTFLAGS", &rust_flags),
257        )
258        .await?;
259    } else {
260        if which("zig", cx).await?.is_none() {
261            anyhow::bail!(if cfg!(not(windows)) {
262                "zig not found on $PATH, install zig (see https://ziglang.org/learn/getting-started or use zigup)"
263            } else {
264                "zig not found on $PATH, install zig (use `winget install -e --id zig.zig` or see https://ziglang.org/learn/getting-started or use zigup)"
265            });
266        }
267
268        let rustup = which("rustup", cx)
269            .await?
270            .context("rustup not found on $PATH, install rustup (see https://rustup.rs/)")?;
271        delegate.set_status(Some("Adding rustup target for cross-compilation"), cx);
272        log::info!("adding rustup target");
273        run_cmd(
274            new_smol_command(rustup)
275                .args(["target", "add"])
276                .arg(&triple),
277        )
278        .await?;
279
280        if which("cargo-zigbuild", cx).await?.is_none() {
281            delegate.set_status(Some("Installing cargo-zigbuild for cross-compilation"), cx);
282            log::info!("installing cargo-zigbuild");
283            run_cmd(new_smol_command("cargo").args(["install", "--locked", "cargo-zigbuild"]))
284                .await?;
285        }
286
287        delegate.set_status(
288            Some(&format!(
289                "Building remote binary from source for {triple} with Zig"
290            )),
291            cx,
292        );
293        log::info!("building remote binary from source for {triple} with Zig");
294        run_cmd(
295            new_smol_command("cargo")
296                .args([
297                    "zigbuild",
298                    "--package",
299                    "remote_server",
300                    "--features",
301                    "debug-embed",
302                    "--target-dir",
303                    "target/remote_server",
304                    "--target",
305                    &triple,
306                ])
307                .env("RUSTFLAGS", &rust_flags),
308        )
309        .await?;
310    };
311    let bin_path = Path::new("target")
312        .join("remote_server")
313        .join(&triple)
314        .join("debug")
315        .join("remote_server")
316        .with_extension(if platform.os.is_windows() { "exe" } else { "" });
317
318    let path = if !build_remote_server.contains("nocompress") {
319        delegate.set_status(Some("Compressing binary"), cx);
320
321        #[cfg(not(target_os = "windows"))]
322        {
323            run_cmd(new_smol_command("gzip").args(["-f", &bin_path.to_string_lossy()])).await?;
324        }
325
326        #[cfg(target_os = "windows")]
327        {
328            // On Windows, we use 7z to compress the binary
329
330            let seven_zip = which("7z.exe",cx)
331                .await?
332                .context("7z.exe not found on $PATH, install it (e.g. with `winget install -e --id 7zip.7zip`) or, if you don't want this behaviour, set $env:ZED_BUILD_REMOTE_SERVER=\"nocompress\"")?;
333            let gz_path = format!("target/remote_server/{}/debug/remote_server.gz", triple);
334            if smol::fs::metadata(&gz_path).await.is_ok() {
335                smol::fs::remove_file(&gz_path).await?;
336            }
337            run_cmd(new_smol_command(seven_zip).args([
338                "a",
339                "-tgzip",
340                &gz_path,
341                &bin_path.to_string_lossy(),
342            ]))
343            .await?;
344        }
345
346        let mut archive_path = bin_path;
347        archive_path.set_extension("gz");
348        std::env::current_dir()?.join(archive_path)
349    } else {
350        bin_path
351    };
352
353    Ok(Some(path))
354}
355
356#[cfg(debug_assertions)]
357async fn which(
358    binary_name: impl AsRef<str>,
359    cx: &mut AsyncApp,
360) -> Result<Option<std::path::PathBuf>> {
361    let binary_name = binary_name.as_ref().to_string();
362    let binary_name_cloned = binary_name.clone();
363    let res = cx
364        .background_spawn(async move { which::which(binary_name_cloned) })
365        .await;
366    match res {
367        Ok(path) => Ok(Some(path)),
368        Err(which::Error::CannotFindBinaryPath) => Ok(None),
369        Err(err) => Err(anyhow::anyhow!(
370            "Failed to run 'which' to find the binary '{binary_name}': {err}"
371        )),
372    }
373}
374
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks `parse_platform` against accepted `uname -sm` outputs (with and
    /// without trailing newline, leading shell noise, or trailing text) and
    /// against unsupported operating systems and architectures.
    #[test]
    fn test_parse_platform() {
        let supported = [
            ("Linux x86_64\n", RemoteOs::Linux, RemoteArch::X86_64),
            ("Darwin arm64\n", RemoteOs::MacOs, RemoteArch::Aarch64),
            ("Linux x86_64", RemoteOs::Linux, RemoteArch::X86_64),
            (
                "some shell init output\nLinux aarch64\n",
                RemoteOs::Linux,
                RemoteArch::Aarch64,
            ),
            (
                "some shell init output\nLinux aarch64",
                RemoteOs::Linux,
                RemoteArch::Aarch64,
            ),
            ("Linux armv8l\n", RemoteOs::Linux, RemoteArch::Aarch64),
            ("Linux aarch64\n", RemoteOs::Linux, RemoteArch::Aarch64),
            // Text following the architecture must not confuse the parser. This
            // is a regular string literal so that `\n` is a real newline, as in
            // the other cases.
            (
                "Linux x86_64 - What you're referring to as Linux, is in fact, GNU/Linux...\n",
                RemoteOs::Linux,
                RemoteArch::X86_64,
            ),
        ];
        for (output, expected_os, expected_arch) in supported {
            let platform = parse_platform(output).unwrap();
            assert_eq!(platform.os, expected_os, "os for {output:?}");
            assert_eq!(platform.arch, expected_arch, "arch for {output:?}");
        }

        // Unsupported OS and 32-bit ARM are rejected.
        for output in ["Windows x86_64\n", "Linux armv7l\n"] {
            assert!(
                parse_platform(output).is_err(),
                "expected an error for {output:?}"
            );
        }
    }

    /// Checks that `parse_shell` returns the last line of the output and falls
    /// back to the provided shell when that line is empty.
    #[test]
    fn test_parse_shell() {
        let cases = [
            ("/bin/bash\n", "/bin/bash"),
            ("/bin/zsh\n", "/bin/zsh"),
            ("/bin/bash", "/bin/bash"),
            ("some shell init output\n/bin/bash\n", "/bin/bash"),
            ("some shell init output\n/bin/bash", "/bin/bash"),
            ("", "sh"),
            ("\n", "sh"),
        ];
        for (output, expected) in cases {
            assert_eq!(parse_shell(output, "sh"), expected, "shell for {output:?}");
        }
    }
}