1use anyhow::{Context as _, Result, anyhow};
2use language::File;
3use lsp::LanguageServerId;
4
5use extension::ExtensionHostProxy;
6use extension_host::headless_host::HeadlessExtensionStore;
7use fs::Fs;
8use gpui::{App, AppContext as _, AsyncApp, Context, Entity, PromptLevel};
9use http_client::HttpClient;
10use language::{Buffer, BufferEvent, LanguageRegistry, proto::serialize_operation};
11use node_runtime::NodeRuntime;
12use project::{
13 LspStore, LspStoreEvent, ManifestTree, PrettierStore, ProjectEnvironment, ProjectPath,
14 ToolchainStore, WorktreeId,
15 agent_server_store::AgentServerStore,
16 buffer_store::{BufferStore, BufferStoreEvent},
17 debugger::{breakpoint_store::BreakpointStore, dap_store::DapStore},
18 git_store::GitStore,
19 image_store::ImageId,
20 lsp_store::log_store::{self, GlobalLogStore, LanguageServerKind},
21 project_settings::SettingsObserver,
22 search::SearchQuery,
23 task_store::TaskStore,
24 worktree_store::WorktreeStore,
25};
26use rpc::{
27 AnyProtoClient, TypedEnvelope,
28 proto::{self, REMOTE_SERVER_PEER_ID, REMOTE_SERVER_PROJECT_ID},
29};
30
31use settings::{Settings as _, initial_server_settings_content};
32use smol::stream::StreamExt;
33use std::{
34 num::NonZeroU64,
35 path::{Path, PathBuf},
36 sync::{
37 Arc,
38 atomic::{AtomicU64, AtomicUsize, Ordering},
39 },
40};
41use sysinfo::{ProcessRefreshKind, RefreshKind, System, UpdateKind};
42use util::{ResultExt, paths::PathStyle, rel_path::RelPath};
43use worktree::Worktree;
44
45pub struct HeadlessProject {
46 pub fs: Arc<dyn Fs>,
47 pub session: AnyProtoClient,
48 pub worktree_store: Entity<WorktreeStore>,
49 pub buffer_store: Entity<BufferStore>,
50 pub lsp_store: Entity<LspStore>,
51 pub task_store: Entity<TaskStore>,
52 pub dap_store: Entity<DapStore>,
53 pub agent_server_store: Entity<AgentServerStore>,
54 pub settings_observer: Entity<SettingsObserver>,
55 pub next_entry_id: Arc<AtomicUsize>,
56 pub languages: Arc<LanguageRegistry>,
57 pub extensions: Entity<HeadlessExtensionStore>,
58 pub git_store: Entity<GitStore>,
59 pub environment: Entity<ProjectEnvironment>,
60 // Used mostly to keep alive the toolchain store for RPC handlers.
61 // Local variant is used within LSP store, but that's a separate entity.
62 pub _toolchain_store: Entity<ToolchainStore>,
63}
64
65pub struct HeadlessAppState {
66 pub session: AnyProtoClient,
67 pub fs: Arc<dyn Fs>,
68 pub http_client: Arc<dyn HttpClient>,
69 pub node_runtime: NodeRuntime,
70 pub languages: Arc<LanguageRegistry>,
71 pub extension_host_proxy: Arc<ExtensionHostProxy>,
72}
73
74impl HeadlessProject {
75 pub fn init(cx: &mut App) {
76 settings::init(cx);
77 language::init(cx);
78 project::Project::init_settings(cx);
79 extension_host::ExtensionSettings::register(cx);
80 log_store::init(true, cx);
81 }
82
83 pub fn new(
84 HeadlessAppState {
85 session,
86 fs,
87 http_client,
88 node_runtime,
89 languages,
90 extension_host_proxy: proxy,
91 }: HeadlessAppState,
92 cx: &mut Context<Self>,
93 ) -> Self {
94 debug_adapter_extension::init(proxy.clone(), cx);
95 languages::init(languages.clone(), fs.clone(), node_runtime.clone(), cx);
96
97 let worktree_store = cx.new(|cx| {
98 let mut store = WorktreeStore::local(true, fs.clone());
99 store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
100 store
101 });
102
103 let environment =
104 cx.new(|cx| ProjectEnvironment::new(None, worktree_store.downgrade(), None, true, cx));
105 let manifest_tree = ManifestTree::new(worktree_store.clone(), cx);
106 let toolchain_store = cx.new(|cx| {
107 ToolchainStore::local(
108 languages.clone(),
109 worktree_store.clone(),
110 environment.clone(),
111 manifest_tree.clone(),
112 fs.clone(),
113 cx,
114 )
115 });
116
117 let buffer_store = cx.new(|cx| {
118 let mut buffer_store = BufferStore::local(worktree_store.clone(), cx);
119 buffer_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
120 buffer_store
121 });
122
123 let breakpoint_store =
124 cx.new(|_| BreakpointStore::local(worktree_store.clone(), buffer_store.clone()));
125
126 let dap_store = cx.new(|cx| {
127 let mut dap_store = DapStore::new_local(
128 http_client.clone(),
129 node_runtime.clone(),
130 fs.clone(),
131 environment.clone(),
132 toolchain_store.read(cx).as_language_toolchain_store(),
133 worktree_store.clone(),
134 breakpoint_store.clone(),
135 true,
136 cx,
137 );
138 dap_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
139 dap_store
140 });
141
142 let git_store = cx.new(|cx| {
143 let mut store = GitStore::local(
144 &worktree_store,
145 buffer_store.clone(),
146 environment.clone(),
147 fs.clone(),
148 cx,
149 );
150 store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
151 store
152 });
153
154 let prettier_store = cx.new(|cx| {
155 PrettierStore::new(
156 node_runtime.clone(),
157 fs.clone(),
158 languages.clone(),
159 worktree_store.clone(),
160 cx,
161 )
162 });
163
164 let task_store = cx.new(|cx| {
165 let mut task_store = TaskStore::local(
166 buffer_store.downgrade(),
167 worktree_store.clone(),
168 toolchain_store.read(cx).as_language_toolchain_store(),
169 environment.clone(),
170 cx,
171 );
172 task_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
173 task_store
174 });
175 let settings_observer = cx.new(|cx| {
176 let mut observer = SettingsObserver::new_local(
177 fs.clone(),
178 worktree_store.clone(),
179 task_store.clone(),
180 cx,
181 );
182 observer.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
183 observer
184 });
185
186 let lsp_store = cx.new(|cx| {
187 let mut lsp_store = LspStore::new_local(
188 buffer_store.clone(),
189 worktree_store.clone(),
190 prettier_store.clone(),
191 toolchain_store
192 .read(cx)
193 .as_local_store()
194 .expect("Toolchain store to be local")
195 .clone(),
196 environment.clone(),
197 manifest_tree,
198 languages.clone(),
199 http_client.clone(),
200 fs.clone(),
201 cx,
202 );
203 lsp_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
204 lsp_store
205 });
206
207 let agent_server_store = cx.new(|cx| {
208 let mut agent_server_store = AgentServerStore::local(
209 node_runtime.clone(),
210 fs.clone(),
211 environment.clone(),
212 http_client.clone(),
213 cx,
214 );
215 agent_server_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
216 agent_server_store
217 });
218
219 cx.subscribe(&lsp_store, Self::on_lsp_store_event).detach();
220 language_extension::init(
221 language_extension::LspAccess::ViaLspStore(lsp_store.clone()),
222 proxy.clone(),
223 languages.clone(),
224 );
225
226 cx.subscribe(&buffer_store, |_this, _buffer_store, event, cx| {
227 if let BufferStoreEvent::BufferAdded(buffer) = event {
228 cx.subscribe(buffer, Self::on_buffer_event).detach();
229 }
230 })
231 .detach();
232
233 let extensions = HeadlessExtensionStore::new(
234 fs.clone(),
235 http_client.clone(),
236 paths::remote_extensions_dir().to_path_buf(),
237 proxy,
238 node_runtime,
239 cx,
240 );
241
242 // local_machine -> ssh handlers
243 session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &worktree_store);
244 session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &buffer_store);
245 session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &cx.entity());
246 session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &lsp_store);
247 session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &task_store);
248 session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &toolchain_store);
249 session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &dap_store);
250 session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &settings_observer);
251 session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &git_store);
252 session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &agent_server_store);
253
254 session.add_request_handler(cx.weak_entity(), Self::handle_list_remote_directory);
255 session.add_request_handler(cx.weak_entity(), Self::handle_get_path_metadata);
256 session.add_request_handler(cx.weak_entity(), Self::handle_shutdown_remote_server);
257 session.add_request_handler(cx.weak_entity(), Self::handle_ping);
258 session.add_request_handler(cx.weak_entity(), Self::handle_get_processes);
259
260 session.add_entity_request_handler(Self::handle_add_worktree);
261 session.add_request_handler(cx.weak_entity(), Self::handle_remove_worktree);
262
263 session.add_entity_request_handler(Self::handle_open_buffer_by_path);
264 session.add_entity_request_handler(Self::handle_open_new_buffer);
265 session.add_entity_request_handler(Self::handle_find_search_candidates);
266 session.add_entity_request_handler(Self::handle_open_server_settings);
267 session.add_entity_request_handler(Self::handle_get_directory_environment);
268 session.add_entity_message_handler(Self::handle_toggle_lsp_logs);
269 session.add_entity_request_handler(Self::handle_open_image_by_path);
270
271 session.add_entity_request_handler(BufferStore::handle_update_buffer);
272 session.add_entity_message_handler(BufferStore::handle_close_buffer);
273
274 session.add_request_handler(
275 extensions.downgrade(),
276 HeadlessExtensionStore::handle_sync_extensions,
277 );
278 session.add_request_handler(
279 extensions.downgrade(),
280 HeadlessExtensionStore::handle_install_extension,
281 );
282
283 BufferStore::init(&session);
284 WorktreeStore::init(&session);
285 SettingsObserver::init(&session);
286 LspStore::init(&session);
287 TaskStore::init(Some(&session));
288 ToolchainStore::init(&session);
289 DapStore::init(&session, cx);
290 // todo(debugger): Re init breakpoint store when we set it up for collab
291 // BreakpointStore::init(&client);
292 GitStore::init(&session);
293 AgentServerStore::init_headless(&session);
294
295 HeadlessProject {
296 next_entry_id: Default::default(),
297 session,
298 settings_observer,
299 fs,
300 worktree_store,
301 buffer_store,
302 lsp_store,
303 task_store,
304 dap_store,
305 agent_server_store,
306 languages,
307 extensions,
308 git_store,
309 environment,
310 _toolchain_store: toolchain_store,
311 }
312 }
313
314 fn on_buffer_event(
315 &mut self,
316 buffer: Entity<Buffer>,
317 event: &BufferEvent,
318 cx: &mut Context<Self>,
319 ) {
320 if let BufferEvent::Operation {
321 operation,
322 is_local: true,
323 } = event
324 {
325 cx.background_spawn(self.session.request(proto::UpdateBuffer {
326 project_id: REMOTE_SERVER_PROJECT_ID,
327 buffer_id: buffer.read(cx).remote_id().to_proto(),
328 operations: vec![serialize_operation(operation)],
329 }))
330 .detach()
331 }
332 }
333
334 fn on_lsp_store_event(
335 &mut self,
336 lsp_store: Entity<LspStore>,
337 event: &LspStoreEvent,
338 cx: &mut Context<Self>,
339 ) {
340 match event {
341 LspStoreEvent::LanguageServerAdded(id, name, worktree_id) => {
342 let log_store = cx
343 .try_global::<GlobalLogStore>()
344 .map(|lsp_logs| lsp_logs.0.clone());
345 if let Some(log_store) = log_store {
346 log_store.update(cx, |log_store, cx| {
347 log_store.add_language_server(
348 LanguageServerKind::LocalSsh {
349 lsp_store: self.lsp_store.downgrade(),
350 },
351 *id,
352 Some(name.clone()),
353 *worktree_id,
354 lsp_store.read(cx).language_server_for_id(*id),
355 cx,
356 );
357 });
358 }
359 }
360 LspStoreEvent::LanguageServerRemoved(id) => {
361 let log_store = cx
362 .try_global::<GlobalLogStore>()
363 .map(|lsp_logs| lsp_logs.0.clone());
364 if let Some(log_store) = log_store {
365 log_store.update(cx, |log_store, cx| {
366 log_store.remove_language_server(*id, cx);
367 });
368 }
369 }
370 LspStoreEvent::LanguageServerUpdate {
371 language_server_id,
372 name,
373 message,
374 } => {
375 self.session
376 .send(proto::UpdateLanguageServer {
377 project_id: REMOTE_SERVER_PROJECT_ID,
378 server_name: name.as_ref().map(|name| name.to_string()),
379 language_server_id: language_server_id.to_proto(),
380 variant: Some(message.clone()),
381 })
382 .log_err();
383 }
384 LspStoreEvent::Notification(message) => {
385 self.session
386 .send(proto::Toast {
387 project_id: REMOTE_SERVER_PROJECT_ID,
388 notification_id: "lsp".to_string(),
389 message: message.clone(),
390 })
391 .log_err();
392 }
393 LspStoreEvent::LanguageServerPrompt(prompt) => {
394 let request = self.session.request(proto::LanguageServerPromptRequest {
395 project_id: REMOTE_SERVER_PROJECT_ID,
396 actions: prompt
397 .actions
398 .iter()
399 .map(|action| action.title.to_string())
400 .collect(),
401 level: Some(prompt_to_proto(prompt)),
402 lsp_name: prompt.lsp_name.clone(),
403 message: prompt.message.clone(),
404 });
405 let prompt = prompt.clone();
406 cx.background_spawn(async move {
407 let response = request.await?;
408 if let Some(action_response) = response.action_response {
409 prompt.respond(action_response as usize).await;
410 }
411 anyhow::Ok(())
412 })
413 .detach();
414 }
415 _ => {}
416 }
417 }
418
419 pub async fn handle_add_worktree(
420 this: Entity<Self>,
421 message: TypedEnvelope<proto::AddWorktree>,
422 mut cx: AsyncApp,
423 ) -> Result<proto::AddWorktreeResponse> {
424 use client::ErrorCodeExt;
425 let fs = this.read_with(&cx, |this, _| this.fs.clone())?;
426 let path = PathBuf::from(shellexpand::tilde(&message.payload.path).to_string());
427
428 let canonicalized = match fs.canonicalize(&path).await {
429 Ok(path) => path,
430 Err(e) => {
431 let mut parent = path
432 .parent()
433 .ok_or(e)
434 .with_context(|| format!("{path:?} does not exist"))?;
435 if parent == Path::new("") {
436 parent = util::paths::home_dir();
437 }
438 let parent = fs.canonicalize(parent).await.map_err(|_| {
439 anyhow!(
440 proto::ErrorCode::DevServerProjectPathDoesNotExist
441 .with_tag("path", path.to_string_lossy().as_ref())
442 )
443 })?;
444 parent.join(path.file_name().unwrap())
445 }
446 };
447
448 let worktree = this
449 .read_with(&cx.clone(), |this, _| {
450 Worktree::local(
451 Arc::from(canonicalized.as_path()),
452 message.payload.visible,
453 this.fs.clone(),
454 this.next_entry_id.clone(),
455 &mut cx,
456 )
457 })?
458 .await?;
459
460 let response = this.read_with(&cx, |_, cx| {
461 let worktree = worktree.read(cx);
462 proto::AddWorktreeResponse {
463 worktree_id: worktree.id().to_proto(),
464 canonicalized_path: canonicalized.to_string_lossy().into_owned(),
465 }
466 })?;
467
468 // We spawn this asynchronously, so that we can send the response back
469 // *before* `worktree_store.add()` can send out UpdateProject requests
470 // to the client about the new worktree.
471 //
472 // That lets the client manage the reference/handles of the newly-added
473 // worktree, before getting interrupted by an UpdateProject request.
474 //
475 // This fixes the problem of the client sending the AddWorktree request,
476 // headless project sending out a project update, client receiving it
477 // and immediately dropping the reference of the new client, causing it
478 // to be dropped on the headless project, and the client only then
479 // receiving a response to AddWorktree.
480 cx.spawn(async move |cx| {
481 this.update(cx, |this, cx| {
482 this.worktree_store.update(cx, |worktree_store, cx| {
483 worktree_store.add(&worktree, cx);
484 });
485 })
486 .log_err();
487 })
488 .detach();
489
490 Ok(response)
491 }
492
493 pub async fn handle_remove_worktree(
494 this: Entity<Self>,
495 envelope: TypedEnvelope<proto::RemoveWorktree>,
496 mut cx: AsyncApp,
497 ) -> Result<proto::Ack> {
498 let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
499 this.update(&mut cx, |this, cx| {
500 this.worktree_store.update(cx, |worktree_store, cx| {
501 worktree_store.remove_worktree(worktree_id, cx);
502 });
503 })?;
504 Ok(proto::Ack {})
505 }
506
507 pub async fn handle_open_buffer_by_path(
508 this: Entity<Self>,
509 message: TypedEnvelope<proto::OpenBufferByPath>,
510 mut cx: AsyncApp,
511 ) -> Result<proto::OpenBufferResponse> {
512 let worktree_id = WorktreeId::from_proto(message.payload.worktree_id);
513 let path = RelPath::from_proto(&message.payload.path)?;
514 let (buffer_store, buffer) = this.update(&mut cx, |this, cx| {
515 let buffer_store = this.buffer_store.clone();
516 let buffer = this.buffer_store.update(cx, |buffer_store, cx| {
517 buffer_store.open_buffer(ProjectPath { worktree_id, path }, cx)
518 });
519 anyhow::Ok((buffer_store, buffer))
520 })??;
521
522 let buffer = buffer.await?;
523 let buffer_id = buffer.read_with(&cx, |b, _| b.remote_id())?;
524 buffer_store.update(&mut cx, |buffer_store, cx| {
525 buffer_store
526 .create_buffer_for_peer(&buffer, REMOTE_SERVER_PEER_ID, cx)
527 .detach_and_log_err(cx);
528 })?;
529
530 Ok(proto::OpenBufferResponse {
531 buffer_id: buffer_id.to_proto(),
532 })
533 }
534
535 pub async fn handle_open_image_by_path(
536 this: Entity<Self>,
537 message: TypedEnvelope<proto::OpenImageByPath>,
538 mut cx: AsyncApp,
539 ) -> Result<proto::OpenImageResponse> {
540 static NEXT_ID: AtomicU64 = AtomicU64::new(1);
541 let worktree_id = WorktreeId::from_proto(message.payload.worktree_id);
542 let path = RelPath::from_proto(&message.payload.path)?;
543 let project_id = message.payload.project_id;
544 use proto::create_image_for_peer::Variant;
545
546 let (worktree_store, session) = this.read_with(&cx, |this, _| {
547 (this.worktree_store.clone(), this.session.clone())
548 })?;
549
550 let worktree = worktree_store
551 .read_with(&cx, |store, cx| store.worktree_for_id(worktree_id, cx))?
552 .context("worktree not found")?;
553
554 let load_task = worktree.update(&mut cx, |worktree, cx| {
555 worktree.load_binary_file(path.as_ref(), cx)
556 })?;
557
558 let loaded_file = load_task.await?;
559 let content = loaded_file.content;
560 let file = loaded_file.file;
561
562 let proto_file = worktree.read_with(&cx, |_worktree, cx| file.to_proto(cx))?;
563 let image_id =
564 ImageId::from(NonZeroU64::new(NEXT_ID.fetch_add(1, Ordering::Relaxed)).unwrap());
565
566 let format = image::guess_format(&content)
567 .map(|f| format!("{:?}", f).to_lowercase())
568 .unwrap_or_else(|_| "unknown".to_string());
569
570 let state = proto::ImageState {
571 id: image_id.to_proto(),
572 file: Some(proto_file),
573 content_size: content.len() as u64,
574 format,
575 };
576
577 session.send(proto::CreateImageForPeer {
578 project_id,
579 peer_id: Some(REMOTE_SERVER_PEER_ID),
580 variant: Some(Variant::State(state)),
581 })?;
582
583 const CHUNK_SIZE: usize = 1024 * 1024; // 1MB chunks
584 for chunk in content.chunks(CHUNK_SIZE) {
585 session.send(proto::CreateImageForPeer {
586 project_id,
587 peer_id: Some(REMOTE_SERVER_PEER_ID),
588 variant: Some(Variant::Chunk(proto::ImageChunk {
589 image_id: image_id.to_proto(),
590 data: chunk.to_vec(),
591 })),
592 })?;
593 }
594
595 Ok(proto::OpenImageResponse {
596 image_id: image_id.to_proto(),
597 })
598 }
599
600 pub async fn handle_open_new_buffer(
601 this: Entity<Self>,
602 _message: TypedEnvelope<proto::OpenNewBuffer>,
603 mut cx: AsyncApp,
604 ) -> Result<proto::OpenBufferResponse> {
605 let (buffer_store, buffer) = this.update(&mut cx, |this, cx| {
606 let buffer_store = this.buffer_store.clone();
607 let buffer = this
608 .buffer_store
609 .update(cx, |buffer_store, cx| buffer_store.create_buffer(true, cx));
610 anyhow::Ok((buffer_store, buffer))
611 })??;
612
613 let buffer = buffer.await?;
614 let buffer_id = buffer.read_with(&cx, |b, _| b.remote_id())?;
615 buffer_store.update(&mut cx, |buffer_store, cx| {
616 buffer_store
617 .create_buffer_for_peer(&buffer, REMOTE_SERVER_PEER_ID, cx)
618 .detach_and_log_err(cx);
619 })?;
620
621 Ok(proto::OpenBufferResponse {
622 buffer_id: buffer_id.to_proto(),
623 })
624 }
625
626 async fn handle_toggle_lsp_logs(
627 _: Entity<Self>,
628 envelope: TypedEnvelope<proto::ToggleLspLogs>,
629 mut cx: AsyncApp,
630 ) -> Result<()> {
631 let server_id = LanguageServerId::from_proto(envelope.payload.server_id);
632 let lsp_logs = cx
633 .update(|cx| {
634 cx.try_global::<GlobalLogStore>()
635 .map(|lsp_logs| lsp_logs.0.clone())
636 })?
637 .context("lsp logs store is missing")?;
638
639 lsp_logs.update(&mut cx, |lsp_logs, _| {
640 // RPC logs are very noisy and we need to toggle it on the headless server too.
641 // The rest of the logs for the ssh project are very important to have toggled always,
642 // to e.g. send language server error logs to the client before anything is toggled.
643 if envelope.payload.enabled {
644 lsp_logs.enable_rpc_trace_for_language_server(server_id);
645 } else {
646 lsp_logs.disable_rpc_trace_for_language_server(server_id);
647 }
648 })?;
649 Ok(())
650 }
651
652 async fn handle_open_server_settings(
653 this: Entity<Self>,
654 _: TypedEnvelope<proto::OpenServerSettings>,
655 mut cx: AsyncApp,
656 ) -> Result<proto::OpenBufferResponse> {
657 let settings_path = paths::settings_file();
658 let (worktree, path) = this
659 .update(&mut cx, |this, cx| {
660 this.worktree_store.update(cx, |worktree_store, cx| {
661 worktree_store.find_or_create_worktree(settings_path, false, cx)
662 })
663 })?
664 .await?;
665
666 let (buffer, buffer_store) = this.update(&mut cx, |this, cx| {
667 let buffer = this.buffer_store.update(cx, |buffer_store, cx| {
668 buffer_store.open_buffer(
669 ProjectPath {
670 worktree_id: worktree.read(cx).id(),
671 path: path,
672 },
673 cx,
674 )
675 });
676
677 (buffer, this.buffer_store.clone())
678 })?;
679
680 let buffer = buffer.await?;
681
682 let buffer_id = cx.update(|cx| {
683 if buffer.read(cx).is_empty() {
684 buffer.update(cx, |buffer, cx| {
685 buffer.edit([(0..0, initial_server_settings_content())], None, cx)
686 });
687 }
688
689 let buffer_id = buffer.read(cx).remote_id();
690
691 buffer_store.update(cx, |buffer_store, cx| {
692 buffer_store
693 .create_buffer_for_peer(&buffer, REMOTE_SERVER_PEER_ID, cx)
694 .detach_and_log_err(cx);
695 });
696
697 buffer_id
698 })?;
699
700 Ok(proto::OpenBufferResponse {
701 buffer_id: buffer_id.to_proto(),
702 })
703 }
704
705 async fn handle_find_search_candidates(
706 this: Entity<Self>,
707 envelope: TypedEnvelope<proto::FindSearchCandidates>,
708 mut cx: AsyncApp,
709 ) -> Result<proto::FindSearchCandidatesResponse> {
710 let message = envelope.payload;
711 let query = SearchQuery::from_proto(
712 message.query.context("missing query field")?,
713 PathStyle::local(),
714 )?;
715 let results = this.update(&mut cx, |this, cx| {
716 this.buffer_store.update(cx, |buffer_store, cx| {
717 buffer_store.find_search_candidates(&query, message.limit as _, this.fs.clone(), cx)
718 })
719 })?;
720
721 let mut response = proto::FindSearchCandidatesResponse {
722 buffer_ids: Vec::new(),
723 };
724
725 let buffer_store = this.read_with(&cx, |this, _| this.buffer_store.clone())?;
726
727 while let Ok(buffer) = results.recv().await {
728 let buffer_id = buffer.read_with(&cx, |this, _| this.remote_id())?;
729 response.buffer_ids.push(buffer_id.to_proto());
730 buffer_store
731 .update(&mut cx, |buffer_store, cx| {
732 buffer_store.create_buffer_for_peer(&buffer, REMOTE_SERVER_PEER_ID, cx)
733 })?
734 .await?;
735 }
736
737 Ok(response)
738 }
739
740 async fn handle_list_remote_directory(
741 this: Entity<Self>,
742 envelope: TypedEnvelope<proto::ListRemoteDirectory>,
743 cx: AsyncApp,
744 ) -> Result<proto::ListRemoteDirectoryResponse> {
745 let fs = cx.read_entity(&this, |this, _| this.fs.clone())?;
746 let expanded = PathBuf::from(shellexpand::tilde(&envelope.payload.path).to_string());
747 let check_info = envelope
748 .payload
749 .config
750 .as_ref()
751 .is_some_and(|config| config.is_dir);
752
753 let mut entries = Vec::new();
754 let mut entry_info = Vec::new();
755 let mut response = fs.read_dir(&expanded).await?;
756 while let Some(path) = response.next().await {
757 let path = path?;
758 if let Some(file_name) = path.file_name() {
759 entries.push(file_name.to_string_lossy().into_owned());
760 if check_info {
761 let is_dir = fs.is_dir(&path).await;
762 entry_info.push(proto::EntryInfo { is_dir });
763 }
764 }
765 }
766 Ok(proto::ListRemoteDirectoryResponse {
767 entries,
768 entry_info,
769 })
770 }
771
772 async fn handle_get_path_metadata(
773 this: Entity<Self>,
774 envelope: TypedEnvelope<proto::GetPathMetadata>,
775 cx: AsyncApp,
776 ) -> Result<proto::GetPathMetadataResponse> {
777 let fs = cx.read_entity(&this, |this, _| this.fs.clone())?;
778 let expanded = PathBuf::from(shellexpand::tilde(&envelope.payload.path).to_string());
779
780 let metadata = fs.metadata(&expanded).await?;
781 let is_dir = metadata.map(|metadata| metadata.is_dir).unwrap_or(false);
782
783 Ok(proto::GetPathMetadataResponse {
784 exists: metadata.is_some(),
785 is_dir,
786 path: expanded.to_string_lossy().into_owned(),
787 })
788 }
789
790 async fn handle_shutdown_remote_server(
791 _this: Entity<Self>,
792 _envelope: TypedEnvelope<proto::ShutdownRemoteServer>,
793 cx: AsyncApp,
794 ) -> Result<proto::Ack> {
795 cx.spawn(async move |cx| {
796 cx.update(|cx| {
797 // TODO: This is a hack, because in a headless project, shutdown isn't executed
798 // when calling quit, but it should be.
799 cx.shutdown();
800 cx.quit();
801 })
802 })
803 .detach();
804
805 Ok(proto::Ack {})
806 }
807
808 pub async fn handle_ping(
809 _this: Entity<Self>,
810 _envelope: TypedEnvelope<proto::Ping>,
811 _cx: AsyncApp,
812 ) -> Result<proto::Ack> {
813 log::debug!("Received ping from client");
814 Ok(proto::Ack {})
815 }
816
817 async fn handle_get_processes(
818 _this: Entity<Self>,
819 _envelope: TypedEnvelope<proto::GetProcesses>,
820 _cx: AsyncApp,
821 ) -> Result<proto::GetProcessesResponse> {
822 let mut processes = Vec::new();
823 let refresh_kind = RefreshKind::nothing().with_processes(
824 ProcessRefreshKind::nothing()
825 .without_tasks()
826 .with_cmd(UpdateKind::Always),
827 );
828
829 for process in System::new_with_specifics(refresh_kind)
830 .processes()
831 .values()
832 {
833 let name = process.name().to_string_lossy().into_owned();
834 let command = process
835 .cmd()
836 .iter()
837 .map(|s| s.to_string_lossy().into_owned())
838 .collect::<Vec<_>>();
839
840 processes.push(proto::ProcessInfo {
841 pid: process.pid().as_u32(),
842 name,
843 command,
844 });
845 }
846
847 processes.sort_by_key(|p| p.name.clone());
848
849 Ok(proto::GetProcessesResponse { processes })
850 }
851
852 async fn handle_get_directory_environment(
853 this: Entity<Self>,
854 envelope: TypedEnvelope<proto::GetDirectoryEnvironment>,
855 mut cx: AsyncApp,
856 ) -> Result<proto::DirectoryEnvironment> {
857 let shell = task::shell_from_proto(envelope.payload.shell.context("missing shell")?)?;
858 let directory = PathBuf::from(envelope.payload.directory);
859 let environment = this
860 .update(&mut cx, |this, cx| {
861 this.environment.update(cx, |environment, cx| {
862 environment.local_directory_environment(&shell, directory.into(), cx)
863 })
864 })?
865 .await
866 .context("failed to get directory environment")?
867 .into_iter()
868 .collect();
869 Ok(proto::DirectoryEnvironment { environment })
870 }
871}
872
873fn prompt_to_proto(
874 prompt: &project::LanguageServerPromptRequest,
875) -> proto::language_server_prompt_request::Level {
876 match prompt.level {
877 PromptLevel::Info => proto::language_server_prompt_request::Level::Info(
878 proto::language_server_prompt_request::Info {},
879 ),
880 PromptLevel::Warning => proto::language_server_prompt_request::Level::Warning(
881 proto::language_server_prompt_request::Warning {},
882 ),
883 PromptLevel::Critical => proto::language_server_prompt_request::Level::Critical(
884 proto::language_server_prompt_request::Critical {},
885 ),
886 }
887}