unix.rs

   1use crate::HeadlessProject;
   2use crate::headless_project::HeadlessAppState;
   3use anyhow::{Context as _, Result, anyhow};
   4use client::{ProjectId, ProxySettings};
   5use collections::HashMap;
   6use project::trusted_worktrees;
   7use util::ResultExt;
   8
   9use extension::ExtensionHostProxy;
  10use fs::{Fs, RealFs};
  11use futures::channel::{mpsc, oneshot};
  12use futures::{AsyncRead, AsyncWrite, AsyncWriteExt, FutureExt, SinkExt, select, select_biased};
  13use git::GitHostingProviderRegistry;
  14use gpui::{App, AppContext as _, Context, Entity, UpdateGlobal as _};
  15use gpui_tokio::Tokio;
  16use http_client::{Url, read_proxy_from_env};
  17use language::LanguageRegistry;
  18use node_runtime::{NodeBinaryOptions, NodeRuntime};
  19use paths::logs_dir;
  20use project::project_settings::ProjectSettings;
  21use util::command::new_smol_command;
  22
  23use proto::CrashReport;
  24use release_channel::{AppCommitSha, AppVersion, RELEASE_CHANNEL, ReleaseChannel};
  25use remote::RemoteClient;
  26use remote::{
  27    json_log::LogRecord,
  28    protocol::{read_message, write_message},
  29    proxy::ProxyLaunchError,
  30};
  31use reqwest_client::ReqwestClient;
  32use rpc::proto::{self, Envelope, REMOTE_SERVER_PROJECT_ID};
  33use rpc::{AnyProtoClient, TypedEnvelope};
  34use settings::{Settings, SettingsStore, watch_config_file};
  35
  36use smol::channel::{Receiver, Sender};
  37use smol::io::AsyncReadExt;
  38use smol::{net::unix::UnixListener, stream::StreamExt as _};
  39use std::{
  40    env,
  41    ffi::OsStr,
  42    fs::File,
  43    io::Write,
  44    mem,
  45    ops::ControlFlow,
  46    path::{Path, PathBuf},
  47    process::ExitStatus,
  48    str::FromStr,
  49    sync::{Arc, LazyLock},
  50};
  51use thiserror::Error;
  52
  53pub static VERSION: LazyLock<String> = LazyLock::new(|| match *RELEASE_CHANNEL {
  54    ReleaseChannel::Stable | ReleaseChannel::Preview => env!("ZED_PKG_VERSION").to_owned(),
  55    ReleaseChannel::Nightly | ReleaseChannel::Dev => {
  56        let commit_sha = option_env!("ZED_COMMIT_SHA").unwrap_or("missing-zed-commit-sha");
  57        let build_identifier = option_env!("ZED_BUILD_ID");
  58        if let Some(build_id) = build_identifier {
  59            format!("{build_id}+{commit_sha}")
  60        } else {
  61            commit_sha.to_owned()
  62        }
  63    }
  64});
  65
  66fn init_logging_proxy() {
  67    env_logger::builder()
  68        .format(|buf, record| {
  69            let mut log_record = LogRecord::new(record);
  70            log_record.message =
  71                std::borrow::Cow::Owned(format!("(remote proxy) {}", log_record.message));
  72            serde_json::to_writer(&mut *buf, &log_record)?;
  73            buf.write_all(b"\n")?;
  74            Ok(())
  75        })
  76        .init();
  77}
  78
  79fn init_logging_server(log_file_path: &Path) -> Result<Receiver<Vec<u8>>> {
  80    struct MultiWrite {
  81        file: File,
  82        channel: Sender<Vec<u8>>,
  83        buffer: Vec<u8>,
  84    }
  85
  86    impl Write for MultiWrite {
  87        fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
  88            let written = self.file.write(buf)?;
  89            self.buffer.extend_from_slice(&buf[..written]);
  90            Ok(written)
  91        }
  92
  93        fn flush(&mut self) -> std::io::Result<()> {
  94            self.channel
  95                .send_blocking(self.buffer.clone())
  96                .map_err(std::io::Error::other)?;
  97            self.buffer.clear();
  98            self.file.flush()
  99        }
 100    }
 101
 102    let log_file = std::fs::OpenOptions::new()
 103        .create(true)
 104        .append(true)
 105        .open(log_file_path)
 106        .context("Failed to open log file in append mode")?;
 107
 108    let (tx, rx) = smol::channel::unbounded();
 109
 110    let target = Box::new(MultiWrite {
 111        file: log_file,
 112        channel: tx,
 113        buffer: Vec::new(),
 114    });
 115
 116    let old_hook = std::panic::take_hook();
 117    std::panic::set_hook(Box::new(move |info| {
 118        log::error!("Panic occurred: {:?}", info);
 119        old_hook(info);
 120    }));
 121    env_logger::Builder::new()
 122        .filter_level(log::LevelFilter::Info)
 123        .parse_default_env()
 124        .target(env_logger::Target::Pipe(target))
 125        .format(|buf, record| {
 126            let mut log_record = LogRecord::new(record);
 127            log_record.message =
 128                std::borrow::Cow::Owned(format!("(remote server) {}", log_record.message));
 129            serde_json::to_writer(&mut *buf, &log_record)?;
 130            buf.write_all(b"\n")?;
 131            Ok(())
 132        })
 133        .init();
 134
 135    Ok(rx)
 136}
 137
 138fn handle_crash_files_requests(project: &Entity<HeadlessProject>, client: &AnyProtoClient) {
 139    client.add_request_handler(
 140        project.downgrade(),
 141        |_, _: TypedEnvelope<proto::GetCrashFiles>, _cx| async move {
 142            let mut legacy_panics = Vec::new();
 143            let mut crashes = Vec::new();
 144            let mut children = smol::fs::read_dir(paths::logs_dir()).await?;
 145            while let Some(child) = children.next().await {
 146                let child = child?;
 147                let child_path = child.path();
 148
 149                let extension = child_path.extension();
 150                if extension == Some(OsStr::new("panic")) {
 151                    let filename = if let Some(filename) = child_path.file_name() {
 152                        filename.to_string_lossy()
 153                    } else {
 154                        continue;
 155                    };
 156
 157                    if !filename.starts_with("zed") {
 158                        continue;
 159                    }
 160
 161                    let file_contents = smol::fs::read_to_string(&child_path)
 162                        .await
 163                        .context("error reading panic file")?;
 164
 165                    legacy_panics.push(file_contents);
 166                    smol::fs::remove_file(&child_path)
 167                        .await
 168                        .context("error removing panic")
 169                        .log_err();
 170                } else if extension == Some(OsStr::new("dmp")) {
 171                    let mut json_path = child_path.clone();
 172                    json_path.set_extension("json");
 173                    if let Ok(json_content) = smol::fs::read_to_string(&json_path).await {
 174                        crashes.push(CrashReport {
 175                            metadata: json_content,
 176                            minidump_contents: smol::fs::read(&child_path).await?,
 177                        });
 178                        smol::fs::remove_file(&child_path).await.log_err();
 179                        smol::fs::remove_file(&json_path).await.log_err();
 180                    } else {
 181                        log::error!("Couldn't find json metadata for crash: {child_path:?}");
 182                    }
 183                }
 184            }
 185
 186            anyhow::Ok(proto::GetCrashFilesResponse { crashes })
 187        },
 188    );
 189}
 190
 191struct ServerListeners {
 192    stdin: UnixListener,
 193    stdout: UnixListener,
 194    stderr: UnixListener,
 195}
 196
 197impl ServerListeners {
 198    pub fn new(stdin_path: PathBuf, stdout_path: PathBuf, stderr_path: PathBuf) -> Result<Self> {
 199        Ok(Self {
 200            stdin: UnixListener::bind(stdin_path).context("failed to bind stdin socket")?,
 201            stdout: UnixListener::bind(stdout_path).context("failed to bind stdout socket")?,
 202            stderr: UnixListener::bind(stderr_path).context("failed to bind stderr socket")?,
 203        })
 204    }
 205}
 206
 207fn start_server(
 208    listeners: ServerListeners,
 209    log_rx: Receiver<Vec<u8>>,
 210    cx: &mut App,
 211    is_wsl_interop: bool,
 212) -> AnyProtoClient {
 213    // This is the server idle timeout. If no connection comes in this timeout, the server will shut down.
 214    const IDLE_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10 * 60);
 215
 216    let (incoming_tx, incoming_rx) = mpsc::unbounded::<Envelope>();
 217    let (outgoing_tx, mut outgoing_rx) = mpsc::unbounded::<Envelope>();
 218    let (app_quit_tx, mut app_quit_rx) = mpsc::unbounded::<()>();
 219
 220    cx.on_app_quit(move |_| {
 221        let mut app_quit_tx = app_quit_tx.clone();
 222        async move {
 223            log::info!("app quitting. sending signal to server main loop");
 224            app_quit_tx.send(()).await.ok();
 225        }
 226    })
 227    .detach();
 228
 229    cx.spawn(async move |cx| {
 230        let mut stdin_incoming = listeners.stdin.incoming();
 231        let mut stdout_incoming = listeners.stdout.incoming();
 232        let mut stderr_incoming = listeners.stderr.incoming();
 233
 234        loop {
 235            let streams = futures::future::join3(stdin_incoming.next(), stdout_incoming.next(), stderr_incoming.next());
 236
 237            log::info!("accepting new connections");
 238            let result = select! {
 239                streams = streams.fuse() => {
 240                    let (Some(Ok(stdin_stream)), Some(Ok(stdout_stream)), Some(Ok(stderr_stream))) = streams else {
 241                        break;
 242                    };
 243                    anyhow::Ok((stdin_stream, stdout_stream, stderr_stream))
 244                }
 245                _ = futures::FutureExt::fuse(cx.background_executor().timer(IDLE_TIMEOUT)) => {
 246                    log::warn!("timed out waiting for new connections after {:?}. exiting.", IDLE_TIMEOUT);
 247                    cx.update(|cx| {
 248                        // TODO: This is a hack, because in a headless project, shutdown isn't executed
 249                        // when calling quit, but it should be.
 250                        cx.shutdown();
 251                        cx.quit();
 252                    });
 253                    break;
 254                }
 255                _ = app_quit_rx.next().fuse() => {
 256                    break;
 257                }
 258            };
 259
 260            let Ok((mut stdin_stream, mut stdout_stream, mut stderr_stream)) = result else {
 261                break;
 262            };
 263
 264            let mut input_buffer = Vec::new();
 265            let mut output_buffer = Vec::new();
 266
 267            let (mut stdin_msg_tx, mut stdin_msg_rx) = mpsc::unbounded::<Envelope>();
 268            cx.background_spawn(async move {
 269                while let Ok(msg) = read_message(&mut stdin_stream, &mut input_buffer).await {
 270                    if (stdin_msg_tx.send(msg).await).is_err() {
 271                        break;
 272                    }
 273                }
 274            }).detach();
 275
 276            loop {
 277
 278                select_biased! {
 279                    _ = app_quit_rx.next().fuse() => {
 280                        return anyhow::Ok(());
 281                    }
 282
 283                    stdin_message = stdin_msg_rx.next().fuse() => {
 284                        let Some(message) = stdin_message else {
 285                            log::warn!("error reading message on stdin. exiting.");
 286                            break;
 287                        };
 288                        if let Err(error) = incoming_tx.unbounded_send(message) {
 289                            log::error!("failed to send message to application: {error:?}. exiting.");
 290                            return Err(anyhow!(error));
 291                        }
 292                    }
 293
 294                    outgoing_message  = outgoing_rx.next().fuse() => {
 295                        let Some(message) = outgoing_message else {
 296                            log::error!("stdout handler, no message");
 297                            break;
 298                        };
 299
 300                        if let Err(error) =
 301                            write_message(&mut stdout_stream, &mut output_buffer, message).await
 302                        {
 303                            log::error!("failed to write stdout message: {:?}", error);
 304                            break;
 305                        }
 306                        if let Err(error) = stdout_stream.flush().await {
 307                            log::error!("failed to flush stdout message: {:?}", error);
 308                            break;
 309                        }
 310                    }
 311
 312                    log_message = log_rx.recv().fuse() => {
 313                        if let Ok(log_message) = log_message {
 314                            if let Err(error) = stderr_stream.write_all(&log_message).await {
 315                                log::error!("failed to write log message to stderr: {:?}", error);
 316                                break;
 317                            }
 318                            if let Err(error) = stderr_stream.flush().await {
 319                                log::error!("failed to flush stderr stream: {:?}", error);
 320                                break;
 321                            }
 322                        }
 323                    }
 324                }
 325            }
 326        }
 327        anyhow::Ok(())
 328    })
 329    .detach();
 330
 331    RemoteClient::proto_client_from_channels(incoming_rx, outgoing_tx, cx, "server", is_wsl_interop)
 332}
 333
 334fn init_paths() -> anyhow::Result<()> {
 335    for path in [
 336        paths::config_dir(),
 337        paths::extensions_dir(),
 338        paths::languages_dir(),
 339        paths::logs_dir(),
 340        paths::temp_dir(),
 341        paths::hang_traces_dir(),
 342        paths::remote_extensions_dir(),
 343        paths::remote_extensions_uploads_dir(),
 344    ]
 345    .iter()
 346    {
 347        std::fs::create_dir_all(path).with_context(|| format!("creating directory {path:?}"))?;
 348    }
 349    Ok(())
 350}
 351
 352pub fn execute_run(
 353    log_file: PathBuf,
 354    pid_file: PathBuf,
 355    stdin_socket: PathBuf,
 356    stdout_socket: PathBuf,
 357    stderr_socket: PathBuf,
 358) -> Result<()> {
 359    init_paths()?;
 360
 361    match daemonize()? {
 362        ControlFlow::Break(_) => return Ok(()),
 363        ControlFlow::Continue(_) => {}
 364    }
 365
 366    let app = gpui::Application::headless();
 367    let id = std::process::id().to_string();
 368    app.background_executor()
 369        .spawn(crashes::init(crashes::InitCrashHandler {
 370            session_id: id,
 371            zed_version: VERSION.to_owned(),
 372            binary: "zed-remote-server".to_string(),
 373            release_channel: release_channel::RELEASE_CHANNEL_NAME.clone(),
 374            commit_sha: option_env!("ZED_COMMIT_SHA").unwrap_or("no_sha").to_owned(),
 375        }))
 376        .detach();
 377    let log_rx = init_logging_server(&log_file)?;
 378    log::info!(
 379        "starting up. pid_file: {:?}, log_file: {:?}, stdin_socket: {:?}, stdout_socket: {:?}, stderr_socket: {:?}",
 380        pid_file,
 381        log_file,
 382        stdin_socket,
 383        stdout_socket,
 384        stderr_socket
 385    );
 386
 387    write_pid_file(&pid_file)
 388        .with_context(|| format!("failed to write pid file: {:?}", &pid_file))?;
 389
 390    let listeners = ServerListeners::new(stdin_socket, stdout_socket, stderr_socket)?;
 391
 392    rayon::ThreadPoolBuilder::new()
 393        .num_threads(std::thread::available_parallelism().map_or(1, |n| n.get().div_ceil(2)))
 394        .stack_size(10 * 1024 * 1024)
 395        .thread_name(|ix| format!("RayonWorker{}", ix))
 396        .build_global()
 397        .unwrap();
 398
 399    let (shell_env_loaded_tx, shell_env_loaded_rx) = oneshot::channel();
 400    app.background_executor()
 401        .spawn(async {
 402            util::load_login_shell_environment().await.log_err();
 403            shell_env_loaded_tx.send(()).ok();
 404        })
 405        .detach();
 406
 407    let git_hosting_provider_registry = Arc::new(GitHostingProviderRegistry::new());
 408    app.run(move |cx| {
 409        settings::init(cx);
 410        let app_commit_sha = option_env!("ZED_COMMIT_SHA").map(|s| AppCommitSha::new(s.to_owned()));
 411        let app_version = AppVersion::load(
 412            env!("ZED_PKG_VERSION"),
 413            option_env!("ZED_BUILD_ID"),
 414            app_commit_sha,
 415        );
 416        release_channel::init(app_version, cx);
 417        gpui_tokio::init(cx);
 418
 419        HeadlessProject::init(cx);
 420
 421        let is_wsl_interop = if cfg!(target_os = "linux") {
 422            // See: https://learn.microsoft.com/en-us/windows/wsl/filesystems#disable-interoperability
 423            matches!(std::fs::read_to_string("/proc/sys/fs/binfmt_misc/WSLInterop"), Ok(s) if s.contains("enabled"))
 424        } else {
 425            false
 426        };
 427
 428        log::info!("gpui app started, initializing server");
 429        let session = start_server(listeners, log_rx, cx, is_wsl_interop);
 430        trusted_worktrees::init(HashMap::default(), Some((session.clone(), ProjectId(REMOTE_SERVER_PROJECT_ID))), None, cx);
 431
 432        GitHostingProviderRegistry::set_global(git_hosting_provider_registry, cx);
 433        git_hosting_providers::init(cx);
 434        dap_adapters::init(cx);
 435
 436        extension::init(cx);
 437        let extension_host_proxy = ExtensionHostProxy::global(cx);
 438
 439        json_schema_store::init(cx);
 440
 441        let project = cx.new(|cx| {
 442            let fs = Arc::new(RealFs::new(None, cx.background_executor().clone()));
 443            let node_settings_rx = initialize_settings(session.clone(), fs.clone(), cx);
 444
 445            let proxy_url = read_proxy_settings(cx);
 446
 447            let http_client = {
 448                let _guard = Tokio::handle(cx).enter();
 449                Arc::new(
 450                    ReqwestClient::proxy_and_user_agent(
 451                        proxy_url,
 452                        &format!(
 453                            "Zed-Server/{} ({}; {})",
 454                            env!("CARGO_PKG_VERSION"),
 455                            std::env::consts::OS,
 456                            std::env::consts::ARCH
 457                        ),
 458                    )
 459                    .expect("Could not start HTTP client"),
 460                )
 461            };
 462
 463            let node_runtime = NodeRuntime::new(
 464                http_client.clone(),
 465                Some(shell_env_loaded_rx),
 466                node_settings_rx,
 467            );
 468
 469            let mut languages = LanguageRegistry::new(cx.background_executor().clone());
 470            languages.set_language_server_download_dir(paths::languages_dir().clone());
 471            let languages = Arc::new(languages);
 472
 473            HeadlessProject::new(
 474                HeadlessAppState {
 475                    session: session.clone(),
 476                    fs,
 477                    http_client,
 478                    node_runtime,
 479                    languages,
 480                    extension_host_proxy,
 481                },
 482                true,
 483                cx,
 484            )
 485        });
 486
 487        handle_crash_files_requests(&project, &session);
 488
 489        cx.background_spawn(async move { cleanup_old_binaries() })
 490            .detach();
 491
 492        mem::forget(project);
 493    });
 494    log::info!("gpui app is shut down. quitting.");
 495    Ok(())
 496}
 497
 498#[derive(Debug, Error)]
 499pub(crate) enum ServerPathError {
 500    #[error("Failed to create server_dir `{path}`")]
 501    CreateServerDir {
 502        #[source]
 503        source: std::io::Error,
 504        path: PathBuf,
 505    },
 506    #[error("Failed to create logs_dir `{path}`")]
 507    CreateLogsDir {
 508        #[source]
 509        source: std::io::Error,
 510        path: PathBuf,
 511    },
 512}
 513
 514#[derive(Clone, Debug)]
 515struct ServerPaths {
 516    log_file: PathBuf,
 517    pid_file: PathBuf,
 518    stdin_socket: PathBuf,
 519    stdout_socket: PathBuf,
 520    stderr_socket: PathBuf,
 521}
 522
 523impl ServerPaths {
 524    fn new(identifier: &str) -> Result<Self, ServerPathError> {
 525        let server_dir = paths::remote_server_state_dir().join(identifier);
 526        std::fs::create_dir_all(&server_dir).map_err(|source| {
 527            ServerPathError::CreateServerDir {
 528                source,
 529                path: server_dir.clone(),
 530            }
 531        })?;
 532        let log_dir = logs_dir();
 533        std::fs::create_dir_all(log_dir).map_err(|source| ServerPathError::CreateLogsDir {
 534            source: source,
 535            path: log_dir.clone(),
 536        })?;
 537
 538        let pid_file = server_dir.join("server.pid");
 539        let stdin_socket = server_dir.join("stdin.sock");
 540        let stdout_socket = server_dir.join("stdout.sock");
 541        let stderr_socket = server_dir.join("stderr.sock");
 542        let log_file = logs_dir().join(format!("server-{}.log", identifier));
 543
 544        Ok(Self {
 545            pid_file,
 546            stdin_socket,
 547            stdout_socket,
 548            stderr_socket,
 549            log_file,
 550        })
 551    }
 552}
 553
 554#[derive(Debug, Error)]
 555pub(crate) enum ExecuteProxyError {
 556    #[error("Failed to init server paths")]
 557    ServerPath(#[from] ServerPathError),
 558
 559    #[error(transparent)]
 560    ServerNotRunning(#[from] ProxyLaunchError),
 561
 562    #[error("Failed to check PidFile '{path}'")]
 563    CheckPidFile {
 564        #[source]
 565        source: CheckPidError,
 566        path: PathBuf,
 567    },
 568
 569    #[error("Failed to kill existing server with pid '{pid}'")]
 570    KillRunningServer {
 571        #[source]
 572        source: std::io::Error,
 573        pid: u32,
 574    },
 575
 576    #[error("failed to spawn server")]
 577    SpawnServer(#[source] SpawnServerError),
 578
 579    #[error("stdin_task failed")]
 580    StdinTask(#[source] anyhow::Error),
 581    #[error("stdout_task failed")]
 582    StdoutTask(#[source] anyhow::Error),
 583    #[error("stderr_task failed")]
 584    StderrTask(#[source] anyhow::Error),
 585}
 586
 587pub(crate) fn execute_proxy(
 588    identifier: String,
 589    is_reconnecting: bool,
 590) -> Result<(), ExecuteProxyError> {
 591    init_logging_proxy();
 592
 593    let server_paths = ServerPaths::new(&identifier)?;
 594
 595    let id = std::process::id().to_string();
 596    smol::spawn(crashes::init(crashes::InitCrashHandler {
 597        session_id: id,
 598        zed_version: VERSION.to_owned(),
 599        binary: "zed-remote-server".to_string(),
 600        release_channel: release_channel::RELEASE_CHANNEL_NAME.clone(),
 601        commit_sha: option_env!("ZED_COMMIT_SHA").unwrap_or("no_sha").to_owned(),
 602    }))
 603    .detach();
 604
 605    log::info!("starting proxy process. PID: {}", std::process::id());
 606    smol::block_on(async {
 607        let server_pid = check_pid_file(&server_paths.pid_file)
 608            .await
 609            .map_err(|source| ExecuteProxyError::CheckPidFile {
 610                source,
 611                path: server_paths.pid_file.clone(),
 612            })?;
 613        let server_running = server_pid.is_some();
 614        if is_reconnecting {
 615            if !server_running {
 616                log::error!("attempted to reconnect, but no server running");
 617                return Err(ExecuteProxyError::ServerNotRunning(
 618                    ProxyLaunchError::ServerNotRunning,
 619                ));
 620            }
 621        } else {
 622            if let Some(pid) = server_pid {
 623                log::info!(
 624                    "proxy found server already running with PID {}. Killing process and cleaning up files...",
 625                    pid
 626                );
 627                kill_running_server(pid, &server_paths).await?;
 628            }
 629
 630            spawn_server(&server_paths)
 631                .await
 632                .map_err(ExecuteProxyError::SpawnServer)?;
 633        };
 634        Ok(())
 635    })?;
 636
 637    let stdin_task = smol::spawn(async move {
 638        let stdin = smol::Unblock::new(std::io::stdin());
 639        let stream = smol::net::unix::UnixStream::connect(&server_paths.stdin_socket).await?;
 640        handle_io(stdin, stream, "stdin").await
 641    });
 642
 643    let stdout_task: smol::Task<Result<()>> = smol::spawn(async move {
 644        let stdout = smol::Unblock::new(std::io::stdout());
 645        let stream = smol::net::unix::UnixStream::connect(&server_paths.stdout_socket).await?;
 646        handle_io(stream, stdout, "stdout").await
 647    });
 648
 649    let stderr_task: smol::Task<Result<()>> = smol::spawn(async move {
 650        let mut stderr = smol::Unblock::new(std::io::stderr());
 651        let mut stream = smol::net::unix::UnixStream::connect(&server_paths.stderr_socket).await?;
 652        let mut stderr_buffer = vec![0; 2048];
 653        loop {
 654            match stream
 655                .read(&mut stderr_buffer)
 656                .await
 657                .context("reading stderr")?
 658            {
 659                0 => {
 660                    let error =
 661                        std::io::Error::new(std::io::ErrorKind::UnexpectedEof, "stderr closed");
 662                    Err(anyhow!(error))?;
 663                }
 664                n => {
 665                    stderr.write_all(&stderr_buffer[..n]).await?;
 666                    stderr.flush().await?;
 667                }
 668            }
 669        }
 670    });
 671
 672    if let Err(forwarding_result) = smol::block_on(async move {
 673        futures::select! {
 674            result = stdin_task.fuse() => result.map_err(ExecuteProxyError::StdinTask),
 675            result = stdout_task.fuse() => result.map_err(ExecuteProxyError::StdoutTask),
 676            result = stderr_task.fuse() => result.map_err(ExecuteProxyError::StderrTask),
 677        }
 678    }) {
 679        log::error!(
 680            "encountered error while forwarding messages: {:?}, terminating...",
 681            forwarding_result
 682        );
 683        return Err(forwarding_result);
 684    }
 685
 686    Ok(())
 687}
 688
 689async fn kill_running_server(pid: u32, paths: &ServerPaths) -> Result<(), ExecuteProxyError> {
 690    log::info!("killing existing server with PID {}", pid);
 691    new_smol_command("kill")
 692        .arg(pid.to_string())
 693        .output()
 694        .await
 695        .map_err(|source| ExecuteProxyError::KillRunningServer { source, pid })?;
 696
 697    for file in [
 698        &paths.pid_file,
 699        &paths.stdin_socket,
 700        &paths.stdout_socket,
 701        &paths.stderr_socket,
 702    ] {
 703        log::debug!("cleaning up file {:?} before starting new server", file);
 704        std::fs::remove_file(file).ok();
 705    }
 706    Ok(())
 707}
 708
 709#[derive(Debug, Error)]
 710pub(crate) enum SpawnServerError {
 711    #[error("failed to remove stdin socket")]
 712    RemoveStdinSocket(#[source] std::io::Error),
 713
 714    #[error("failed to remove stdout socket")]
 715    RemoveStdoutSocket(#[source] std::io::Error),
 716
 717    #[error("failed to remove stderr socket")]
 718    RemoveStderrSocket(#[source] std::io::Error),
 719
 720    #[error("failed to get current_exe")]
 721    CurrentExe(#[source] std::io::Error),
 722
 723    #[error("failed to launch server process")]
 724    ProcessStatus(#[source] std::io::Error),
 725
 726    #[error("failed to launch and detach server process: {status}\n{paths}")]
 727    LaunchStatus { status: ExitStatus, paths: String },
 728
 729    #[error("failed to wait for server to be ready to accept connections")]
 730    Timeout,
 731}
 732
 733async fn spawn_server(paths: &ServerPaths) -> Result<(), SpawnServerError> {
 734    log::info!("spawning server process",);
 735    if paths.stdin_socket.exists() {
 736        std::fs::remove_file(&paths.stdin_socket).map_err(SpawnServerError::RemoveStdinSocket)?;
 737    }
 738    if paths.stdout_socket.exists() {
 739        std::fs::remove_file(&paths.stdout_socket).map_err(SpawnServerError::RemoveStdoutSocket)?;
 740    }
 741    if paths.stderr_socket.exists() {
 742        std::fs::remove_file(&paths.stderr_socket).map_err(SpawnServerError::RemoveStderrSocket)?;
 743    }
 744
 745    let binary_name = std::env::current_exe().map_err(SpawnServerError::CurrentExe)?;
 746    let mut server_process = new_smol_command(binary_name);
 747    server_process
 748        .arg("run")
 749        .arg("--log-file")
 750        .arg(&paths.log_file)
 751        .arg("--pid-file")
 752        .arg(&paths.pid_file)
 753        .arg("--stdin-socket")
 754        .arg(&paths.stdin_socket)
 755        .arg("--stdout-socket")
 756        .arg(&paths.stdout_socket)
 757        .arg("--stderr-socket")
 758        .arg(&paths.stderr_socket);
 759
 760    let status = server_process
 761        .status()
 762        .await
 763        .map_err(SpawnServerError::ProcessStatus)?;
 764
 765    if !status.success() {
 766        return Err(SpawnServerError::LaunchStatus {
 767            status,
 768            paths: format!(
 769                "log file: {:?}, pid file: {:?}",
 770                paths.log_file, paths.pid_file,
 771            ),
 772        });
 773    }
 774
 775    let mut total_time_waited = std::time::Duration::from_secs(0);
 776    let wait_duration = std::time::Duration::from_millis(20);
 777    while !paths.stdout_socket.exists()
 778        || !paths.stdin_socket.exists()
 779        || !paths.stderr_socket.exists()
 780    {
 781        log::debug!("waiting for server to be ready to accept connections...");
 782        std::thread::sleep(wait_duration);
 783        total_time_waited += wait_duration;
 784        if total_time_waited > std::time::Duration::from_secs(10) {
 785            return Err(SpawnServerError::Timeout);
 786        }
 787    }
 788
 789    log::info!(
 790        "server ready to accept connections. total time waited: {:?}",
 791        total_time_waited
 792    );
 793
 794    Ok(())
 795}
 796
 797#[derive(Debug, Error)]
 798#[error("Failed to remove PID file for missing process (pid `{pid}`")]
 799pub(crate) struct CheckPidError {
 800    #[source]
 801    source: std::io::Error,
 802    pid: u32,
 803}
 804
 805async fn check_pid_file(path: &Path) -> Result<Option<u32>, CheckPidError> {
 806    let Some(pid) = std::fs::read_to_string(&path)
 807        .ok()
 808        .and_then(|contents| contents.parse::<u32>().ok())
 809    else {
 810        return Ok(None);
 811    };
 812
 813    log::debug!("Checking if process with PID {} exists...", pid);
 814    match new_smol_command("kill")
 815        .arg("-0")
 816        .arg(pid.to_string())
 817        .output()
 818        .await
 819    {
 820        Ok(output) if output.status.success() => {
 821            log::debug!(
 822                "Process with PID {} exists. NOT spawning new server, but attaching to existing one.",
 823                pid
 824            );
 825            Ok(Some(pid))
 826        }
 827        _ => {
 828            log::debug!(
 829                "Found PID file, but process with that PID does not exist. Removing PID file."
 830            );
 831            std::fs::remove_file(&path).map_err(|source| CheckPidError { source, pid })?;
 832            Ok(None)
 833        }
 834    }
 835}
 836
 837fn write_pid_file(path: &Path) -> Result<()> {
 838    if path.exists() {
 839        std::fs::remove_file(path)?;
 840    }
 841    let pid = std::process::id().to_string();
 842    log::debug!("writing PID {} to file {:?}", pid, path);
 843    std::fs::write(path, pid).context("Failed to write PID file")
 844}
 845
 846async fn handle_io<R, W>(mut reader: R, mut writer: W, socket_name: &str) -> Result<()>
 847where
 848    R: AsyncRead + Unpin,
 849    W: AsyncWrite + Unpin,
 850{
 851    use remote::protocol::{read_message_raw, write_size_prefixed_buffer};
 852
 853    let mut buffer = Vec::new();
 854    loop {
 855        read_message_raw(&mut reader, &mut buffer)
 856            .await
 857            .with_context(|| format!("failed to read message from {}", socket_name))?;
 858        write_size_prefixed_buffer(&mut writer, &mut buffer)
 859            .await
 860            .with_context(|| format!("failed to write message to {}", socket_name))?;
 861        writer.flush().await?;
 862        buffer.clear();
 863    }
 864}
 865
 866fn initialize_settings(
 867    session: AnyProtoClient,
 868    fs: Arc<dyn Fs>,
 869    cx: &mut App,
 870) -> watch::Receiver<Option<NodeBinaryOptions>> {
 871    let user_settings_file_rx =
 872        watch_config_file(cx.background_executor(), fs, paths::settings_file().clone());
 873
 874    handle_settings_file_changes(user_settings_file_rx, cx, {
 875        move |err, _cx| {
 876            if let Some(e) = err {
 877                log::info!("Server settings failed to change: {}", e);
 878
 879                session
 880                    .send(proto::Toast {
 881                        project_id: REMOTE_SERVER_PROJECT_ID,
 882                        notification_id: "server-settings-failed".to_string(),
 883                        message: format!(
 884                            "Error in settings on remote host {:?}: {}",
 885                            paths::settings_file(),
 886                            e
 887                        ),
 888                    })
 889                    .log_err();
 890            } else {
 891                session
 892                    .send(proto::HideToast {
 893                        project_id: REMOTE_SERVER_PROJECT_ID,
 894                        notification_id: "server-settings-failed".to_string(),
 895                    })
 896                    .log_err();
 897            }
 898        }
 899    });
 900
 901    let (mut tx, rx) = watch::channel(None);
 902    let mut node_settings = None;
 903    cx.observe_global::<SettingsStore>(move |cx| {
 904        let new_node_settings = &ProjectSettings::get_global(cx).node;
 905        if Some(new_node_settings) != node_settings.as_ref() {
 906            log::info!("Got new node settings: {new_node_settings:?}");
 907            let options = NodeBinaryOptions {
 908                allow_path_lookup: !new_node_settings.ignore_system_version,
 909                // TODO: Implement this setting
 910                allow_binary_download: true,
 911                use_paths: new_node_settings.path.as_ref().map(|node_path| {
 912                    let node_path = PathBuf::from(shellexpand::tilde(node_path).as_ref());
 913                    let npm_path = new_node_settings
 914                        .npm_path
 915                        .as_ref()
 916                        .map(|path| PathBuf::from(shellexpand::tilde(&path).as_ref()));
 917                    (
 918                        node_path.clone(),
 919                        npm_path.unwrap_or_else(|| {
 920                            let base_path = PathBuf::new();
 921                            node_path.parent().unwrap_or(&base_path).join("npm")
 922                        }),
 923                    )
 924                }),
 925            };
 926            node_settings = Some(new_node_settings.clone());
 927            tx.send(Some(options)).ok();
 928        }
 929    })
 930    .detach();
 931
 932    rx
 933}
 934
 935pub fn handle_settings_file_changes(
 936    mut server_settings_file: mpsc::UnboundedReceiver<String>,
 937    cx: &mut App,
 938    settings_changed: impl Fn(Option<anyhow::Error>, &mut App) + 'static,
 939) {
 940    let server_settings_content = cx
 941        .foreground_executor()
 942        .block_on(server_settings_file.next())
 943        .unwrap();
 944    SettingsStore::update_global(cx, |store, cx| {
 945        store
 946            .set_server_settings(&server_settings_content, cx)
 947            .log_err();
 948    });
 949    cx.spawn(async move |cx| {
 950        while let Some(server_settings_content) = server_settings_file.next().await {
 951            cx.update_global(|store: &mut SettingsStore, cx| {
 952                let result = store.set_server_settings(&server_settings_content, cx);
 953                if let Err(err) = &result {
 954                    log::error!("Failed to load server settings: {err}");
 955                }
 956                settings_changed(result.err(), cx);
 957                cx.refresh_windows();
 958            });
 959        }
 960    })
 961    .detach();
 962}
 963
 964fn read_proxy_settings(cx: &mut Context<HeadlessProject>) -> Option<Url> {
 965    let proxy_str = ProxySettings::get_global(cx).proxy.to_owned();
 966
 967    proxy_str
 968        .as_deref()
 969        .map(str::trim)
 970        .filter(|input| !input.is_empty())
 971        .and_then(|input| {
 972            input
 973                .parse::<Url>()
 974                .inspect_err(|e| log::error!("Error parsing proxy settings: {}", e))
 975                .ok()
 976        })
 977        .or_else(read_proxy_from_env)
 978}
 979
 980fn daemonize() -> Result<ControlFlow<()>> {
 981    match fork::fork().map_err(|e| anyhow!("failed to call fork with error code {e}"))? {
 982        fork::Fork::Parent(_) => {
 983            return Ok(ControlFlow::Break(()));
 984        }
 985        fork::Fork::Child => {}
 986    }
 987
 988    // Once we've detached from the parent, we want to close stdout/stderr/stdin
 989    // so that the outer SSH process is not attached to us in any way anymore.
 990    unsafe { redirect_standard_streams() }?;
 991
 992    Ok(ControlFlow::Continue(()))
 993}
 994
 995unsafe fn redirect_standard_streams() -> Result<()> {
 996    let devnull_fd = unsafe { libc::open(b"/dev/null\0" as *const [u8; 10] as _, libc::O_RDWR) };
 997    anyhow::ensure!(devnull_fd != -1, "failed to open /dev/null");
 998
 999    let process_stdio = |name, fd| {
1000        let reopened_fd = unsafe { libc::dup2(devnull_fd, fd) };
1001        anyhow::ensure!(
1002            reopened_fd != -1,
1003            format!("failed to redirect {} to /dev/null", name)
1004        );
1005        Ok(())
1006    };
1007
1008    process_stdio("stdin", libc::STDIN_FILENO)?;
1009    process_stdio("stdout", libc::STDOUT_FILENO)?;
1010    process_stdio("stderr", libc::STDERR_FILENO)?;
1011
1012    anyhow::ensure!(
1013        unsafe { libc::close(devnull_fd) != -1 },
1014        "failed to close /dev/null fd after redirecting"
1015    );
1016
1017    Ok(())
1018}
1019
1020fn cleanup_old_binaries() -> Result<()> {
1021    let server_dir = paths::remote_server_dir_relative();
1022    let release_channel = release_channel::RELEASE_CHANNEL.dev_name();
1023    let prefix = format!("zed-remote-server-{}-", release_channel);
1024
1025    for entry in std::fs::read_dir(server_dir.as_std_path())? {
1026        let path = entry?.path();
1027
1028        if let Some(file_name) = path.file_name()
1029            && let Some(version) = file_name.to_string_lossy().strip_prefix(&prefix)
1030            && !is_new_version(version)
1031            && !is_file_in_use(file_name)
1032        {
1033            log::info!("removing old remote server binary: {:?}", path);
1034            std::fs::remove_file(&path)?;
1035        }
1036    }
1037
1038    Ok(())
1039}
1040
1041fn is_new_version(version: &str) -> bool {
1042    semver::Version::from_str(version)
1043        .ok()
1044        .zip(semver::Version::from_str(env!("ZED_PKG_VERSION")).ok())
1045        .is_some_and(|(version, current_version)| version >= current_version)
1046}
1047
1048fn is_file_in_use(file_name: &OsStr) -> bool {
1049    let info = sysinfo::System::new_with_specifics(sysinfo::RefreshKind::nothing().with_processes(
1050        sysinfo::ProcessRefreshKind::nothing().with_exe(sysinfo::UpdateKind::Always),
1051    ));
1052
1053    for process in info.processes().values() {
1054        if process
1055            .exe()
1056            .is_some_and(|exe| exe.file_name().is_some_and(|name| name == file_name))
1057        {
1058            return true;
1059        }
1060    }
1061
1062    false
1063}