unix.rs

   1use crate::HeadlessProject;
   2use crate::headless_project::HeadlessAppState;
   3use anyhow::{Context as _, Result, anyhow};
   4use client::ProxySettings;
   5use util::ResultExt;
   6
   7use extension::ExtensionHostProxy;
   8use fs::{Fs, RealFs};
   9use futures::channel::{mpsc, oneshot};
  10use futures::{AsyncRead, AsyncWrite, AsyncWriteExt, FutureExt, SinkExt, select, select_biased};
  11use git::GitHostingProviderRegistry;
  12use gpui::{App, AppContext as _, Context, Entity, UpdateGlobal as _};
  13use gpui_tokio::Tokio;
  14use http_client::{Url, read_proxy_from_env};
  15use language::LanguageRegistry;
  16use node_runtime::{NodeBinaryOptions, NodeRuntime};
  17use paths::logs_dir;
  18use project::project_settings::ProjectSettings;
  19use util::command::new_smol_command;
  20
  21use proto::CrashReport;
  22use release_channel::{AppCommitSha, AppVersion, RELEASE_CHANNEL, ReleaseChannel};
  23use remote::RemoteClient;
  24use remote::{
  25    json_log::LogRecord,
  26    protocol::{read_message, write_message},
  27    proxy::ProxyLaunchError,
  28};
  29use reqwest_client::ReqwestClient;
  30use rpc::proto::{self, Envelope, REMOTE_SERVER_PROJECT_ID};
  31use rpc::{AnyProtoClient, TypedEnvelope};
  32use settings::{Settings, SettingsStore, watch_config_file};
  33use smol::Async;
  34use smol::channel::{Receiver, Sender};
  35use smol::io::AsyncReadExt;
  36use smol::{net::unix::UnixListener, stream::StreamExt as _};
  37use std::{
  38    env,
  39    ffi::OsStr,
  40    fs::File,
  41    io::Write,
  42    mem,
  43    ops::ControlFlow,
  44    path::{Path, PathBuf},
  45    process::ExitStatus,
  46    str::FromStr,
  47    sync::{Arc, LazyLock},
  48};
  49use thiserror::Error;
  50
  51pub static VERSION: LazyLock<String> = LazyLock::new(|| match *RELEASE_CHANNEL {
  52    ReleaseChannel::Stable | ReleaseChannel::Preview => env!("ZED_PKG_VERSION").to_owned(),
  53    ReleaseChannel::Nightly | ReleaseChannel::Dev => {
  54        let commit_sha = option_env!("ZED_COMMIT_SHA").unwrap_or("missing-zed-commit-sha");
  55        let build_identifier = option_env!("ZED_BUILD_ID");
  56        if let Some(build_id) = build_identifier {
  57            format!("{build_id}+{commit_sha}")
  58        } else {
  59            commit_sha.to_owned()
  60        }
  61    }
  62});
  63
  64fn init_logging_proxy() {
  65    env_logger::builder()
  66        .format(|buf, record| {
  67            let mut log_record = LogRecord::new(record);
  68            log_record.message = format!("(remote proxy) {}", log_record.message);
  69            serde_json::to_writer(&mut *buf, &log_record)?;
  70            buf.write_all(b"\n")?;
  71            Ok(())
  72        })
  73        .init();
  74}
  75
  76fn init_logging_server(log_file_path: PathBuf) -> Result<Receiver<Vec<u8>>> {
  77    struct MultiWrite {
  78        file: File,
  79        channel: Sender<Vec<u8>>,
  80        buffer: Vec<u8>,
  81    }
  82
  83    impl Write for MultiWrite {
  84        fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
  85            let written = self.file.write(buf)?;
  86            self.buffer.extend_from_slice(&buf[..written]);
  87            Ok(written)
  88        }
  89
  90        fn flush(&mut self) -> std::io::Result<()> {
  91            self.channel
  92                .send_blocking(self.buffer.clone())
  93                .map_err(std::io::Error::other)?;
  94            self.buffer.clear();
  95            self.file.flush()
  96        }
  97    }
  98
  99    let log_file = std::fs::OpenOptions::new()
 100        .create(true)
 101        .append(true)
 102        .open(&log_file_path)
 103        .context("Failed to open log file in append mode")?;
 104
 105    let (tx, rx) = smol::channel::unbounded();
 106
 107    let target = Box::new(MultiWrite {
 108        file: log_file,
 109        channel: tx,
 110        buffer: Vec::new(),
 111    });
 112
 113    env_logger::Builder::new()
 114        .filter_level(log::LevelFilter::Info)
 115        .parse_default_env()
 116        .target(env_logger::Target::Pipe(target))
 117        .format(|buf, record| {
 118            let mut log_record = LogRecord::new(record);
 119            log_record.message = format!("(remote server) {}", log_record.message);
 120            serde_json::to_writer(&mut *buf, &log_record)?;
 121            buf.write_all(b"\n")?;
 122            Ok(())
 123        })
 124        .init();
 125
 126    Ok(rx)
 127}
 128
 129fn handle_crash_files_requests(project: &Entity<HeadlessProject>, client: &AnyProtoClient) {
 130    client.add_request_handler(
 131        project.downgrade(),
 132        |_, _: TypedEnvelope<proto::GetCrashFiles>, _cx| async move {
 133            let mut legacy_panics = Vec::new();
 134            let mut crashes = Vec::new();
 135            let mut children = smol::fs::read_dir(paths::logs_dir()).await?;
 136            while let Some(child) = children.next().await {
 137                let child = child?;
 138                let child_path = child.path();
 139
 140                let extension = child_path.extension();
 141                if extension == Some(OsStr::new("panic")) {
 142                    let filename = if let Some(filename) = child_path.file_name() {
 143                        filename.to_string_lossy()
 144                    } else {
 145                        continue;
 146                    };
 147
 148                    if !filename.starts_with("zed") {
 149                        continue;
 150                    }
 151
 152                    let file_contents = smol::fs::read_to_string(&child_path)
 153                        .await
 154                        .context("error reading panic file")?;
 155
 156                    legacy_panics.push(file_contents);
 157                    smol::fs::remove_file(&child_path)
 158                        .await
 159                        .context("error removing panic")
 160                        .log_err();
 161                } else if extension == Some(OsStr::new("dmp")) {
 162                    let mut json_path = child_path.clone();
 163                    json_path.set_extension("json");
 164                    if let Ok(json_content) = smol::fs::read_to_string(&json_path).await {
 165                        crashes.push(CrashReport {
 166                            metadata: json_content,
 167                            minidump_contents: smol::fs::read(&child_path).await?,
 168                        });
 169                        smol::fs::remove_file(&child_path).await.log_err();
 170                        smol::fs::remove_file(&json_path).await.log_err();
 171                    } else {
 172                        log::error!("Couldn't find json metadata for crash: {child_path:?}");
 173                    }
 174                }
 175            }
 176
 177            anyhow::Ok(proto::GetCrashFilesResponse { crashes })
 178        },
 179    );
 180}
 181
 182struct ServerListeners {
 183    stdin: UnixListener,
 184    stdout: UnixListener,
 185    stderr: UnixListener,
 186}
 187
 188impl ServerListeners {
 189    pub fn new(stdin_path: PathBuf, stdout_path: PathBuf, stderr_path: PathBuf) -> Result<Self> {
 190        Ok(Self {
 191            stdin: UnixListener::bind(stdin_path).context("failed to bind stdin socket")?,
 192            stdout: UnixListener::bind(stdout_path).context("failed to bind stdout socket")?,
 193            stderr: UnixListener::bind(stderr_path).context("failed to bind stderr socket")?,
 194        })
 195    }
 196}
 197
 198fn start_server(
 199    listeners: ServerListeners,
 200    log_rx: Receiver<Vec<u8>>,
 201    cx: &mut App,
 202    is_wsl_interop: bool,
 203) -> AnyProtoClient {
 204    // This is the server idle timeout. If no connection comes in this timeout, the server will shut down.
 205    const IDLE_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10 * 60);
 206
 207    let (incoming_tx, incoming_rx) = mpsc::unbounded::<Envelope>();
 208    let (outgoing_tx, mut outgoing_rx) = mpsc::unbounded::<Envelope>();
 209    let (app_quit_tx, mut app_quit_rx) = mpsc::unbounded::<()>();
 210
 211    cx.on_app_quit(move |_| {
 212        let mut app_quit_tx = app_quit_tx.clone();
 213        async move {
 214            log::info!("app quitting. sending signal to server main loop");
 215            app_quit_tx.send(()).await.ok();
 216        }
 217    })
 218    .detach();
 219
 220    cx.spawn(async move |cx| {
 221        let mut stdin_incoming = listeners.stdin.incoming();
 222        let mut stdout_incoming = listeners.stdout.incoming();
 223        let mut stderr_incoming = listeners.stderr.incoming();
 224
 225        loop {
 226            let streams = futures::future::join3(stdin_incoming.next(), stdout_incoming.next(), stderr_incoming.next());
 227
 228            log::info!("accepting new connections");
 229            let result = select! {
 230                streams = streams.fuse() => {
 231                    let (Some(Ok(stdin_stream)), Some(Ok(stdout_stream)), Some(Ok(stderr_stream))) = streams else {
 232                        break;
 233                    };
 234                    anyhow::Ok((stdin_stream, stdout_stream, stderr_stream))
 235                }
 236                _ = futures::FutureExt::fuse(smol::Timer::after(IDLE_TIMEOUT)) => {
 237                    log::warn!("timed out waiting for new connections after {:?}. exiting.", IDLE_TIMEOUT);
 238                    cx.update(|cx| {
 239                        // TODO: This is a hack, because in a headless project, shutdown isn't executed
 240                        // when calling quit, but it should be.
 241                        cx.shutdown();
 242                        cx.quit();
 243                    })?;
 244                    break;
 245                }
 246                _ = app_quit_rx.next().fuse() => {
 247                    break;
 248                }
 249            };
 250
 251            let Ok((mut stdin_stream, mut stdout_stream, mut stderr_stream)) = result else {
 252                break;
 253            };
 254
 255            let mut input_buffer = Vec::new();
 256            let mut output_buffer = Vec::new();
 257
 258            let (mut stdin_msg_tx, mut stdin_msg_rx) = mpsc::unbounded::<Envelope>();
 259            cx.background_spawn(async move {
 260                while let Ok(msg) = read_message(&mut stdin_stream, &mut input_buffer).await {
 261                    if (stdin_msg_tx.send(msg).await).is_err() {
 262                        break;
 263                    }
 264                }
 265            }).detach();
 266
 267            loop {
 268
 269                select_biased! {
 270                    _ = app_quit_rx.next().fuse() => {
 271                        return anyhow::Ok(());
 272                    }
 273
 274                    stdin_message = stdin_msg_rx.next().fuse() => {
 275                        let Some(message) = stdin_message else {
 276                            log::warn!("error reading message on stdin. exiting.");
 277                            break;
 278                        };
 279                        if let Err(error) = incoming_tx.unbounded_send(message) {
 280                            log::error!("failed to send message to application: {error:?}. exiting.");
 281                            return Err(anyhow!(error));
 282                        }
 283                    }
 284
 285                    outgoing_message  = outgoing_rx.next().fuse() => {
 286                        let Some(message) = outgoing_message else {
 287                            log::error!("stdout handler, no message");
 288                            break;
 289                        };
 290
 291                        if let Err(error) =
 292                            write_message(&mut stdout_stream, &mut output_buffer, message).await
 293                        {
 294                            log::error!("failed to write stdout message: {:?}", error);
 295                            break;
 296                        }
 297                        if let Err(error) = stdout_stream.flush().await {
 298                            log::error!("failed to flush stdout message: {:?}", error);
 299                            break;
 300                        }
 301                    }
 302
 303                    log_message = log_rx.recv().fuse() => {
 304                        if let Ok(log_message) = log_message {
 305                            if let Err(error) = stderr_stream.write_all(&log_message).await {
 306                                log::error!("failed to write log message to stderr: {:?}", error);
 307                                break;
 308                            }
 309                            if let Err(error) = stderr_stream.flush().await {
 310                                log::error!("failed to flush stderr stream: {:?}", error);
 311                                break;
 312                            }
 313                        }
 314                    }
 315                }
 316            }
 317        }
 318        anyhow::Ok(())
 319    })
 320    .detach();
 321
 322    RemoteClient::proto_client_from_channels(incoming_rx, outgoing_tx, cx, "server", is_wsl_interop)
 323}
 324
 325fn init_paths() -> anyhow::Result<()> {
 326    for path in [
 327        paths::config_dir(),
 328        paths::extensions_dir(),
 329        paths::languages_dir(),
 330        paths::logs_dir(),
 331        paths::temp_dir(),
 332        paths::hang_traces_dir(),
 333        paths::remote_extensions_dir(),
 334        paths::remote_extensions_uploads_dir(),
 335    ]
 336    .iter()
 337    {
 338        std::fs::create_dir_all(path).with_context(|| format!("creating directory {path:?}"))?;
 339    }
 340    Ok(())
 341}
 342
 343pub fn execute_run(
 344    log_file: PathBuf,
 345    pid_file: PathBuf,
 346    stdin_socket: PathBuf,
 347    stdout_socket: PathBuf,
 348    stderr_socket: PathBuf,
 349) -> Result<()> {
 350    init_paths()?;
 351
 352    match daemonize()? {
 353        ControlFlow::Break(_) => return Ok(()),
 354        ControlFlow::Continue(_) => {}
 355    }
 356
 357    let app = gpui::Application::headless();
 358    let id = std::process::id().to_string();
 359    app.background_executor()
 360        .spawn(crashes::init(crashes::InitCrashHandler {
 361            session_id: id,
 362            zed_version: VERSION.to_owned(),
 363            binary: "zed-remote-server".to_string(),
 364            release_channel: release_channel::RELEASE_CHANNEL_NAME.clone(),
 365            commit_sha: option_env!("ZED_COMMIT_SHA").unwrap_or("no_sha").to_owned(),
 366        }))
 367        .detach();
 368    let log_rx = init_logging_server(log_file)?;
 369    log::info!(
 370        "starting up. pid_file: {:?}, stdin_socket: {:?}, stdout_socket: {:?}, stderr_socket: {:?}",
 371        pid_file,
 372        stdin_socket,
 373        stdout_socket,
 374        stderr_socket
 375    );
 376
 377    write_pid_file(&pid_file)
 378        .with_context(|| format!("failed to write pid file: {:?}", &pid_file))?;
 379
 380    let listeners = ServerListeners::new(stdin_socket, stdout_socket, stderr_socket)?;
 381
 382    rayon::ThreadPoolBuilder::new()
 383        .num_threads(std::thread::available_parallelism().map_or(1, |n| n.get().div_ceil(2)))
 384        .stack_size(10 * 1024 * 1024)
 385        .thread_name(|ix| format!("RayonWorker{}", ix))
 386        .build_global()
 387        .unwrap();
 388
 389    let (shell_env_loaded_tx, shell_env_loaded_rx) = oneshot::channel();
 390    app.background_executor()
 391        .spawn(async {
 392            util::load_login_shell_environment().await.log_err();
 393            shell_env_loaded_tx.send(()).ok();
 394        })
 395        .detach();
 396
 397    let git_hosting_provider_registry = Arc::new(GitHostingProviderRegistry::new());
 398    app.run(move |cx| {
 399        settings::init(cx);
 400        let app_commit_sha = option_env!("ZED_COMMIT_SHA").map(|s| AppCommitSha::new(s.to_owned()));
 401        let app_version = AppVersion::load(
 402            env!("ZED_PKG_VERSION"),
 403            option_env!("ZED_BUILD_ID"),
 404            app_commit_sha,
 405        );
 406        release_channel::init(app_version, cx);
 407        gpui_tokio::init(cx);
 408
 409        HeadlessProject::init(cx);
 410
 411        let is_wsl_interop = if cfg!(target_os = "linux") {
 412            // See: https://learn.microsoft.com/en-us/windows/wsl/filesystems#disable-interoperability
 413            matches!(std::fs::read_to_string("/proc/sys/fs/binfmt_misc/WSLInterop"), Ok(s) if s.contains("enabled"))
 414        } else {
 415            false
 416        };
 417
 418        log::info!("gpui app started, initializing server");
 419        let session = start_server(listeners, log_rx, cx, is_wsl_interop);
 420
 421        GitHostingProviderRegistry::set_global(git_hosting_provider_registry, cx);
 422        git_hosting_providers::init(cx);
 423        dap_adapters::init(cx);
 424
 425        extension::init(cx);
 426        let extension_host_proxy = ExtensionHostProxy::global(cx);
 427
 428        json_schema_store::init(cx);
 429
 430        let project = cx.new(|cx| {
 431            let fs = Arc::new(RealFs::new(None, cx.background_executor().clone()));
 432            let node_settings_rx = initialize_settings(session.clone(), fs.clone(), cx);
 433
 434            let proxy_url = read_proxy_settings(cx);
 435
 436            let http_client = {
 437                let _guard = Tokio::handle(cx).enter();
 438                Arc::new(
 439                    ReqwestClient::proxy_and_user_agent(
 440                        proxy_url,
 441                        &format!(
 442                            "Zed-Server/{} ({}; {})",
 443                            env!("CARGO_PKG_VERSION"),
 444                            std::env::consts::OS,
 445                            std::env::consts::ARCH
 446                        ),
 447                    )
 448                    .expect("Could not start HTTP client"),
 449                )
 450            };
 451
 452            let node_runtime = NodeRuntime::new(
 453                http_client.clone(),
 454                Some(shell_env_loaded_rx),
 455                node_settings_rx,
 456            );
 457
 458            let mut languages = LanguageRegistry::new(cx.background_executor().clone());
 459            languages.set_language_server_download_dir(paths::languages_dir().clone());
 460            let languages = Arc::new(languages);
 461
 462            HeadlessProject::new(
 463                HeadlessAppState {
 464                    session: session.clone(),
 465                    fs,
 466                    http_client,
 467                    node_runtime,
 468                    languages,
 469                    extension_host_proxy,
 470                },
 471                cx,
 472            )
 473        });
 474
 475        handle_crash_files_requests(&project, &session);
 476
 477        cx.background_spawn(async move { cleanup_old_binaries() })
 478            .detach();
 479
 480        mem::forget(project);
 481    });
 482    log::info!("gpui app is shut down. quitting.");
 483    Ok(())
 484}
 485
 486#[derive(Debug, Error)]
 487pub(crate) enum ServerPathError {
 488    #[error("Failed to create server_dir `{path}`")]
 489    CreateServerDir {
 490        #[source]
 491        source: std::io::Error,
 492        path: PathBuf,
 493    },
 494    #[error("Failed to create logs_dir `{path}`")]
 495    CreateLogsDir {
 496        #[source]
 497        source: std::io::Error,
 498        path: PathBuf,
 499    },
 500}
 501
 502#[derive(Clone, Debug)]
 503struct ServerPaths {
 504    log_file: PathBuf,
 505    pid_file: PathBuf,
 506    stdin_socket: PathBuf,
 507    stdout_socket: PathBuf,
 508    stderr_socket: PathBuf,
 509}
 510
 511impl ServerPaths {
 512    fn new(identifier: &str) -> Result<Self, ServerPathError> {
 513        let server_dir = paths::remote_server_state_dir().join(identifier);
 514        std::fs::create_dir_all(&server_dir).map_err(|source| {
 515            ServerPathError::CreateServerDir {
 516                source,
 517                path: server_dir.clone(),
 518            }
 519        })?;
 520        let log_dir = logs_dir();
 521        std::fs::create_dir_all(log_dir).map_err(|source| ServerPathError::CreateLogsDir {
 522            source: source,
 523            path: log_dir.clone(),
 524        })?;
 525
 526        let pid_file = server_dir.join("server.pid");
 527        let stdin_socket = server_dir.join("stdin.sock");
 528        let stdout_socket = server_dir.join("stdout.sock");
 529        let stderr_socket = server_dir.join("stderr.sock");
 530        let log_file = logs_dir().join(format!("server-{}.log", identifier));
 531
 532        Ok(Self {
 533            pid_file,
 534            stdin_socket,
 535            stdout_socket,
 536            stderr_socket,
 537            log_file,
 538        })
 539    }
 540}
 541
 542#[derive(Debug, Error)]
 543pub(crate) enum ExecuteProxyError {
 544    #[error("Failed to init server paths")]
 545    ServerPath(#[from] ServerPathError),
 546
 547    #[error(transparent)]
 548    ServerNotRunning(#[from] ProxyLaunchError),
 549
 550    #[error("Failed to check PidFile '{path}'")]
 551    CheckPidFile {
 552        #[source]
 553        source: CheckPidError,
 554        path: PathBuf,
 555    },
 556
 557    #[error("Failed to kill existing server with pid '{pid}'")]
 558    KillRunningServer {
 559        #[source]
 560        source: std::io::Error,
 561        pid: u32,
 562    },
 563
 564    #[error("failed to spawn server")]
 565    SpawnServer(#[source] SpawnServerError),
 566
 567    #[error("stdin_task failed")]
 568    StdinTask(#[source] anyhow::Error),
 569    #[error("stdout_task failed")]
 570    StdoutTask(#[source] anyhow::Error),
 571    #[error("stderr_task failed")]
 572    StderrTask(#[source] anyhow::Error),
 573}
 574
 575pub(crate) fn execute_proxy(
 576    identifier: String,
 577    is_reconnecting: bool,
 578) -> Result<(), ExecuteProxyError> {
 579    init_logging_proxy();
 580
 581    let server_paths = ServerPaths::new(&identifier)?;
 582
 583    let id = std::process::id().to_string();
 584    smol::spawn(crashes::init(crashes::InitCrashHandler {
 585        session_id: id,
 586        zed_version: VERSION.to_owned(),
 587        binary: "zed-remote-server".to_string(),
 588        release_channel: release_channel::RELEASE_CHANNEL_NAME.clone(),
 589        commit_sha: option_env!("ZED_COMMIT_SHA").unwrap_or("no_sha").to_owned(),
 590    }))
 591    .detach();
 592
 593    log::info!("starting proxy process. PID: {}", std::process::id());
 594    smol::block_on(async {
 595        let server_pid = check_pid_file(&server_paths.pid_file)
 596            .await
 597            .map_err(|source| ExecuteProxyError::CheckPidFile {
 598                source,
 599                path: server_paths.pid_file.clone(),
 600            })?;
 601        let server_running = server_pid.is_some();
 602        if is_reconnecting {
 603            if !server_running {
 604                log::error!("attempted to reconnect, but no server running");
 605                return Err(ExecuteProxyError::ServerNotRunning(
 606                    ProxyLaunchError::ServerNotRunning,
 607                ));
 608            }
 609        } else {
 610            if let Some(pid) = server_pid {
 611                log::info!(
 612                    "proxy found server already running with PID {}. Killing process and cleaning up files...",
 613                    pid
 614                );
 615                kill_running_server(pid, &server_paths).await?;
 616            }
 617
 618            spawn_server(&server_paths)
 619                .await
 620                .map_err(ExecuteProxyError::SpawnServer)?;
 621        };
 622        Ok(())
 623    })?;
 624
 625    let stdin_task = smol::spawn(async move {
 626        let stdin = Async::new(std::io::stdin())?;
 627        let stream = smol::net::unix::UnixStream::connect(&server_paths.stdin_socket).await?;
 628        handle_io(stdin, stream, "stdin").await
 629    });
 630
 631    let stdout_task: smol::Task<Result<()>> = smol::spawn(async move {
 632        let stdout = Async::new(std::io::stdout())?;
 633        let stream = smol::net::unix::UnixStream::connect(&server_paths.stdout_socket).await?;
 634        handle_io(stream, stdout, "stdout").await
 635    });
 636
 637    let stderr_task: smol::Task<Result<()>> = smol::spawn(async move {
 638        let mut stderr = Async::new(std::io::stderr())?;
 639        let mut stream = smol::net::unix::UnixStream::connect(&server_paths.stderr_socket).await?;
 640        let mut stderr_buffer = vec![0; 2048];
 641        loop {
 642            match stream
 643                .read(&mut stderr_buffer)
 644                .await
 645                .context("reading stderr")?
 646            {
 647                0 => {
 648                    let error =
 649                        std::io::Error::new(std::io::ErrorKind::UnexpectedEof, "stderr closed");
 650                    Err(anyhow!(error))?;
 651                }
 652                n => {
 653                    stderr.write_all(&stderr_buffer[..n]).await?;
 654                    stderr.flush().await?;
 655                }
 656            }
 657        }
 658    });
 659
 660    if let Err(forwarding_result) = smol::block_on(async move {
 661        futures::select! {
 662            result = stdin_task.fuse() => result.map_err(ExecuteProxyError::StdinTask),
 663            result = stdout_task.fuse() => result.map_err(ExecuteProxyError::StdoutTask),
 664            result = stderr_task.fuse() => result.map_err(ExecuteProxyError::StderrTask),
 665        }
 666    }) {
 667        log::error!(
 668            "encountered error while forwarding messages: {:?}, terminating...",
 669            forwarding_result
 670        );
 671        return Err(forwarding_result);
 672    }
 673
 674    Ok(())
 675}
 676
 677async fn kill_running_server(pid: u32, paths: &ServerPaths) -> Result<(), ExecuteProxyError> {
 678    log::info!("killing existing server with PID {}", pid);
 679    new_smol_command("kill")
 680        .arg(pid.to_string())
 681        .output()
 682        .await
 683        .map_err(|source| ExecuteProxyError::KillRunningServer { source, pid })?;
 684
 685    for file in [
 686        &paths.pid_file,
 687        &paths.stdin_socket,
 688        &paths.stdout_socket,
 689        &paths.stderr_socket,
 690    ] {
 691        log::debug!("cleaning up file {:?} before starting new server", file);
 692        std::fs::remove_file(file).ok();
 693    }
 694    Ok(())
 695}
 696
 697#[derive(Debug, Error)]
 698pub(crate) enum SpawnServerError {
 699    #[error("failed to remove stdin socket")]
 700    RemoveStdinSocket(#[source] std::io::Error),
 701
 702    #[error("failed to remove stdout socket")]
 703    RemoveStdoutSocket(#[source] std::io::Error),
 704
 705    #[error("failed to remove stderr socket")]
 706    RemoveStderrSocket(#[source] std::io::Error),
 707
 708    #[error("failed to get current_exe")]
 709    CurrentExe(#[source] std::io::Error),
 710
 711    #[error("failed to launch server process")]
 712    ProcessStatus(#[source] std::io::Error),
 713
 714    #[error("failed to launch and detach server process: {status}\n{paths}")]
 715    LaunchStatus { status: ExitStatus, paths: String },
 716}
 717
 718async fn spawn_server(paths: &ServerPaths) -> Result<(), SpawnServerError> {
 719    if paths.stdin_socket.exists() {
 720        std::fs::remove_file(&paths.stdin_socket).map_err(SpawnServerError::RemoveStdinSocket)?;
 721    }
 722    if paths.stdout_socket.exists() {
 723        std::fs::remove_file(&paths.stdout_socket).map_err(SpawnServerError::RemoveStdoutSocket)?;
 724    }
 725    if paths.stderr_socket.exists() {
 726        std::fs::remove_file(&paths.stderr_socket).map_err(SpawnServerError::RemoveStderrSocket)?;
 727    }
 728
 729    let binary_name = std::env::current_exe().map_err(SpawnServerError::CurrentExe)?;
 730    let mut server_process = new_smol_command(binary_name);
 731    server_process
 732        .arg("run")
 733        .arg("--log-file")
 734        .arg(&paths.log_file)
 735        .arg("--pid-file")
 736        .arg(&paths.pid_file)
 737        .arg("--stdin-socket")
 738        .arg(&paths.stdin_socket)
 739        .arg("--stdout-socket")
 740        .arg(&paths.stdout_socket)
 741        .arg("--stderr-socket")
 742        .arg(&paths.stderr_socket);
 743
 744    let status = server_process
 745        .status()
 746        .await
 747        .map_err(SpawnServerError::ProcessStatus)?;
 748
 749    if !status.success() {
 750        return Err(SpawnServerError::LaunchStatus {
 751            status,
 752            paths: format!(
 753                "log file: {:?}, pid file: {:?}",
 754                paths.log_file, paths.pid_file,
 755            ),
 756        });
 757    }
 758
 759    let mut total_time_waited = std::time::Duration::from_secs(0);
 760    let wait_duration = std::time::Duration::from_millis(20);
 761    while !paths.stdout_socket.exists()
 762        || !paths.stdin_socket.exists()
 763        || !paths.stderr_socket.exists()
 764    {
 765        log::debug!("waiting for server to be ready to accept connections...");
 766        std::thread::sleep(wait_duration);
 767        total_time_waited += wait_duration;
 768    }
 769
 770    log::info!(
 771        "server ready to accept connections. total time waited: {:?}",
 772        total_time_waited
 773    );
 774
 775    Ok(())
 776}
 777
 778#[derive(Debug, Error)]
 779#[error("Failed to remove PID file for missing process (pid `{pid}`")]
 780pub(crate) struct CheckPidError {
 781    #[source]
 782    source: std::io::Error,
 783    pid: u32,
 784}
 785
 786async fn check_pid_file(path: &Path) -> Result<Option<u32>, CheckPidError> {
 787    let Some(pid) = std::fs::read_to_string(&path)
 788        .ok()
 789        .and_then(|contents| contents.parse::<u32>().ok())
 790    else {
 791        return Ok(None);
 792    };
 793
 794    log::debug!("Checking if process with PID {} exists...", pid);
 795    match new_smol_command("kill")
 796        .arg("-0")
 797        .arg(pid.to_string())
 798        .output()
 799        .await
 800    {
 801        Ok(output) if output.status.success() => {
 802            log::debug!(
 803                "Process with PID {} exists. NOT spawning new server, but attaching to existing one.",
 804                pid
 805            );
 806            Ok(Some(pid))
 807        }
 808        _ => {
 809            log::debug!(
 810                "Found PID file, but process with that PID does not exist. Removing PID file."
 811            );
 812            std::fs::remove_file(&path).map_err(|source| CheckPidError { source, pid })?;
 813            Ok(None)
 814        }
 815    }
 816}
 817
 818fn write_pid_file(path: &Path) -> Result<()> {
 819    if path.exists() {
 820        std::fs::remove_file(path)?;
 821    }
 822    let pid = std::process::id().to_string();
 823    log::debug!("writing PID {} to file {:?}", pid, path);
 824    std::fs::write(path, pid).context("Failed to write PID file")
 825}
 826
 827async fn handle_io<R, W>(mut reader: R, mut writer: W, socket_name: &str) -> Result<()>
 828where
 829    R: AsyncRead + Unpin,
 830    W: AsyncWrite + Unpin,
 831{
 832    use remote::protocol::{read_message_raw, write_size_prefixed_buffer};
 833
 834    let mut buffer = Vec::new();
 835    loop {
 836        read_message_raw(&mut reader, &mut buffer)
 837            .await
 838            .with_context(|| format!("failed to read message from {}", socket_name))?;
 839        write_size_prefixed_buffer(&mut writer, &mut buffer)
 840            .await
 841            .with_context(|| format!("failed to write message to {}", socket_name))?;
 842        writer.flush().await?;
 843        buffer.clear();
 844    }
 845}
 846
 847fn initialize_settings(
 848    session: AnyProtoClient,
 849    fs: Arc<dyn Fs>,
 850    cx: &mut App,
 851) -> watch::Receiver<Option<NodeBinaryOptions>> {
 852    let user_settings_file_rx =
 853        watch_config_file(cx.background_executor(), fs, paths::settings_file().clone());
 854
 855    handle_settings_file_changes(user_settings_file_rx, cx, {
 856        move |err, _cx| {
 857            if let Some(e) = err {
 858                log::info!("Server settings failed to change: {}", e);
 859
 860                session
 861                    .send(proto::Toast {
 862                        project_id: REMOTE_SERVER_PROJECT_ID,
 863                        notification_id: "server-settings-failed".to_string(),
 864                        message: format!(
 865                            "Error in settings on remote host {:?}: {}",
 866                            paths::settings_file(),
 867                            e
 868                        ),
 869                    })
 870                    .log_err();
 871            } else {
 872                session
 873                    .send(proto::HideToast {
 874                        project_id: REMOTE_SERVER_PROJECT_ID,
 875                        notification_id: "server-settings-failed".to_string(),
 876                    })
 877                    .log_err();
 878            }
 879        }
 880    });
 881
 882    let (mut tx, rx) = watch::channel(None);
 883    let mut node_settings = None;
 884    cx.observe_global::<SettingsStore>(move |cx| {
 885        let new_node_settings = &ProjectSettings::get_global(cx).node;
 886        if Some(new_node_settings) != node_settings.as_ref() {
 887            log::info!("Got new node settings: {new_node_settings:?}");
 888            let options = NodeBinaryOptions {
 889                allow_path_lookup: !new_node_settings.ignore_system_version,
 890                // TODO: Implement this setting
 891                allow_binary_download: true,
 892                use_paths: new_node_settings.path.as_ref().map(|node_path| {
 893                    let node_path = PathBuf::from(shellexpand::tilde(node_path).as_ref());
 894                    let npm_path = new_node_settings
 895                        .npm_path
 896                        .as_ref()
 897                        .map(|path| PathBuf::from(shellexpand::tilde(&path).as_ref()));
 898                    (
 899                        node_path.clone(),
 900                        npm_path.unwrap_or_else(|| {
 901                            let base_path = PathBuf::new();
 902                            node_path.parent().unwrap_or(&base_path).join("npm")
 903                        }),
 904                    )
 905                }),
 906            };
 907            node_settings = Some(new_node_settings.clone());
 908            tx.send(Some(options)).ok();
 909        }
 910    })
 911    .detach();
 912
 913    rx
 914}
 915
 916pub fn handle_settings_file_changes(
 917    mut server_settings_file: mpsc::UnboundedReceiver<String>,
 918    cx: &mut App,
 919    settings_changed: impl Fn(Option<anyhow::Error>, &mut App) + 'static,
 920) {
 921    let server_settings_content = cx
 922        .background_executor()
 923        .block(server_settings_file.next())
 924        .unwrap();
 925    SettingsStore::update_global(cx, |store, cx| {
 926        store
 927            .set_server_settings(&server_settings_content, cx)
 928            .log_err();
 929    });
 930    cx.spawn(async move |cx| {
 931        while let Some(server_settings_content) = server_settings_file.next().await {
 932            let result = cx.update_global(|store: &mut SettingsStore, cx| {
 933                let result = store.set_server_settings(&server_settings_content, cx);
 934                if let Err(err) = &result {
 935                    log::error!("Failed to load server settings: {err}");
 936                }
 937                settings_changed(result.err(), cx);
 938                cx.refresh_windows();
 939            });
 940            if result.is_err() {
 941                break; // App dropped
 942            }
 943        }
 944    })
 945    .detach();
 946}
 947
 948fn read_proxy_settings(cx: &mut Context<HeadlessProject>) -> Option<Url> {
 949    let proxy_str = ProxySettings::get_global(cx).proxy.to_owned();
 950
 951    proxy_str
 952        .as_ref()
 953        .and_then(|input: &String| {
 954            input
 955                .parse::<Url>()
 956                .inspect_err(|e| log::error!("Error parsing proxy settings: {}", e))
 957                .ok()
 958        })
 959        .or_else(read_proxy_from_env)
 960}
 961
 962fn daemonize() -> Result<ControlFlow<()>> {
 963    match fork::fork().map_err(|e| anyhow!("failed to call fork with error code {e}"))? {
 964        fork::Fork::Parent(_) => {
 965            return Ok(ControlFlow::Break(()));
 966        }
 967        fork::Fork::Child => {}
 968    }
 969
 970    // Once we've detached from the parent, we want to close stdout/stderr/stdin
 971    // so that the outer SSH process is not attached to us in any way anymore.
 972    unsafe { redirect_standard_streams() }?;
 973
 974    Ok(ControlFlow::Continue(()))
 975}
 976
 977unsafe fn redirect_standard_streams() -> Result<()> {
 978    let devnull_fd = unsafe { libc::open(b"/dev/null\0" as *const [u8; 10] as _, libc::O_RDWR) };
 979    anyhow::ensure!(devnull_fd != -1, "failed to open /dev/null");
 980
 981    let process_stdio = |name, fd| {
 982        let reopened_fd = unsafe { libc::dup2(devnull_fd, fd) };
 983        anyhow::ensure!(
 984            reopened_fd != -1,
 985            format!("failed to redirect {} to /dev/null", name)
 986        );
 987        Ok(())
 988    };
 989
 990    process_stdio("stdin", libc::STDIN_FILENO)?;
 991    process_stdio("stdout", libc::STDOUT_FILENO)?;
 992    process_stdio("stderr", libc::STDERR_FILENO)?;
 993
 994    anyhow::ensure!(
 995        unsafe { libc::close(devnull_fd) != -1 },
 996        "failed to close /dev/null fd after redirecting"
 997    );
 998
 999    Ok(())
1000}
1001
1002fn cleanup_old_binaries() -> Result<()> {
1003    let server_dir = paths::remote_server_dir_relative();
1004    let release_channel = release_channel::RELEASE_CHANNEL.dev_name();
1005    let prefix = format!("zed-remote-server-{}-", release_channel);
1006
1007    for entry in std::fs::read_dir(server_dir.as_std_path())? {
1008        let path = entry?.path();
1009
1010        if let Some(file_name) = path.file_name()
1011            && let Some(version) = file_name.to_string_lossy().strip_prefix(&prefix)
1012            && !is_new_version(version)
1013            && !is_file_in_use(file_name)
1014        {
1015            log::info!("removing old remote server binary: {:?}", path);
1016            std::fs::remove_file(&path)?;
1017        }
1018    }
1019
1020    Ok(())
1021}
1022
1023fn is_new_version(version: &str) -> bool {
1024    semver::Version::from_str(version)
1025        .ok()
1026        .zip(semver::Version::from_str(env!("ZED_PKG_VERSION")).ok())
1027        .is_some_and(|(version, current_version)| version >= current_version)
1028}
1029
1030fn is_file_in_use(file_name: &OsStr) -> bool {
1031    let info = sysinfo::System::new_with_specifics(sysinfo::RefreshKind::nothing().with_processes(
1032        sysinfo::ProcessRefreshKind::nothing().with_exe(sysinfo::UpdateKind::Always),
1033    ));
1034
1035    for process in info.processes().values() {
1036        if process
1037            .exe()
1038            .is_some_and(|exe| exe.file_name().is_some_and(|name| name == file_name))
1039        {
1040            return true;
1041        }
1042    }
1043
1044    false
1045}