1use crate::HeadlessProject;
2use crate::headless_project::HeadlessAppState;
3use anyhow::{Context as _, Result, anyhow};
4use client::{ProjectId, ProxySettings};
5use collections::HashMap;
6use project::trusted_worktrees;
7use util::ResultExt;
8
9use extension::ExtensionHostProxy;
10use fs::{Fs, RealFs};
11use futures::channel::{mpsc, oneshot};
12use futures::{AsyncRead, AsyncWrite, AsyncWriteExt, FutureExt, SinkExt, select, select_biased};
13use git::GitHostingProviderRegistry;
14use gpui::{App, AppContext as _, Context, Entity, UpdateGlobal as _};
15use gpui_tokio::Tokio;
16use http_client::{Url, read_proxy_from_env};
17use language::LanguageRegistry;
18use node_runtime::{NodeBinaryOptions, NodeRuntime};
19use paths::logs_dir;
20use project::project_settings::ProjectSettings;
21use util::command::new_smol_command;
22
23use proto::CrashReport;
24use release_channel::{AppCommitSha, AppVersion, RELEASE_CHANNEL, ReleaseChannel};
25use remote::RemoteClient;
26use remote::{
27 json_log::LogRecord,
28 protocol::{read_message, write_message},
29 proxy::ProxyLaunchError,
30};
31use reqwest_client::ReqwestClient;
32use rpc::proto::{self, Envelope, REMOTE_SERVER_PROJECT_ID};
33use rpc::{AnyProtoClient, TypedEnvelope};
34use settings::{Settings, SettingsStore, watch_config_file};
35
36use smol::channel::{Receiver, Sender};
37use smol::io::AsyncReadExt;
38use smol::{net::unix::UnixListener, stream::StreamExt as _};
39use std::{
40 env,
41 ffi::OsStr,
42 fs::File,
43 io::Write,
44 mem,
45 ops::ControlFlow,
46 path::{Path, PathBuf},
47 process::ExitStatus,
48 str::FromStr,
49 sync::{Arc, LazyLock},
50};
51use thiserror::Error;
52
53pub static VERSION: LazyLock<String> = LazyLock::new(|| match *RELEASE_CHANNEL {
54 ReleaseChannel::Stable | ReleaseChannel::Preview => env!("ZED_PKG_VERSION").to_owned(),
55 ReleaseChannel::Nightly | ReleaseChannel::Dev => {
56 let commit_sha = option_env!("ZED_COMMIT_SHA").unwrap_or("missing-zed-commit-sha");
57 let build_identifier = option_env!("ZED_BUILD_ID");
58 if let Some(build_id) = build_identifier {
59 format!("{build_id}+{commit_sha}")
60 } else {
61 commit_sha.to_owned()
62 }
63 }
64});
65
66fn init_logging_proxy() {
67 env_logger::builder()
68 .format(|buf, record| {
69 let mut log_record = LogRecord::new(record);
70 log_record.message =
71 std::borrow::Cow::Owned(format!("(remote proxy) {}", log_record.message));
72 serde_json::to_writer(&mut *buf, &log_record)?;
73 buf.write_all(b"\n")?;
74 Ok(())
75 })
76 .init();
77}
78
79fn init_logging_server(log_file_path: &Path) -> Result<Receiver<Vec<u8>>> {
80 struct MultiWrite {
81 file: File,
82 channel: Sender<Vec<u8>>,
83 buffer: Vec<u8>,
84 }
85
86 impl Write for MultiWrite {
87 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
88 let written = self.file.write(buf)?;
89 self.buffer.extend_from_slice(&buf[..written]);
90 Ok(written)
91 }
92
93 fn flush(&mut self) -> std::io::Result<()> {
94 self.channel
95 .send_blocking(self.buffer.clone())
96 .map_err(std::io::Error::other)?;
97 self.buffer.clear();
98 self.file.flush()
99 }
100 }
101
102 let log_file = std::fs::OpenOptions::new()
103 .create(true)
104 .append(true)
105 .open(log_file_path)
106 .context("Failed to open log file in append mode")?;
107
108 let (tx, rx) = smol::channel::unbounded();
109
110 let target = Box::new(MultiWrite {
111 file: log_file,
112 channel: tx,
113 buffer: Vec::new(),
114 });
115
116 let old_hook = std::panic::take_hook();
117 std::panic::set_hook(Box::new(move |info| {
118 log::error!("Panic occurred: {:?}", info);
119 old_hook(info);
120 }));
121 env_logger::Builder::new()
122 .filter_level(log::LevelFilter::Info)
123 .parse_default_env()
124 .target(env_logger::Target::Pipe(target))
125 .format(|buf, record| {
126 let mut log_record = LogRecord::new(record);
127 log_record.message =
128 std::borrow::Cow::Owned(format!("(remote server) {}", log_record.message));
129 serde_json::to_writer(&mut *buf, &log_record)?;
130 buf.write_all(b"\n")?;
131 Ok(())
132 })
133 .init();
134
135 Ok(rx)
136}
137
138fn handle_crash_files_requests(project: &Entity<HeadlessProject>, client: &AnyProtoClient) {
139 client.add_request_handler(
140 project.downgrade(),
141 |_, _: TypedEnvelope<proto::GetCrashFiles>, _cx| async move {
142 let mut legacy_panics = Vec::new();
143 let mut crashes = Vec::new();
144 let mut children = smol::fs::read_dir(paths::logs_dir()).await?;
145 while let Some(child) = children.next().await {
146 let child = child?;
147 let child_path = child.path();
148
149 let extension = child_path.extension();
150 if extension == Some(OsStr::new("panic")) {
151 let filename = if let Some(filename) = child_path.file_name() {
152 filename.to_string_lossy()
153 } else {
154 continue;
155 };
156
157 if !filename.starts_with("zed") {
158 continue;
159 }
160
161 let file_contents = smol::fs::read_to_string(&child_path)
162 .await
163 .context("error reading panic file")?;
164
165 legacy_panics.push(file_contents);
166 smol::fs::remove_file(&child_path)
167 .await
168 .context("error removing panic")
169 .log_err();
170 } else if extension == Some(OsStr::new("dmp")) {
171 let mut json_path = child_path.clone();
172 json_path.set_extension("json");
173 if let Ok(json_content) = smol::fs::read_to_string(&json_path).await {
174 crashes.push(CrashReport {
175 metadata: json_content,
176 minidump_contents: smol::fs::read(&child_path).await?,
177 });
178 smol::fs::remove_file(&child_path).await.log_err();
179 smol::fs::remove_file(&json_path).await.log_err();
180 } else {
181 log::error!("Couldn't find json metadata for crash: {child_path:?}");
182 }
183 }
184 }
185
186 anyhow::Ok(proto::GetCrashFilesResponse { crashes })
187 },
188 );
189}
190
191struct ServerListeners {
192 stdin: UnixListener,
193 stdout: UnixListener,
194 stderr: UnixListener,
195}
196
197impl ServerListeners {
198 pub fn new(stdin_path: PathBuf, stdout_path: PathBuf, stderr_path: PathBuf) -> Result<Self> {
199 Ok(Self {
200 stdin: UnixListener::bind(stdin_path).context("failed to bind stdin socket")?,
201 stdout: UnixListener::bind(stdout_path).context("failed to bind stdout socket")?,
202 stderr: UnixListener::bind(stderr_path).context("failed to bind stderr socket")?,
203 })
204 }
205}
206
207fn start_server(
208 listeners: ServerListeners,
209 log_rx: Receiver<Vec<u8>>,
210 cx: &mut App,
211 is_wsl_interop: bool,
212) -> AnyProtoClient {
213 // This is the server idle timeout. If no connection comes in this timeout, the server will shut down.
214 const IDLE_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10 * 60);
215
216 let (incoming_tx, incoming_rx) = mpsc::unbounded::<Envelope>();
217 let (outgoing_tx, mut outgoing_rx) = mpsc::unbounded::<Envelope>();
218 let (app_quit_tx, mut app_quit_rx) = mpsc::unbounded::<()>();
219
220 cx.on_app_quit(move |_| {
221 let mut app_quit_tx = app_quit_tx.clone();
222 async move {
223 log::info!("app quitting. sending signal to server main loop");
224 app_quit_tx.send(()).await.ok();
225 }
226 })
227 .detach();
228
229 cx.spawn(async move |cx| {
230 let mut stdin_incoming = listeners.stdin.incoming();
231 let mut stdout_incoming = listeners.stdout.incoming();
232 let mut stderr_incoming = listeners.stderr.incoming();
233
234 loop {
235 let streams = futures::future::join3(stdin_incoming.next(), stdout_incoming.next(), stderr_incoming.next());
236
237 log::info!("accepting new connections");
238 let result = select! {
239 streams = streams.fuse() => {
240 let (Some(Ok(stdin_stream)), Some(Ok(stdout_stream)), Some(Ok(stderr_stream))) = streams else {
241 break;
242 };
243 anyhow::Ok((stdin_stream, stdout_stream, stderr_stream))
244 }
245 _ = futures::FutureExt::fuse(cx.background_executor().timer(IDLE_TIMEOUT)) => {
246 log::warn!("timed out waiting for new connections after {:?}. exiting.", IDLE_TIMEOUT);
247 cx.update(|cx| {
248 // TODO: This is a hack, because in a headless project, shutdown isn't executed
249 // when calling quit, but it should be.
250 cx.shutdown();
251 cx.quit();
252 });
253 break;
254 }
255 _ = app_quit_rx.next().fuse() => {
256 break;
257 }
258 };
259
260 let Ok((mut stdin_stream, mut stdout_stream, mut stderr_stream)) = result else {
261 break;
262 };
263
264 let mut input_buffer = Vec::new();
265 let mut output_buffer = Vec::new();
266
267 let (mut stdin_msg_tx, mut stdin_msg_rx) = mpsc::unbounded::<Envelope>();
268 cx.background_spawn(async move {
269 while let Ok(msg) = read_message(&mut stdin_stream, &mut input_buffer).await {
270 if (stdin_msg_tx.send(msg).await).is_err() {
271 break;
272 }
273 }
274 }).detach();
275
276 loop {
277
278 select_biased! {
279 _ = app_quit_rx.next().fuse() => {
280 return anyhow::Ok(());
281 }
282
283 stdin_message = stdin_msg_rx.next().fuse() => {
284 let Some(message) = stdin_message else {
285 log::warn!("error reading message on stdin. exiting.");
286 break;
287 };
288 if let Err(error) = incoming_tx.unbounded_send(message) {
289 log::error!("failed to send message to application: {error:?}. exiting.");
290 return Err(anyhow!(error));
291 }
292 }
293
294 outgoing_message = outgoing_rx.next().fuse() => {
295 let Some(message) = outgoing_message else {
296 log::error!("stdout handler, no message");
297 break;
298 };
299
300 if let Err(error) =
301 write_message(&mut stdout_stream, &mut output_buffer, message).await
302 {
303 log::error!("failed to write stdout message: {:?}", error);
304 break;
305 }
306 if let Err(error) = stdout_stream.flush().await {
307 log::error!("failed to flush stdout message: {:?}", error);
308 break;
309 }
310 }
311
312 log_message = log_rx.recv().fuse() => {
313 if let Ok(log_message) = log_message {
314 if let Err(error) = stderr_stream.write_all(&log_message).await {
315 log::error!("failed to write log message to stderr: {:?}", error);
316 break;
317 }
318 if let Err(error) = stderr_stream.flush().await {
319 log::error!("failed to flush stderr stream: {:?}", error);
320 break;
321 }
322 }
323 }
324 }
325 }
326 }
327 anyhow::Ok(())
328 })
329 .detach();
330
331 RemoteClient::proto_client_from_channels(incoming_rx, outgoing_tx, cx, "server", is_wsl_interop)
332}
333
334fn init_paths() -> anyhow::Result<()> {
335 for path in [
336 paths::config_dir(),
337 paths::extensions_dir(),
338 paths::languages_dir(),
339 paths::logs_dir(),
340 paths::temp_dir(),
341 paths::hang_traces_dir(),
342 paths::remote_extensions_dir(),
343 paths::remote_extensions_uploads_dir(),
344 ]
345 .iter()
346 {
347 std::fs::create_dir_all(path).with_context(|| format!("creating directory {path:?}"))?;
348 }
349 Ok(())
350}
351
352pub fn execute_run(
353 log_file: PathBuf,
354 pid_file: PathBuf,
355 stdin_socket: PathBuf,
356 stdout_socket: PathBuf,
357 stderr_socket: PathBuf,
358) -> Result<()> {
359 init_paths()?;
360
361 match daemonize()? {
362 ControlFlow::Break(_) => return Ok(()),
363 ControlFlow::Continue(_) => {}
364 }
365
366 let app = gpui::Application::headless();
367 let id = std::process::id().to_string();
368 app.background_executor()
369 .spawn(crashes::init(crashes::InitCrashHandler {
370 session_id: id,
371 zed_version: VERSION.to_owned(),
372 binary: "zed-remote-server".to_string(),
373 release_channel: release_channel::RELEASE_CHANNEL_NAME.clone(),
374 commit_sha: option_env!("ZED_COMMIT_SHA").unwrap_or("no_sha").to_owned(),
375 }))
376 .detach();
377 let log_rx = init_logging_server(&log_file)?;
378 log::info!(
379 "starting up. pid_file: {:?}, log_file: {:?}, stdin_socket: {:?}, stdout_socket: {:?}, stderr_socket: {:?}",
380 pid_file,
381 log_file,
382 stdin_socket,
383 stdout_socket,
384 stderr_socket
385 );
386
387 write_pid_file(&pid_file)
388 .with_context(|| format!("failed to write pid file: {:?}", &pid_file))?;
389
390 let listeners = ServerListeners::new(stdin_socket, stdout_socket, stderr_socket)?;
391
392 rayon::ThreadPoolBuilder::new()
393 .num_threads(std::thread::available_parallelism().map_or(1, |n| n.get().div_ceil(2)))
394 .stack_size(10 * 1024 * 1024)
395 .thread_name(|ix| format!("RayonWorker{}", ix))
396 .build_global()
397 .unwrap();
398
399 let (shell_env_loaded_tx, shell_env_loaded_rx) = oneshot::channel();
400 app.background_executor()
401 .spawn(async {
402 util::load_login_shell_environment().await.log_err();
403 shell_env_loaded_tx.send(()).ok();
404 })
405 .detach();
406
407 let git_hosting_provider_registry = Arc::new(GitHostingProviderRegistry::new());
408 app.run(move |cx| {
409 settings::init(cx);
410 let app_commit_sha = option_env!("ZED_COMMIT_SHA").map(|s| AppCommitSha::new(s.to_owned()));
411 let app_version = AppVersion::load(
412 env!("ZED_PKG_VERSION"),
413 option_env!("ZED_BUILD_ID"),
414 app_commit_sha,
415 );
416 release_channel::init(app_version, cx);
417 gpui_tokio::init(cx);
418
419 HeadlessProject::init(cx);
420
421 let is_wsl_interop = if cfg!(target_os = "linux") {
422 // See: https://learn.microsoft.com/en-us/windows/wsl/filesystems#disable-interoperability
423 matches!(std::fs::read_to_string("/proc/sys/fs/binfmt_misc/WSLInterop"), Ok(s) if s.contains("enabled"))
424 } else {
425 false
426 };
427
428 log::info!("gpui app started, initializing server");
429 let session = start_server(listeners, log_rx, cx, is_wsl_interop);
430 trusted_worktrees::init(HashMap::default(), Some((session.clone(), ProjectId(REMOTE_SERVER_PROJECT_ID))), None, cx);
431
432 GitHostingProviderRegistry::set_global(git_hosting_provider_registry, cx);
433 git_hosting_providers::init(cx);
434 dap_adapters::init(cx);
435
436 extension::init(cx);
437 let extension_host_proxy = ExtensionHostProxy::global(cx);
438
439 json_schema_store::init(cx);
440
441 let project = cx.new(|cx| {
442 let fs = Arc::new(RealFs::new(None, cx.background_executor().clone()));
443 let node_settings_rx = initialize_settings(session.clone(), fs.clone(), cx);
444
445 let proxy_url = read_proxy_settings(cx);
446
447 let http_client = {
448 let _guard = Tokio::handle(cx).enter();
449 Arc::new(
450 ReqwestClient::proxy_and_user_agent(
451 proxy_url,
452 &format!(
453 "Zed-Server/{} ({}; {})",
454 env!("CARGO_PKG_VERSION"),
455 std::env::consts::OS,
456 std::env::consts::ARCH
457 ),
458 )
459 .expect("Could not start HTTP client"),
460 )
461 };
462
463 let node_runtime = NodeRuntime::new(
464 http_client.clone(),
465 Some(shell_env_loaded_rx),
466 node_settings_rx,
467 );
468
469 let mut languages = LanguageRegistry::new(cx.background_executor().clone());
470 languages.set_language_server_download_dir(paths::languages_dir().clone());
471 let languages = Arc::new(languages);
472
473 HeadlessProject::new(
474 HeadlessAppState {
475 session: session.clone(),
476 fs,
477 http_client,
478 node_runtime,
479 languages,
480 extension_host_proxy,
481 },
482 true,
483 cx,
484 )
485 });
486
487 handle_crash_files_requests(&project, &session);
488
489 cx.background_spawn(async move { cleanup_old_binaries() })
490 .detach();
491
492 mem::forget(project);
493 });
494 log::info!("gpui app is shut down. quitting.");
495 Ok(())
496}
497
498#[derive(Debug, Error)]
499pub(crate) enum ServerPathError {
500 #[error("Failed to create server_dir `{path}`")]
501 CreateServerDir {
502 #[source]
503 source: std::io::Error,
504 path: PathBuf,
505 },
506 #[error("Failed to create logs_dir `{path}`")]
507 CreateLogsDir {
508 #[source]
509 source: std::io::Error,
510 path: PathBuf,
511 },
512}
513
514#[derive(Clone, Debug)]
515struct ServerPaths {
516 log_file: PathBuf,
517 pid_file: PathBuf,
518 stdin_socket: PathBuf,
519 stdout_socket: PathBuf,
520 stderr_socket: PathBuf,
521}
522
523impl ServerPaths {
524 fn new(identifier: &str) -> Result<Self, ServerPathError> {
525 let server_dir = paths::remote_server_state_dir().join(identifier);
526 std::fs::create_dir_all(&server_dir).map_err(|source| {
527 ServerPathError::CreateServerDir {
528 source,
529 path: server_dir.clone(),
530 }
531 })?;
532 let log_dir = logs_dir();
533 std::fs::create_dir_all(log_dir).map_err(|source| ServerPathError::CreateLogsDir {
534 source: source,
535 path: log_dir.clone(),
536 })?;
537
538 let pid_file = server_dir.join("server.pid");
539 let stdin_socket = server_dir.join("stdin.sock");
540 let stdout_socket = server_dir.join("stdout.sock");
541 let stderr_socket = server_dir.join("stderr.sock");
542 let log_file = logs_dir().join(format!("server-{}.log", identifier));
543
544 Ok(Self {
545 pid_file,
546 stdin_socket,
547 stdout_socket,
548 stderr_socket,
549 log_file,
550 })
551 }
552}
553
554#[derive(Debug, Error)]
555pub(crate) enum ExecuteProxyError {
556 #[error("Failed to init server paths")]
557 ServerPath(#[from] ServerPathError),
558
559 #[error(transparent)]
560 ServerNotRunning(#[from] ProxyLaunchError),
561
562 #[error("Failed to check PidFile '{path}'")]
563 CheckPidFile {
564 #[source]
565 source: CheckPidError,
566 path: PathBuf,
567 },
568
569 #[error("Failed to kill existing server with pid '{pid}'")]
570 KillRunningServer {
571 #[source]
572 source: std::io::Error,
573 pid: u32,
574 },
575
576 #[error("failed to spawn server")]
577 SpawnServer(#[source] SpawnServerError),
578
579 #[error("stdin_task failed")]
580 StdinTask(#[source] anyhow::Error),
581 #[error("stdout_task failed")]
582 StdoutTask(#[source] anyhow::Error),
583 #[error("stderr_task failed")]
584 StderrTask(#[source] anyhow::Error),
585}
586
587pub(crate) fn execute_proxy(
588 identifier: String,
589 is_reconnecting: bool,
590) -> Result<(), ExecuteProxyError> {
591 init_logging_proxy();
592
593 let server_paths = ServerPaths::new(&identifier)?;
594
595 let id = std::process::id().to_string();
596 smol::spawn(crashes::init(crashes::InitCrashHandler {
597 session_id: id,
598 zed_version: VERSION.to_owned(),
599 binary: "zed-remote-server".to_string(),
600 release_channel: release_channel::RELEASE_CHANNEL_NAME.clone(),
601 commit_sha: option_env!("ZED_COMMIT_SHA").unwrap_or("no_sha").to_owned(),
602 }))
603 .detach();
604
605 log::info!("starting proxy process. PID: {}", std::process::id());
606 smol::block_on(async {
607 let server_pid = check_pid_file(&server_paths.pid_file)
608 .await
609 .map_err(|source| ExecuteProxyError::CheckPidFile {
610 source,
611 path: server_paths.pid_file.clone(),
612 })?;
613 let server_running = server_pid.is_some();
614 if is_reconnecting {
615 if !server_running {
616 log::error!("attempted to reconnect, but no server running");
617 return Err(ExecuteProxyError::ServerNotRunning(
618 ProxyLaunchError::ServerNotRunning,
619 ));
620 }
621 } else {
622 if let Some(pid) = server_pid {
623 log::info!(
624 "proxy found server already running with PID {}. Killing process and cleaning up files...",
625 pid
626 );
627 kill_running_server(pid, &server_paths).await?;
628 }
629
630 spawn_server(&server_paths)
631 .await
632 .map_err(ExecuteProxyError::SpawnServer)?;
633 };
634 Ok(())
635 })?;
636
637 let stdin_task = smol::spawn(async move {
638 let stdin = smol::Unblock::new(std::io::stdin());
639 let stream = smol::net::unix::UnixStream::connect(&server_paths.stdin_socket).await?;
640 handle_io(stdin, stream, "stdin").await
641 });
642
643 let stdout_task: smol::Task<Result<()>> = smol::spawn(async move {
644 let stdout = smol::Unblock::new(std::io::stdout());
645 let stream = smol::net::unix::UnixStream::connect(&server_paths.stdout_socket).await?;
646 handle_io(stream, stdout, "stdout").await
647 });
648
649 let stderr_task: smol::Task<Result<()>> = smol::spawn(async move {
650 let mut stderr = smol::Unblock::new(std::io::stderr());
651 let mut stream = smol::net::unix::UnixStream::connect(&server_paths.stderr_socket).await?;
652 let mut stderr_buffer = vec![0; 2048];
653 loop {
654 match stream
655 .read(&mut stderr_buffer)
656 .await
657 .context("reading stderr")?
658 {
659 0 => {
660 let error =
661 std::io::Error::new(std::io::ErrorKind::UnexpectedEof, "stderr closed");
662 Err(anyhow!(error))?;
663 }
664 n => {
665 stderr.write_all(&stderr_buffer[..n]).await?;
666 stderr.flush().await?;
667 }
668 }
669 }
670 });
671
672 if let Err(forwarding_result) = smol::block_on(async move {
673 futures::select! {
674 result = stdin_task.fuse() => result.map_err(ExecuteProxyError::StdinTask),
675 result = stdout_task.fuse() => result.map_err(ExecuteProxyError::StdoutTask),
676 result = stderr_task.fuse() => result.map_err(ExecuteProxyError::StderrTask),
677 }
678 }) {
679 log::error!(
680 "encountered error while forwarding messages: {:?}, terminating...",
681 forwarding_result
682 );
683 return Err(forwarding_result);
684 }
685
686 Ok(())
687}
688
689async fn kill_running_server(pid: u32, paths: &ServerPaths) -> Result<(), ExecuteProxyError> {
690 log::info!("killing existing server with PID {}", pid);
691 new_smol_command("kill")
692 .arg(pid.to_string())
693 .output()
694 .await
695 .map_err(|source| ExecuteProxyError::KillRunningServer { source, pid })?;
696
697 for file in [
698 &paths.pid_file,
699 &paths.stdin_socket,
700 &paths.stdout_socket,
701 &paths.stderr_socket,
702 ] {
703 log::debug!("cleaning up file {:?} before starting new server", file);
704 std::fs::remove_file(file).ok();
705 }
706 Ok(())
707}
708
709#[derive(Debug, Error)]
710pub(crate) enum SpawnServerError {
711 #[error("failed to remove stdin socket")]
712 RemoveStdinSocket(#[source] std::io::Error),
713
714 #[error("failed to remove stdout socket")]
715 RemoveStdoutSocket(#[source] std::io::Error),
716
717 #[error("failed to remove stderr socket")]
718 RemoveStderrSocket(#[source] std::io::Error),
719
720 #[error("failed to get current_exe")]
721 CurrentExe(#[source] std::io::Error),
722
723 #[error("failed to launch server process")]
724 ProcessStatus(#[source] std::io::Error),
725
726 #[error("failed to launch and detach server process: {status}\n{paths}")]
727 LaunchStatus { status: ExitStatus, paths: String },
728
729 #[error("failed to wait for server to be ready to accept connections")]
730 Timeout,
731}
732
733async fn spawn_server(paths: &ServerPaths) -> Result<(), SpawnServerError> {
734 log::info!("spawning server process",);
735 if paths.stdin_socket.exists() {
736 std::fs::remove_file(&paths.stdin_socket).map_err(SpawnServerError::RemoveStdinSocket)?;
737 }
738 if paths.stdout_socket.exists() {
739 std::fs::remove_file(&paths.stdout_socket).map_err(SpawnServerError::RemoveStdoutSocket)?;
740 }
741 if paths.stderr_socket.exists() {
742 std::fs::remove_file(&paths.stderr_socket).map_err(SpawnServerError::RemoveStderrSocket)?;
743 }
744
745 let binary_name = std::env::current_exe().map_err(SpawnServerError::CurrentExe)?;
746 let mut server_process = new_smol_command(binary_name);
747 server_process
748 .arg("run")
749 .arg("--log-file")
750 .arg(&paths.log_file)
751 .arg("--pid-file")
752 .arg(&paths.pid_file)
753 .arg("--stdin-socket")
754 .arg(&paths.stdin_socket)
755 .arg("--stdout-socket")
756 .arg(&paths.stdout_socket)
757 .arg("--stderr-socket")
758 .arg(&paths.stderr_socket);
759
760 let status = server_process
761 .status()
762 .await
763 .map_err(SpawnServerError::ProcessStatus)?;
764
765 if !status.success() {
766 return Err(SpawnServerError::LaunchStatus {
767 status,
768 paths: format!(
769 "log file: {:?}, pid file: {:?}",
770 paths.log_file, paths.pid_file,
771 ),
772 });
773 }
774
775 let mut total_time_waited = std::time::Duration::from_secs(0);
776 let wait_duration = std::time::Duration::from_millis(20);
777 while !paths.stdout_socket.exists()
778 || !paths.stdin_socket.exists()
779 || !paths.stderr_socket.exists()
780 {
781 log::debug!("waiting for server to be ready to accept connections...");
782 std::thread::sleep(wait_duration);
783 total_time_waited += wait_duration;
784 if total_time_waited > std::time::Duration::from_secs(10) {
785 return Err(SpawnServerError::Timeout);
786 }
787 }
788
789 log::info!(
790 "server ready to accept connections. total time waited: {:?}",
791 total_time_waited
792 );
793
794 Ok(())
795}
796
797#[derive(Debug, Error)]
798#[error("Failed to remove PID file for missing process (pid `{pid}`")]
799pub(crate) struct CheckPidError {
800 #[source]
801 source: std::io::Error,
802 pid: u32,
803}
804
805async fn check_pid_file(path: &Path) -> Result<Option<u32>, CheckPidError> {
806 let Some(pid) = std::fs::read_to_string(&path)
807 .ok()
808 .and_then(|contents| contents.parse::<u32>().ok())
809 else {
810 return Ok(None);
811 };
812
813 log::debug!("Checking if process with PID {} exists...", pid);
814 match new_smol_command("kill")
815 .arg("-0")
816 .arg(pid.to_string())
817 .output()
818 .await
819 {
820 Ok(output) if output.status.success() => {
821 log::debug!(
822 "Process with PID {} exists. NOT spawning new server, but attaching to existing one.",
823 pid
824 );
825 Ok(Some(pid))
826 }
827 _ => {
828 log::debug!(
829 "Found PID file, but process with that PID does not exist. Removing PID file."
830 );
831 std::fs::remove_file(&path).map_err(|source| CheckPidError { source, pid })?;
832 Ok(None)
833 }
834 }
835}
836
837fn write_pid_file(path: &Path) -> Result<()> {
838 if path.exists() {
839 std::fs::remove_file(path)?;
840 }
841 let pid = std::process::id().to_string();
842 log::debug!("writing PID {} to file {:?}", pid, path);
843 std::fs::write(path, pid).context("Failed to write PID file")
844}
845
846async fn handle_io<R, W>(mut reader: R, mut writer: W, socket_name: &str) -> Result<()>
847where
848 R: AsyncRead + Unpin,
849 W: AsyncWrite + Unpin,
850{
851 use remote::protocol::{read_message_raw, write_size_prefixed_buffer};
852
853 let mut buffer = Vec::new();
854 loop {
855 read_message_raw(&mut reader, &mut buffer)
856 .await
857 .with_context(|| format!("failed to read message from {}", socket_name))?;
858 write_size_prefixed_buffer(&mut writer, &mut buffer)
859 .await
860 .with_context(|| format!("failed to write message to {}", socket_name))?;
861 writer.flush().await?;
862 buffer.clear();
863 }
864}
865
866fn initialize_settings(
867 session: AnyProtoClient,
868 fs: Arc<dyn Fs>,
869 cx: &mut App,
870) -> watch::Receiver<Option<NodeBinaryOptions>> {
871 let user_settings_file_rx =
872 watch_config_file(cx.background_executor(), fs, paths::settings_file().clone());
873
874 handle_settings_file_changes(user_settings_file_rx, cx, {
875 move |err, _cx| {
876 if let Some(e) = err {
877 log::info!("Server settings failed to change: {}", e);
878
879 session
880 .send(proto::Toast {
881 project_id: REMOTE_SERVER_PROJECT_ID,
882 notification_id: "server-settings-failed".to_string(),
883 message: format!(
884 "Error in settings on remote host {:?}: {}",
885 paths::settings_file(),
886 e
887 ),
888 })
889 .log_err();
890 } else {
891 session
892 .send(proto::HideToast {
893 project_id: REMOTE_SERVER_PROJECT_ID,
894 notification_id: "server-settings-failed".to_string(),
895 })
896 .log_err();
897 }
898 }
899 });
900
901 let (mut tx, rx) = watch::channel(None);
902 let mut node_settings = None;
903 cx.observe_global::<SettingsStore>(move |cx| {
904 let new_node_settings = &ProjectSettings::get_global(cx).node;
905 if Some(new_node_settings) != node_settings.as_ref() {
906 log::info!("Got new node settings: {new_node_settings:?}");
907 let options = NodeBinaryOptions {
908 allow_path_lookup: !new_node_settings.ignore_system_version,
909 // TODO: Implement this setting
910 allow_binary_download: true,
911 use_paths: new_node_settings.path.as_ref().map(|node_path| {
912 let node_path = PathBuf::from(shellexpand::tilde(node_path).as_ref());
913 let npm_path = new_node_settings
914 .npm_path
915 .as_ref()
916 .map(|path| PathBuf::from(shellexpand::tilde(&path).as_ref()));
917 (
918 node_path.clone(),
919 npm_path.unwrap_or_else(|| {
920 let base_path = PathBuf::new();
921 node_path.parent().unwrap_or(&base_path).join("npm")
922 }),
923 )
924 }),
925 };
926 node_settings = Some(new_node_settings.clone());
927 tx.send(Some(options)).ok();
928 }
929 })
930 .detach();
931
932 rx
933}
934
935pub fn handle_settings_file_changes(
936 mut server_settings_file: mpsc::UnboundedReceiver<String>,
937 cx: &mut App,
938 settings_changed: impl Fn(Option<anyhow::Error>, &mut App) + 'static,
939) {
940 let server_settings_content = cx
941 .foreground_executor()
942 .block_on(server_settings_file.next())
943 .unwrap();
944 SettingsStore::update_global(cx, |store, cx| {
945 store
946 .set_server_settings(&server_settings_content, cx)
947 .log_err();
948 });
949 cx.spawn(async move |cx| {
950 while let Some(server_settings_content) = server_settings_file.next().await {
951 cx.update_global(|store: &mut SettingsStore, cx| {
952 let result = store.set_server_settings(&server_settings_content, cx);
953 if let Err(err) = &result {
954 log::error!("Failed to load server settings: {err}");
955 }
956 settings_changed(result.err(), cx);
957 cx.refresh_windows();
958 });
959 }
960 })
961 .detach();
962}
963
964fn read_proxy_settings(cx: &mut Context<HeadlessProject>) -> Option<Url> {
965 let proxy_str = ProxySettings::get_global(cx).proxy.to_owned();
966
967 proxy_str
968 .as_deref()
969 .map(str::trim)
970 .filter(|input| !input.is_empty())
971 .and_then(|input| {
972 input
973 .parse::<Url>()
974 .inspect_err(|e| log::error!("Error parsing proxy settings: {}", e))
975 .ok()
976 })
977 .or_else(read_proxy_from_env)
978}
979
980fn daemonize() -> Result<ControlFlow<()>> {
981 match fork::fork().map_err(|e| anyhow!("failed to call fork with error code {e}"))? {
982 fork::Fork::Parent(_) => {
983 return Ok(ControlFlow::Break(()));
984 }
985 fork::Fork::Child => {}
986 }
987
988 // Once we've detached from the parent, we want to close stdout/stderr/stdin
989 // so that the outer SSH process is not attached to us in any way anymore.
990 unsafe { redirect_standard_streams() }?;
991
992 Ok(ControlFlow::Continue(()))
993}
994
995unsafe fn redirect_standard_streams() -> Result<()> {
996 let devnull_fd = unsafe { libc::open(b"/dev/null\0" as *const [u8; 10] as _, libc::O_RDWR) };
997 anyhow::ensure!(devnull_fd != -1, "failed to open /dev/null");
998
999 let process_stdio = |name, fd| {
1000 let reopened_fd = unsafe { libc::dup2(devnull_fd, fd) };
1001 anyhow::ensure!(
1002 reopened_fd != -1,
1003 format!("failed to redirect {} to /dev/null", name)
1004 );
1005 Ok(())
1006 };
1007
1008 process_stdio("stdin", libc::STDIN_FILENO)?;
1009 process_stdio("stdout", libc::STDOUT_FILENO)?;
1010 process_stdio("stderr", libc::STDERR_FILENO)?;
1011
1012 anyhow::ensure!(
1013 unsafe { libc::close(devnull_fd) != -1 },
1014 "failed to close /dev/null fd after redirecting"
1015 );
1016
1017 Ok(())
1018}
1019
1020fn cleanup_old_binaries() -> Result<()> {
1021 let server_dir = paths::remote_server_dir_relative();
1022 let release_channel = release_channel::RELEASE_CHANNEL.dev_name();
1023 let prefix = format!("zed-remote-server-{}-", release_channel);
1024
1025 for entry in std::fs::read_dir(server_dir.as_std_path())? {
1026 let path = entry?.path();
1027
1028 if let Some(file_name) = path.file_name()
1029 && let Some(version) = file_name.to_string_lossy().strip_prefix(&prefix)
1030 && !is_new_version(version)
1031 && !is_file_in_use(file_name)
1032 {
1033 log::info!("removing old remote server binary: {:?}", path);
1034 std::fs::remove_file(&path)?;
1035 }
1036 }
1037
1038 Ok(())
1039}
1040
1041fn is_new_version(version: &str) -> bool {
1042 semver::Version::from_str(version)
1043 .ok()
1044 .zip(semver::Version::from_str(env!("ZED_PKG_VERSION")).ok())
1045 .is_some_and(|(version, current_version)| version >= current_version)
1046}
1047
1048fn is_file_in_use(file_name: &OsStr) -> bool {
1049 let info = sysinfo::System::new_with_specifics(sysinfo::RefreshKind::nothing().with_processes(
1050 sysinfo::ProcessRefreshKind::nothing().with_exe(sysinfo::UpdateKind::Always),
1051 ));
1052
1053 for process in info.processes().values() {
1054 if process
1055 .exe()
1056 .is_some_and(|exe| exe.file_name().is_some_and(|name| name == file_name))
1057 {
1058 return true;
1059 }
1060 }
1061
1062 false
1063}