use ::proto::{FromProto, ToProto};
use anyhow::{Context as _, Result, anyhow};

use extension::ExtensionHostProxy;
use extension_host::headless_host::HeadlessExtensionStore;
use fs::Fs;
use gpui::{App, AppContext as _, AsyncApp, Context, Entity, PromptLevel};
use http_client::HttpClient;
use language::{Buffer, BufferEvent, LanguageRegistry, proto::serialize_operation};
use node_runtime::NodeRuntime;
use project::{
    LspStore, LspStoreEvent, ManifestTree, PrettierStore, ProjectEnvironment, ProjectPath,
    ToolchainStore, WorktreeId,
    buffer_store::{BufferStore, BufferStoreEvent},
    debugger::{breakpoint_store::BreakpointStore, dap_store::DapStore},
    git_store::GitStore,
    project_settings::SettingsObserver,
    search::SearchQuery,
    task_store::TaskStore,
    worktree_store::WorktreeStore,
};
use remote::ssh_session::ChannelClient;
use rpc::{
    AnyProtoClient, TypedEnvelope,
    proto::{self, SSH_PEER_ID, SSH_PROJECT_ID},
};

use settings::initial_server_settings_content;
use smol::stream::StreamExt;
use std::{
    path::{Path, PathBuf},
    sync::{Arc, atomic::AtomicUsize},
};
use util::ResultExt;
use worktree::Worktree;

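/// The remote-side counterpart of a `Project`: it owns the worktree, buffer,
/// LSP, task, DAP, settings, extension, and git stores and shares them with
/// the connected client over the SSH protocol channel.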
pub struct HeadlessProject {
    pub fs: Arc<dyn Fs>,
    pub session: AnyProtoClient,
    pub worktree_store: Entity<WorktreeStore>,
    pub buffer_store: Entity<BufferStore>,
    pub lsp_store: Entity<LspStore>,
    pub task_store: Entity<TaskStore>,
    pub dap_store: Entity<DapStore>,
    pub settings_observer: Entity<SettingsObserver>,
    pub next_entry_id: Arc<AtomicUsize>,
    pub languages: Arc<LanguageRegistry>,
    pub extensions: Entity<HeadlessExtensionStore>,
    pub git_store: Entity<GitStore>,
}

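/// Everything needed to construct a `HeadlessProject`.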
pub struct HeadlessAppState {
    pub session: Arc<ChannelClient>,
    pub fs: Arc<dyn Fs>,
    pub http_client: Arc<dyn HttpClient>,
    pub node_runtime: NodeRuntime,
    pub languages: Arc<LanguageRegistry>,
    pub extension_host_proxy: Arc<ExtensionHostProxy>,
}

impl HeadlessProject {
    pub fn init(cx: &mut App) {
        settings::init(cx);
        language::init(cx);
        project::Project::init_settings(cx);
    }

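    /// Builds every store, marks each one as shared with the client under
    /// `SSH_PROJECT_ID`, and registers the handlers for incoming client messages.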
    pub fn new(
        HeadlessAppState {
            session,
            fs,
            http_client,
            node_runtime,
            languages,
            extension_host_proxy: proxy,
        }: HeadlessAppState,
        cx: &mut Context<Self>,
    ) -> Self {
        debug_adapter_extension::init(proxy.clone(), cx);
        language_extension::init(proxy.clone(), languages.clone());
        languages::init(languages.clone(), node_runtime.clone(), cx);

        let worktree_store = cx.new(|cx| {
            let mut store = WorktreeStore::local(true, fs.clone());
            store.shared(SSH_PROJECT_ID, session.clone().into(), cx);
            store
        });

        let environment = cx.new(|_| ProjectEnvironment::new(None));
        let manifest_tree = ManifestTree::new(worktree_store.clone(), cx);
        let toolchain_store = cx.new(|cx| {
            ToolchainStore::local(
                languages.clone(),
                worktree_store.clone(),
                environment.clone(),
                manifest_tree.clone(),
                cx,
            )
        });

        let buffer_store = cx.new(|cx| {
            let mut buffer_store = BufferStore::local(worktree_store.clone(), cx);
            buffer_store.shared(SSH_PROJECT_ID, session.clone().into(), cx);
            buffer_store
        });

        let breakpoint_store =
            cx.new(|_| BreakpointStore::local(worktree_store.clone(), buffer_store.clone()));

        let dap_store = cx.new(|cx| {
            let mut dap_store = DapStore::new_local(
                http_client.clone(),
                node_runtime.clone(),
                fs.clone(),
                environment.clone(),
                toolchain_store.read(cx).as_language_toolchain_store(),
                worktree_store.clone(),
                breakpoint_store.clone(),
                cx,
            );
            dap_store.shared(SSH_PROJECT_ID, session.clone().into(), cx);
            dap_store
        });

        let git_store = cx.new(|cx| {
            let mut store = GitStore::local(
                &worktree_store,
                buffer_store.clone(),
                environment.clone(),
                fs.clone(),
                cx,
            );
            store.shared(SSH_PROJECT_ID, session.clone().into(), cx);
            store
        });

        let prettier_store = cx.new(|cx| {
            PrettierStore::new(
                node_runtime.clone(),
                fs.clone(),
                languages.clone(),
                worktree_store.clone(),
                cx,
            )
        });

        let task_store = cx.new(|cx| {
            let mut task_store = TaskStore::local(
                fs.clone(),
                buffer_store.downgrade(),
                worktree_store.clone(),
                toolchain_store.read(cx).as_language_toolchain_store(),
                environment.clone(),
                cx,
            );
            task_store.shared(SSH_PROJECT_ID, session.clone().into(), cx);
            task_store
        });
        let settings_observer = cx.new(|cx| {
            let mut observer = SettingsObserver::new_local(
                fs.clone(),
                worktree_store.clone(),
                task_store.clone(),
                cx,
            );
            observer.shared(SSH_PROJECT_ID, session.clone().into(), cx);
            observer
        });

        let lsp_store = cx.new(|cx| {
            let mut lsp_store = LspStore::new_local(
                buffer_store.clone(),
                worktree_store.clone(),
                prettier_store.clone(),
                toolchain_store.clone(),
                environment,
                manifest_tree,
                languages.clone(),
                http_client.clone(),
                fs.clone(),
                cx,
            );
            lsp_store.shared(SSH_PROJECT_ID, session.clone().into(), cx);
            lsp_store
        });

        cx.subscribe(&lsp_store, Self::on_lsp_store_event).detach();

        cx.subscribe(
            &buffer_store,
            |_this, _buffer_store, event, cx| match event {
                BufferStoreEvent::BufferAdded(buffer) => {
                    cx.subscribe(buffer, Self::on_buffer_event).detach();
                }
                _ => {}
            },
        )
        .detach();

        let extensions = HeadlessExtensionStore::new(
            fs.clone(),
            http_client.clone(),
            paths::remote_extensions_dir().to_path_buf(),
            proxy,
            node_runtime,
            cx,
        );

        let client: AnyProtoClient = session.clone().into();

        // local_machine -> ssh handlers
        session.subscribe_to_entity(SSH_PROJECT_ID, &worktree_store);
        session.subscribe_to_entity(SSH_PROJECT_ID, &buffer_store);
        session.subscribe_to_entity(SSH_PROJECT_ID, &cx.entity());
        session.subscribe_to_entity(SSH_PROJECT_ID, &lsp_store);
        session.subscribe_to_entity(SSH_PROJECT_ID, &task_store);
        session.subscribe_to_entity(SSH_PROJECT_ID, &toolchain_store);
        session.subscribe_to_entity(SSH_PROJECT_ID, &dap_store);
        session.subscribe_to_entity(SSH_PROJECT_ID, &settings_observer);
        session.subscribe_to_entity(SSH_PROJECT_ID, &git_store);

        client.add_request_handler(cx.weak_entity(), Self::handle_list_remote_directory);
        client.add_request_handler(cx.weak_entity(), Self::handle_get_path_metadata);
        client.add_request_handler(cx.weak_entity(), Self::handle_shutdown_remote_server);
        client.add_request_handler(cx.weak_entity(), Self::handle_ping);

        client.add_entity_request_handler(Self::handle_add_worktree);
        client.add_request_handler(cx.weak_entity(), Self::handle_remove_worktree);

        client.add_entity_request_handler(Self::handle_open_buffer_by_path);
        client.add_entity_request_handler(Self::handle_open_new_buffer);
        client.add_entity_request_handler(Self::handle_find_search_candidates);
        client.add_entity_request_handler(Self::handle_open_server_settings);

        client.add_entity_request_handler(BufferStore::handle_update_buffer);
        client.add_entity_message_handler(BufferStore::handle_close_buffer);

        client.add_request_handler(
            extensions.clone().downgrade(),
            HeadlessExtensionStore::handle_sync_extensions,
        );
        client.add_request_handler(
            extensions.clone().downgrade(),
            HeadlessExtensionStore::handle_install_extension,
        );

        BufferStore::init(&client);
        WorktreeStore::init(&client);
        SettingsObserver::init(&client);
        LspStore::init(&client);
        TaskStore::init(Some(&client));
        ToolchainStore::init(&client);
        DapStore::init(&client, cx);
        // todo(debugger): Re-init the breakpoint store when we set it up for collab
        // BreakpointStore::init(&client);
        GitStore::init(&client);

        HeadlessProject {
            session: client,
            settings_observer,
            fs,
            worktree_store,
            buffer_store,
            lsp_store,
            task_store,
            dap_store,
            next_entry_id: Default::default(),
            languages,
            extensions,
            git_store,
        }
    }

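    /// Forwards locally-generated buffer operations to the client so its replica
    /// of the buffer stays in sync.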
    fn on_buffer_event(
        &mut self,
        buffer: Entity<Buffer>,
        event: &BufferEvent,
        cx: &mut Context<Self>,
    ) {
        match event {
            BufferEvent::Operation {
                operation,
                is_local: true,
            } => cx
                .background_spawn(self.session.request(proto::UpdateBuffer {
                    project_id: SSH_PROJECT_ID,
                    buffer_id: buffer.read(cx).remote_id().to_proto(),
                    operations: vec![serialize_operation(operation)],
                }))
                .detach(),
            _ => {}
        }
    }

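    /// Relays language-server activity (status updates, notifications, logs, and
    /// prompts) to the client, and forwards the user's prompt response back.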
    fn on_lsp_store_event(
        &mut self,
        _lsp_store: Entity<LspStore>,
        event: &LspStoreEvent,
        cx: &mut Context<Self>,
    ) {
        match event {
            LspStoreEvent::LanguageServerUpdate {
                language_server_id,
                message,
            } => {
                self.session
                    .send(proto::UpdateLanguageServer {
                        project_id: SSH_PROJECT_ID,
                        language_server_id: language_server_id.to_proto(),
                        variant: Some(message.clone()),
                    })
                    .log_err();
            }
            LspStoreEvent::Notification(message) => {
                self.session
                    .send(proto::Toast {
                        project_id: SSH_PROJECT_ID,
                        notification_id: "lsp".to_string(),
                        message: message.clone(),
                    })
                    .log_err();
            }
            LspStoreEvent::LanguageServerLog(language_server_id, log_type, message) => {
                self.session
                    .send(proto::LanguageServerLog {
                        project_id: SSH_PROJECT_ID,
                        language_server_id: language_server_id.to_proto(),
                        message: message.clone(),
                        log_type: Some(log_type.to_proto()),
                    })
                    .log_err();
            }
            LspStoreEvent::LanguageServerPrompt(prompt) => {
                let request = self.session.request(proto::LanguageServerPromptRequest {
                    project_id: SSH_PROJECT_ID,
                    actions: prompt
                        .actions
                        .iter()
                        .map(|action| action.title.to_string())
                        .collect(),
                    level: Some(prompt_to_proto(&prompt)),
                    lsp_name: prompt.lsp_name.clone(),
                    message: prompt.message.clone(),
                });
                let prompt = prompt.clone();
                cx.background_spawn(async move {
                    let response = request.await?;
                    if let Some(action_response) = response.action_response {
                        prompt.respond(action_response as usize).await;
                    }
                    anyhow::Ok(())
                })
                .detach();
            }
            _ => {}
        }
    }

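    /// Expands and canonicalizes the requested path, creates a local worktree for
    /// it, and responds before registering the worktree with the store (see the
    /// comment below on why that ordering matters).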
    pub async fn handle_add_worktree(
        this: Entity<Self>,
        message: TypedEnvelope<proto::AddWorktree>,
        mut cx: AsyncApp,
    ) -> Result<proto::AddWorktreeResponse> {
        use client::ErrorCodeExt;
        let fs = this.read_with(&mut cx, |this, _| this.fs.clone())?;
        let path = PathBuf::from_proto(shellexpand::tilde(&message.payload.path).to_string());

        let canonicalized = match fs.canonicalize(&path).await {
            Ok(path) => path,
            Err(e) => {
                let mut parent = path
                    .parent()
                    .ok_or(e)
                    .with_context(|| format!("{path:?} does not exist"))?;
                if parent == Path::new("") {
                    parent = util::paths::home_dir();
                }
                let parent = fs.canonicalize(parent).await.map_err(|_| {
                    anyhow!(
                        proto::ErrorCode::DevServerProjectPathDoesNotExist
                            .with_tag("path", &path.to_string_lossy().as_ref())
                    )
                })?;
                parent.join(path.file_name().unwrap())
            }
        };

        let worktree = this
            .read_with(&mut cx.clone(), |this, _| {
                Worktree::local(
                    Arc::from(canonicalized.as_path()),
                    message.payload.visible,
                    this.fs.clone(),
                    this.next_entry_id.clone(),
                    &mut cx,
                )
            })?
            .await?;

        let response = this.read_with(&mut cx, |_, cx| {
            let worktree = worktree.read(cx);
            proto::AddWorktreeResponse {
                worktree_id: worktree.id().to_proto(),
                canonicalized_path: canonicalized.to_proto(),
            }
        })?;

        // We spawn this asynchronously so that we can send the response back
        // *before* `worktree_store.add()` sends out UpdateProject requests
        // to the client about the new worktree.
        //
        // That lets the client register its handle to the newly-added worktree
        // before it is interrupted by an UpdateProject request.
        //
        // This fixes the following race: the client sends AddWorktree, the
        // headless project sends out a project update, the client receives it
        // and immediately drops its reference to the new worktree (causing it
        // to be dropped on the headless project as well), and only then does
        // the client receive the response to AddWorktree.
        cx.spawn(async move |cx| {
            this.update(cx, |this, cx| {
                this.worktree_store.update(cx, |worktree_store, cx| {
                    worktree_store.add(&worktree, cx);
                });
            })
            .log_err();
        })
        .detach();

        Ok(response)
    }

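    /// Removes the worktree with the given id from the worktree store.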
    pub async fn handle_remove_worktree(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::RemoveWorktree>,
        mut cx: AsyncApp,
    ) -> Result<proto::Ack> {
        let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
        this.update(&mut cx, |this, cx| {
            this.worktree_store.update(cx, |worktree_store, cx| {
                worktree_store.remove_worktree(worktree_id, cx);
            });
        })?;
        Ok(proto::Ack {})
    }

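    /// Opens the buffer at the given project path, replicates it to the
    /// requesting peer, and returns its remote id.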
    pub async fn handle_open_buffer_by_path(
        this: Entity<Self>,
        message: TypedEnvelope<proto::OpenBufferByPath>,
        mut cx: AsyncApp,
    ) -> Result<proto::OpenBufferResponse> {
        let worktree_id = WorktreeId::from_proto(message.payload.worktree_id);
        let (buffer_store, buffer) = this.update(&mut cx, |this, cx| {
            let buffer_store = this.buffer_store.clone();
            let buffer = this.buffer_store.update(cx, |buffer_store, cx| {
                buffer_store.open_buffer(
                    ProjectPath {
                        worktree_id,
                        path: Arc::<Path>::from_proto(message.payload.path),
                    },
                    cx,
                )
            });
            anyhow::Ok((buffer_store, buffer))
        })??;

        let buffer = buffer.await?;
        let buffer_id = buffer.read_with(&cx, |b, _| b.remote_id())?;
        buffer_store.update(&mut cx, |buffer_store, cx| {
            buffer_store
                .create_buffer_for_peer(&buffer, SSH_PEER_ID, cx)
                .detach_and_log_err(cx);
        })?;

        Ok(proto::OpenBufferResponse {
            buffer_id: buffer_id.to_proto(),
        })
    }

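    /// Creates an empty buffer, replicates it to the requesting peer, and returns
    /// its remote id.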
    pub async fn handle_open_new_buffer(
        this: Entity<Self>,
        _message: TypedEnvelope<proto::OpenNewBuffer>,
        mut cx: AsyncApp,
    ) -> Result<proto::OpenBufferResponse> {
        let (buffer_store, buffer) = this.update(&mut cx, |this, cx| {
            let buffer_store = this.buffer_store.clone();
            let buffer = this
                .buffer_store
                .update(cx, |buffer_store, cx| buffer_store.create_buffer(cx));
            anyhow::Ok((buffer_store, buffer))
        })??;

        let buffer = buffer.await?;
        let buffer_id = buffer.read_with(&cx, |b, _| b.remote_id())?;
        buffer_store.update(&mut cx, |buffer_store, cx| {
            buffer_store
                .create_buffer_for_peer(&buffer, SSH_PEER_ID, cx)
                .detach_and_log_err(cx);
        })?;

        Ok(proto::OpenBufferResponse {
            buffer_id: buffer_id.to_proto(),
        })
    }

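    /// Opens the remote server's settings file in a non-visible worktree, seeding
    /// it with the default server settings content if it is empty.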
    pub async fn handle_open_server_settings(
        this: Entity<Self>,
        _: TypedEnvelope<proto::OpenServerSettings>,
        mut cx: AsyncApp,
    ) -> Result<proto::OpenBufferResponse> {
        let settings_path = paths::settings_file();
        let (worktree, path) = this
            .update(&mut cx, |this, cx| {
                this.worktree_store.update(cx, |worktree_store, cx| {
                    worktree_store.find_or_create_worktree(settings_path, false, cx)
                })
            })?
            .await?;

        let (buffer, buffer_store) = this.update(&mut cx, |this, cx| {
            let buffer = this.buffer_store.update(cx, |buffer_store, cx| {
                buffer_store.open_buffer(
                    ProjectPath {
                        worktree_id: worktree.read(cx).id(),
                        path: path.into(),
                    },
                    cx,
                )
            });

            (buffer, this.buffer_store.clone())
        })?;

        let buffer = buffer.await?;

        let buffer_id = cx.update(|cx| {
            if buffer.read(cx).is_empty() {
                buffer.update(cx, |buffer, cx| {
                    buffer.edit([(0..0, initial_server_settings_content())], None, cx)
                });
            }

            let buffer_id = buffer.read(cx).remote_id();

            buffer_store.update(cx, |buffer_store, cx| {
                buffer_store
                    .create_buffer_for_peer(&buffer, SSH_PEER_ID, cx)
                    .detach_and_log_err(cx);
            });

            buffer_id
        })?;

        Ok(proto::OpenBufferResponse {
            buffer_id: buffer_id.to_proto(),
        })
    }

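    /// Streams search-candidate buffers back to the client: each matching buffer
    /// is replicated to the peer and its id added to the response.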
    pub async fn handle_find_search_candidates(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::FindSearchCandidates>,
        mut cx: AsyncApp,
    ) -> Result<proto::FindSearchCandidatesResponse> {
        let message = envelope.payload;
        let query = SearchQuery::from_proto(message.query.context("missing query field")?)?;
        let results = this.update(&mut cx, |this, cx| {
            this.buffer_store.update(cx, |buffer_store, cx| {
                buffer_store.find_search_candidates(&query, message.limit as _, this.fs.clone(), cx)
            })
        })?;

        let mut response = proto::FindSearchCandidatesResponse {
            buffer_ids: Vec::new(),
        };

        let buffer_store = this.read_with(&cx, |this, _| this.buffer_store.clone())?;

        while let Ok(buffer) = results.recv().await {
            let buffer_id = buffer.read_with(&mut cx, |this, _| this.remote_id())?;
            response.buffer_ids.push(buffer_id.to_proto());
            buffer_store
                .update(&mut cx, |buffer_store, cx| {
                    buffer_store.create_buffer_for_peer(&buffer, SSH_PEER_ID, cx)
                })?
                .await?;
        }

        Ok(response)
    }

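    /// Lists the entries of a directory on the remote host (after `~` expansion),
    /// optionally reporting whether each entry is itself a directory.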
    pub async fn handle_list_remote_directory(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::ListRemoteDirectory>,
        cx: AsyncApp,
    ) -> Result<proto::ListRemoteDirectoryResponse> {
        let fs = cx.read_entity(&this, |this, _| this.fs.clone())?;
        let expanded = PathBuf::from_proto(shellexpand::tilde(&envelope.payload.path).to_string());
        let check_info = envelope
            .payload
            .config
            .as_ref()
            .is_some_and(|config| config.is_dir);

        let mut entries = Vec::new();
        let mut entry_info = Vec::new();
        let mut response = fs.read_dir(&expanded).await?;
        while let Some(path) = response.next().await {
            let path = path?;
            if let Some(file_name) = path.file_name() {
                entries.push(file_name.to_string_lossy().to_string());
                if check_info {
                    let is_dir = fs.is_dir(&path).await;
                    entry_info.push(proto::EntryInfo { is_dir });
                }
            }
        }
        Ok(proto::ListRemoteDirectoryResponse {
            entries,
            entry_info,
        })
    }

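    /// Reports whether the given path exists on the remote host and whether it is
    /// a directory.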
    pub async fn handle_get_path_metadata(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::GetPathMetadata>,
        cx: AsyncApp,
    ) -> Result<proto::GetPathMetadataResponse> {
        let fs = cx.read_entity(&this, |this, _| this.fs.clone())?;
        let expanded = PathBuf::from_proto(shellexpand::tilde(&envelope.payload.path).to_string());

        let metadata = fs.metadata(&expanded).await?;
        let is_dir = metadata.map(|metadata| metadata.is_dir).unwrap_or(false);

        Ok(proto::GetPathMetadataResponse {
            exists: metadata.is_some(),
            is_dir,
            path: expanded.to_proto(),
        })
    }

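    /// Shuts down the remote server at the client's request, acknowledging the
    /// message before the shutdown task runs.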
    pub async fn handle_shutdown_remote_server(
        _this: Entity<Self>,
        _envelope: TypedEnvelope<proto::ShutdownRemoteServer>,
        cx: AsyncApp,
    ) -> Result<proto::Ack> {
        cx.spawn(async move |cx| {
            cx.update(|cx| {
                // TODO: This is a hack, because in a headless project, shutdown isn't executed
                // when calling quit, but it should be.
                cx.shutdown();
                cx.quit();
            })
        })
        .detach();

        Ok(proto::Ack {})
    }

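    /// Answers pings from the client.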
    pub async fn handle_ping(
        _this: Entity<Self>,
        _envelope: TypedEnvelope<proto::Ping>,
        _cx: AsyncApp,
    ) -> Result<proto::Ack> {
        log::debug!("Received ping from client");
        Ok(proto::Ack {})
    }
}

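/// Converts a GPUI `PromptLevel` into its protobuf representation.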
fn prompt_to_proto(
    prompt: &project::LanguageServerPromptRequest,
) -> proto::language_server_prompt_request::Level {
    match prompt.level {
        PromptLevel::Info => proto::language_server_prompt_request::Level::Info(
            proto::language_server_prompt_request::Info {},
        ),
        PromptLevel::Warning => proto::language_server_prompt_request::Level::Warning(
            proto::language_server_prompt_request::Warning {},
        ),
        PromptLevel::Critical => proto::language_server_prompt_request::Level::Critical(
            proto::language_server_prompt_request::Critical {},
        ),
    }
}