1use ::proto::{FromProto, ToProto};
2use anyhow::{Context as _, Result, anyhow};
3
4use extension::ExtensionHostProxy;
5use extension_host::headless_host::HeadlessExtensionStore;
6use fs::Fs;
7use gpui::{App, AppContext as _, AsyncApp, Context, Entity, PromptLevel};
8use http_client::HttpClient;
9use language::{Buffer, BufferEvent, LanguageRegistry, proto::serialize_operation};
10use node_runtime::NodeRuntime;
11use project::{
12 LspStore, LspStoreEvent, ManifestTree, PrettierStore, ProjectEnvironment, ProjectPath,
13 ToolchainStore, WorktreeId,
14 buffer_store::{BufferStore, BufferStoreEvent},
15 debugger::{breakpoint_store::BreakpointStore, dap_store::DapStore},
16 git_store::GitStore,
17 project_settings::SettingsObserver,
18 search::SearchQuery,
19 task_store::TaskStore,
20 worktree_store::WorktreeStore,
21};
22use rpc::{
23 AnyProtoClient, TypedEnvelope,
24 proto::{self, SSH_PEER_ID, SSH_PROJECT_ID},
25};
26
27use settings::initial_server_settings_content;
28use smol::stream::StreamExt;
29use std::{
30 path::{Path, PathBuf},
31 sync::{Arc, atomic::AtomicUsize},
32};
33use util::ResultExt;
34use worktree::Worktree;
35
36pub struct HeadlessProject {
37 pub fs: Arc<dyn Fs>,
38 pub session: AnyProtoClient,
39 pub worktree_store: Entity<WorktreeStore>,
40 pub buffer_store: Entity<BufferStore>,
41 pub lsp_store: Entity<LspStore>,
42 pub task_store: Entity<TaskStore>,
43 pub dap_store: Entity<DapStore>,
44 pub settings_observer: Entity<SettingsObserver>,
45 pub next_entry_id: Arc<AtomicUsize>,
46 pub languages: Arc<LanguageRegistry>,
47 pub extensions: Entity<HeadlessExtensionStore>,
48 pub git_store: Entity<GitStore>,
49}
50
51pub struct HeadlessAppState {
52 pub session: AnyProtoClient,
53 pub fs: Arc<dyn Fs>,
54 pub http_client: Arc<dyn HttpClient>,
55 pub node_runtime: NodeRuntime,
56 pub languages: Arc<LanguageRegistry>,
57 pub extension_host_proxy: Arc<ExtensionHostProxy>,
58}
59
60impl HeadlessProject {
61 pub fn init(cx: &mut App) {
62 settings::init(cx);
63 language::init(cx);
64 project::Project::init_settings(cx);
65 }
66
67 pub fn new(
68 HeadlessAppState {
69 session,
70 fs,
71 http_client,
72 node_runtime,
73 languages,
74 extension_host_proxy: proxy,
75 }: HeadlessAppState,
76 cx: &mut Context<Self>,
77 ) -> Self {
78 debug_adapter_extension::init(proxy.clone(), cx);
79 languages::init(languages.clone(), node_runtime.clone(), cx);
80
81 let worktree_store = cx.new(|cx| {
82 let mut store = WorktreeStore::local(true, fs.clone());
83 store.shared(SSH_PROJECT_ID, session.clone(), cx);
84 store
85 });
86
87 let environment = cx.new(|_| ProjectEnvironment::new(None));
88 let manifest_tree = ManifestTree::new(worktree_store.clone(), cx);
89 let toolchain_store = cx.new(|cx| {
90 ToolchainStore::local(
91 languages.clone(),
92 worktree_store.clone(),
93 environment.clone(),
94 manifest_tree.clone(),
95 cx,
96 )
97 });
98
99 let buffer_store = cx.new(|cx| {
100 let mut buffer_store = BufferStore::local(worktree_store.clone(), cx);
101 buffer_store.shared(SSH_PROJECT_ID, session.clone(), cx);
102 buffer_store
103 });
104
105 let breakpoint_store =
106 cx.new(|_| BreakpointStore::local(worktree_store.clone(), buffer_store.clone()));
107
108 let dap_store = cx.new(|cx| {
109 let mut dap_store = DapStore::new_local(
110 http_client.clone(),
111 node_runtime.clone(),
112 fs.clone(),
113 environment.clone(),
114 toolchain_store.read(cx).as_language_toolchain_store(),
115 worktree_store.clone(),
116 breakpoint_store.clone(),
117 cx,
118 );
119 dap_store.shared(SSH_PROJECT_ID, session.clone(), cx);
120 dap_store
121 });
122
123 let git_store = cx.new(|cx| {
124 let mut store = GitStore::local(
125 &worktree_store,
126 buffer_store.clone(),
127 environment.clone(),
128 fs.clone(),
129 cx,
130 );
131 store.shared(SSH_PROJECT_ID, session.clone(), cx);
132 store
133 });
134
135 let prettier_store = cx.new(|cx| {
136 PrettierStore::new(
137 node_runtime.clone(),
138 fs.clone(),
139 languages.clone(),
140 worktree_store.clone(),
141 cx,
142 )
143 });
144
145 let task_store = cx.new(|cx| {
146 let mut task_store = TaskStore::local(
147 fs.clone(),
148 buffer_store.downgrade(),
149 worktree_store.clone(),
150 toolchain_store.read(cx).as_language_toolchain_store(),
151 environment.clone(),
152 cx,
153 );
154 task_store.shared(SSH_PROJECT_ID, session.clone(), cx);
155 task_store
156 });
157 let settings_observer = cx.new(|cx| {
158 let mut observer = SettingsObserver::new_local(
159 fs.clone(),
160 worktree_store.clone(),
161 task_store.clone(),
162 cx,
163 );
164 observer.shared(SSH_PROJECT_ID, session.clone(), cx);
165 observer
166 });
167
168 let lsp_store = cx.new(|cx| {
169 let mut lsp_store = LspStore::new_local(
170 buffer_store.clone(),
171 worktree_store.clone(),
172 prettier_store.clone(),
173 toolchain_store
174 .read(cx)
175 .as_local_store()
176 .expect("Toolchain store to be local")
177 .clone(),
178 environment,
179 manifest_tree,
180 languages.clone(),
181 http_client.clone(),
182 fs.clone(),
183 cx,
184 );
185 lsp_store.shared(SSH_PROJECT_ID, session.clone(), cx);
186 lsp_store
187 });
188
189 cx.subscribe(&lsp_store, Self::on_lsp_store_event).detach();
190 language_extension::init(
191 language_extension::LspAccess::ViaLspStore(lsp_store.clone()),
192 proxy.clone(),
193 languages.clone(),
194 );
195
196 cx.subscribe(&buffer_store, |_this, _buffer_store, event, cx| {
197 if let BufferStoreEvent::BufferAdded(buffer) = event {
198 cx.subscribe(buffer, Self::on_buffer_event).detach();
199 }
200 })
201 .detach();
202
203 let extensions = HeadlessExtensionStore::new(
204 fs.clone(),
205 http_client.clone(),
206 paths::remote_extensions_dir().to_path_buf(),
207 proxy,
208 node_runtime,
209 cx,
210 );
211
212 // local_machine -> ssh handlers
213 session.subscribe_to_entity(SSH_PROJECT_ID, &worktree_store);
214 session.subscribe_to_entity(SSH_PROJECT_ID, &buffer_store);
215 session.subscribe_to_entity(SSH_PROJECT_ID, &cx.entity());
216 session.subscribe_to_entity(SSH_PROJECT_ID, &lsp_store);
217 session.subscribe_to_entity(SSH_PROJECT_ID, &task_store);
218 session.subscribe_to_entity(SSH_PROJECT_ID, &toolchain_store);
219 session.subscribe_to_entity(SSH_PROJECT_ID, &dap_store);
220 session.subscribe_to_entity(SSH_PROJECT_ID, &settings_observer);
221 session.subscribe_to_entity(SSH_PROJECT_ID, &git_store);
222
223 session.add_request_handler(cx.weak_entity(), Self::handle_list_remote_directory);
224 session.add_request_handler(cx.weak_entity(), Self::handle_get_path_metadata);
225 session.add_request_handler(cx.weak_entity(), Self::handle_shutdown_remote_server);
226 session.add_request_handler(cx.weak_entity(), Self::handle_ping);
227
228 session.add_entity_request_handler(Self::handle_add_worktree);
229 session.add_request_handler(cx.weak_entity(), Self::handle_remove_worktree);
230
231 session.add_entity_request_handler(Self::handle_open_buffer_by_path);
232 session.add_entity_request_handler(Self::handle_open_new_buffer);
233 session.add_entity_request_handler(Self::handle_find_search_candidates);
234 session.add_entity_request_handler(Self::handle_open_server_settings);
235
236 session.add_entity_request_handler(BufferStore::handle_update_buffer);
237 session.add_entity_message_handler(BufferStore::handle_close_buffer);
238
239 session.add_request_handler(
240 extensions.downgrade(),
241 HeadlessExtensionStore::handle_sync_extensions,
242 );
243 session.add_request_handler(
244 extensions.downgrade(),
245 HeadlessExtensionStore::handle_install_extension,
246 );
247
248 BufferStore::init(&session);
249 WorktreeStore::init(&session);
250 SettingsObserver::init(&session);
251 LspStore::init(&session);
252 TaskStore::init(Some(&session));
253 ToolchainStore::init(&session);
254 DapStore::init(&session, cx);
255 // todo(debugger): Re init breakpoint store when we set it up for collab
256 // BreakpointStore::init(&client);
257 GitStore::init(&session);
258
259 HeadlessProject {
260 next_entry_id: Default::default(),
261 session,
262 settings_observer,
263 fs,
264 worktree_store,
265 buffer_store,
266 lsp_store,
267 task_store,
268 dap_store,
269 languages,
270 extensions,
271 git_store,
272 }
273 }
274
275 fn on_buffer_event(
276 &mut self,
277 buffer: Entity<Buffer>,
278 event: &BufferEvent,
279 cx: &mut Context<Self>,
280 ) {
281 if let BufferEvent::Operation {
282 operation,
283 is_local: true,
284 } = event
285 {
286 cx.background_spawn(self.session.request(proto::UpdateBuffer {
287 project_id: SSH_PROJECT_ID,
288 buffer_id: buffer.read(cx).remote_id().to_proto(),
289 operations: vec![serialize_operation(operation)],
290 }))
291 .detach()
292 }
293 }
294
295 fn on_lsp_store_event(
296 &mut self,
297 _lsp_store: Entity<LspStore>,
298 event: &LspStoreEvent,
299 cx: &mut Context<Self>,
300 ) {
301 match event {
302 LspStoreEvent::LanguageServerUpdate {
303 language_server_id,
304 name,
305 message,
306 } => {
307 self.session
308 .send(proto::UpdateLanguageServer {
309 project_id: SSH_PROJECT_ID,
310 server_name: name.as_ref().map(|name| name.to_string()),
311 language_server_id: language_server_id.to_proto(),
312 variant: Some(message.clone()),
313 })
314 .log_err();
315 }
316 LspStoreEvent::Notification(message) => {
317 self.session
318 .send(proto::Toast {
319 project_id: SSH_PROJECT_ID,
320 notification_id: "lsp".to_string(),
321 message: message.clone(),
322 })
323 .log_err();
324 }
325 LspStoreEvent::LanguageServerLog(language_server_id, log_type, message) => {
326 self.session
327 .send(proto::LanguageServerLog {
328 project_id: SSH_PROJECT_ID,
329 language_server_id: language_server_id.to_proto(),
330 message: message.clone(),
331 log_type: Some(log_type.to_proto()),
332 })
333 .log_err();
334 }
335 LspStoreEvent::LanguageServerPrompt(prompt) => {
336 let request = self.session.request(proto::LanguageServerPromptRequest {
337 project_id: SSH_PROJECT_ID,
338 actions: prompt
339 .actions
340 .iter()
341 .map(|action| action.title.to_string())
342 .collect(),
343 level: Some(prompt_to_proto(prompt)),
344 lsp_name: prompt.lsp_name.clone(),
345 message: prompt.message.clone(),
346 });
347 let prompt = prompt.clone();
348 cx.background_spawn(async move {
349 let response = request.await?;
350 if let Some(action_response) = response.action_response {
351 prompt.respond(action_response as usize).await;
352 }
353 anyhow::Ok(())
354 })
355 .detach();
356 }
357 _ => {}
358 }
359 }
360
361 pub async fn handle_add_worktree(
362 this: Entity<Self>,
363 message: TypedEnvelope<proto::AddWorktree>,
364 mut cx: AsyncApp,
365 ) -> Result<proto::AddWorktreeResponse> {
366 use client::ErrorCodeExt;
367 let fs = this.read_with(&cx, |this, _| this.fs.clone())?;
368 let path = PathBuf::from_proto(shellexpand::tilde(&message.payload.path).to_string());
369
370 let canonicalized = match fs.canonicalize(&path).await {
371 Ok(path) => path,
372 Err(e) => {
373 let mut parent = path
374 .parent()
375 .ok_or(e)
376 .with_context(|| format!("{path:?} does not exist"))?;
377 if parent == Path::new("") {
378 parent = util::paths::home_dir();
379 }
380 let parent = fs.canonicalize(parent).await.map_err(|_| {
381 anyhow!(
382 proto::ErrorCode::DevServerProjectPathDoesNotExist
383 .with_tag("path", path.to_string_lossy().as_ref())
384 )
385 })?;
386 parent.join(path.file_name().unwrap())
387 }
388 };
389
390 let worktree = this
391 .read_with(&cx.clone(), |this, _| {
392 Worktree::local(
393 Arc::from(canonicalized.as_path()),
394 message.payload.visible,
395 this.fs.clone(),
396 this.next_entry_id.clone(),
397 &mut cx,
398 )
399 })?
400 .await?;
401
402 let response = this.read_with(&cx, |_, cx| {
403 let worktree = worktree.read(cx);
404 proto::AddWorktreeResponse {
405 worktree_id: worktree.id().to_proto(),
406 canonicalized_path: canonicalized.to_proto(),
407 }
408 })?;
409
410 // We spawn this asynchronously, so that we can send the response back
411 // *before* `worktree_store.add()` can send out UpdateProject requests
412 // to the client about the new worktree.
413 //
414 // That lets the client manage the reference/handles of the newly-added
415 // worktree, before getting interrupted by an UpdateProject request.
416 //
417 // This fixes the problem of the client sending the AddWorktree request,
418 // headless project sending out a project update, client receiving it
419 // and immediately dropping the reference of the new client, causing it
420 // to be dropped on the headless project, and the client only then
421 // receiving a response to AddWorktree.
422 cx.spawn(async move |cx| {
423 this.update(cx, |this, cx| {
424 this.worktree_store.update(cx, |worktree_store, cx| {
425 worktree_store.add(&worktree, cx);
426 });
427 })
428 .log_err();
429 })
430 .detach();
431
432 Ok(response)
433 }
434
435 pub async fn handle_remove_worktree(
436 this: Entity<Self>,
437 envelope: TypedEnvelope<proto::RemoveWorktree>,
438 mut cx: AsyncApp,
439 ) -> Result<proto::Ack> {
440 let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
441 this.update(&mut cx, |this, cx| {
442 this.worktree_store.update(cx, |worktree_store, cx| {
443 worktree_store.remove_worktree(worktree_id, cx);
444 });
445 })?;
446 Ok(proto::Ack {})
447 }
448
449 pub async fn handle_open_buffer_by_path(
450 this: Entity<Self>,
451 message: TypedEnvelope<proto::OpenBufferByPath>,
452 mut cx: AsyncApp,
453 ) -> Result<proto::OpenBufferResponse> {
454 let worktree_id = WorktreeId::from_proto(message.payload.worktree_id);
455 let (buffer_store, buffer) = this.update(&mut cx, |this, cx| {
456 let buffer_store = this.buffer_store.clone();
457 let buffer = this.buffer_store.update(cx, |buffer_store, cx| {
458 buffer_store.open_buffer(
459 ProjectPath {
460 worktree_id,
461 path: Arc::<Path>::from_proto(message.payload.path),
462 },
463 cx,
464 )
465 });
466 anyhow::Ok((buffer_store, buffer))
467 })??;
468
469 let buffer = buffer.await?;
470 let buffer_id = buffer.read_with(&cx, |b, _| b.remote_id())?;
471 buffer_store.update(&mut cx, |buffer_store, cx| {
472 buffer_store
473 .create_buffer_for_peer(&buffer, SSH_PEER_ID, cx)
474 .detach_and_log_err(cx);
475 })?;
476
477 Ok(proto::OpenBufferResponse {
478 buffer_id: buffer_id.to_proto(),
479 })
480 }
481
482 pub async fn handle_open_new_buffer(
483 this: Entity<Self>,
484 _message: TypedEnvelope<proto::OpenNewBuffer>,
485 mut cx: AsyncApp,
486 ) -> Result<proto::OpenBufferResponse> {
487 let (buffer_store, buffer) = this.update(&mut cx, |this, cx| {
488 let buffer_store = this.buffer_store.clone();
489 let buffer = this
490 .buffer_store
491 .update(cx, |buffer_store, cx| buffer_store.create_buffer(cx));
492 anyhow::Ok((buffer_store, buffer))
493 })??;
494
495 let buffer = buffer.await?;
496 let buffer_id = buffer.read_with(&cx, |b, _| b.remote_id())?;
497 buffer_store.update(&mut cx, |buffer_store, cx| {
498 buffer_store
499 .create_buffer_for_peer(&buffer, SSH_PEER_ID, cx)
500 .detach_and_log_err(cx);
501 })?;
502
503 Ok(proto::OpenBufferResponse {
504 buffer_id: buffer_id.to_proto(),
505 })
506 }
507
508 pub async fn handle_open_server_settings(
509 this: Entity<Self>,
510 _: TypedEnvelope<proto::OpenServerSettings>,
511 mut cx: AsyncApp,
512 ) -> Result<proto::OpenBufferResponse> {
513 let settings_path = paths::settings_file();
514 let (worktree, path) = this
515 .update(&mut cx, |this, cx| {
516 this.worktree_store.update(cx, |worktree_store, cx| {
517 worktree_store.find_or_create_worktree(settings_path, false, cx)
518 })
519 })?
520 .await?;
521
522 let (buffer, buffer_store) = this.update(&mut cx, |this, cx| {
523 let buffer = this.buffer_store.update(cx, |buffer_store, cx| {
524 buffer_store.open_buffer(
525 ProjectPath {
526 worktree_id: worktree.read(cx).id(),
527 path: path.into(),
528 },
529 cx,
530 )
531 });
532
533 (buffer, this.buffer_store.clone())
534 })?;
535
536 let buffer = buffer.await?;
537
538 let buffer_id = cx.update(|cx| {
539 if buffer.read(cx).is_empty() {
540 buffer.update(cx, |buffer, cx| {
541 buffer.edit([(0..0, initial_server_settings_content())], None, cx)
542 });
543 }
544
545 let buffer_id = buffer.read(cx).remote_id();
546
547 buffer_store.update(cx, |buffer_store, cx| {
548 buffer_store
549 .create_buffer_for_peer(&buffer, SSH_PEER_ID, cx)
550 .detach_and_log_err(cx);
551 });
552
553 buffer_id
554 })?;
555
556 Ok(proto::OpenBufferResponse {
557 buffer_id: buffer_id.to_proto(),
558 })
559 }
560
561 pub async fn handle_find_search_candidates(
562 this: Entity<Self>,
563 envelope: TypedEnvelope<proto::FindSearchCandidates>,
564 mut cx: AsyncApp,
565 ) -> Result<proto::FindSearchCandidatesResponse> {
566 let message = envelope.payload;
567 let query = SearchQuery::from_proto(message.query.context("missing query field")?)?;
568 let results = this.update(&mut cx, |this, cx| {
569 this.buffer_store.update(cx, |buffer_store, cx| {
570 buffer_store.find_search_candidates(&query, message.limit as _, this.fs.clone(), cx)
571 })
572 })?;
573
574 let mut response = proto::FindSearchCandidatesResponse {
575 buffer_ids: Vec::new(),
576 };
577
578 let buffer_store = this.read_with(&cx, |this, _| this.buffer_store.clone())?;
579
580 while let Ok(buffer) = results.recv().await {
581 let buffer_id = buffer.read_with(&cx, |this, _| this.remote_id())?;
582 response.buffer_ids.push(buffer_id.to_proto());
583 buffer_store
584 .update(&mut cx, |buffer_store, cx| {
585 buffer_store.create_buffer_for_peer(&buffer, SSH_PEER_ID, cx)
586 })?
587 .await?;
588 }
589
590 Ok(response)
591 }
592
593 pub async fn handle_list_remote_directory(
594 this: Entity<Self>,
595 envelope: TypedEnvelope<proto::ListRemoteDirectory>,
596 cx: AsyncApp,
597 ) -> Result<proto::ListRemoteDirectoryResponse> {
598 let fs = cx.read_entity(&this, |this, _| this.fs.clone())?;
599 let expanded = PathBuf::from_proto(shellexpand::tilde(&envelope.payload.path).to_string());
600 let check_info = envelope
601 .payload
602 .config
603 .as_ref()
604 .is_some_and(|config| config.is_dir);
605
606 let mut entries = Vec::new();
607 let mut entry_info = Vec::new();
608 let mut response = fs.read_dir(&expanded).await?;
609 while let Some(path) = response.next().await {
610 let path = path?;
611 if let Some(file_name) = path.file_name() {
612 entries.push(file_name.to_string_lossy().to_string());
613 if check_info {
614 let is_dir = fs.is_dir(&path).await;
615 entry_info.push(proto::EntryInfo { is_dir });
616 }
617 }
618 }
619 Ok(proto::ListRemoteDirectoryResponse {
620 entries,
621 entry_info,
622 })
623 }
624
625 pub async fn handle_get_path_metadata(
626 this: Entity<Self>,
627 envelope: TypedEnvelope<proto::GetPathMetadata>,
628 cx: AsyncApp,
629 ) -> Result<proto::GetPathMetadataResponse> {
630 let fs = cx.read_entity(&this, |this, _| this.fs.clone())?;
631 let expanded = PathBuf::from_proto(shellexpand::tilde(&envelope.payload.path).to_string());
632
633 let metadata = fs.metadata(&expanded).await?;
634 let is_dir = metadata.map(|metadata| metadata.is_dir).unwrap_or(false);
635
636 Ok(proto::GetPathMetadataResponse {
637 exists: metadata.is_some(),
638 is_dir,
639 path: expanded.to_proto(),
640 })
641 }
642
643 pub async fn handle_shutdown_remote_server(
644 _this: Entity<Self>,
645 _envelope: TypedEnvelope<proto::ShutdownRemoteServer>,
646 cx: AsyncApp,
647 ) -> Result<proto::Ack> {
648 cx.spawn(async move |cx| {
649 cx.update(|cx| {
650 // TODO: This is a hack, because in a headless project, shutdown isn't executed
651 // when calling quit, but it should be.
652 cx.shutdown();
653 cx.quit();
654 })
655 })
656 .detach();
657
658 Ok(proto::Ack {})
659 }
660
661 pub async fn handle_ping(
662 _this: Entity<Self>,
663 _envelope: TypedEnvelope<proto::Ping>,
664 _cx: AsyncApp,
665 ) -> Result<proto::Ack> {
666 log::debug!("Received ping from client");
667 Ok(proto::Ack {})
668 }
669}
670
671fn prompt_to_proto(
672 prompt: &project::LanguageServerPromptRequest,
673) -> proto::language_server_prompt_request::Level {
674 match prompt.level {
675 PromptLevel::Info => proto::language_server_prompt_request::Level::Info(
676 proto::language_server_prompt_request::Info {},
677 ),
678 PromptLevel::Warning => proto::language_server_prompt_request::Level::Warning(
679 proto::language_server_prompt_request::Warning {},
680 ),
681 PromptLevel::Critical => proto::language_server_prompt_request::Level::Critical(
682 proto::language_server_prompt_request::Critical {},
683 ),
684 }
685}