use ::proto::{FromProto, ToProto};
use anyhow::{anyhow, Result};
use dap::DapRegistry;
use extension::ExtensionHostProxy;
use extension_host::headless_host::HeadlessExtensionStore;
use fs::Fs;
use gpui::{App, AppContext as _, AsyncApp, Context, Entity, PromptLevel};
use http_client::HttpClient;
use language::{proto::serialize_operation, Buffer, BufferEvent, LanguageRegistry};
use node_runtime::NodeRuntime;
use project::{
    buffer_store::{BufferStore, BufferStoreEvent},
    debugger::{breakpoint_store::BreakpointStore, dap_store::DapStore},
    git_store::GitStore,
    project_settings::SettingsObserver,
    search::SearchQuery,
    task_store::TaskStore,
    worktree_store::WorktreeStore,
    LspStore, LspStoreEvent, PrettierStore, ProjectPath, ToolchainStore, WorktreeId,
};
use remote::ssh_session::ChannelClient;
use rpc::{
    proto::{self, SSH_PEER_ID, SSH_PROJECT_ID},
    AnyProtoClient, TypedEnvelope,
};

use settings::initial_server_settings_content;
use smol::stream::StreamExt;
use std::{
    path::{Path, PathBuf},
    sync::{atomic::AtomicUsize, Arc},
};
use util::ResultExt;
use worktree::Worktree;

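/// The server-side counterpart of a remote project: it owns the worktrees,
/// buffers, language servers, and other stores for a project opened over SSH,
/// and mirrors their state to the client through the proto session.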
pub struct HeadlessProject {
    pub fs: Arc<dyn Fs>,
    pub session: AnyProtoClient,
    pub worktree_store: Entity<WorktreeStore>,
    pub buffer_store: Entity<BufferStore>,
    pub lsp_store: Entity<LspStore>,
    pub task_store: Entity<TaskStore>,
    pub settings_observer: Entity<SettingsObserver>,
    pub next_entry_id: Arc<AtomicUsize>,
    pub languages: Arc<LanguageRegistry>,
    pub extensions: Entity<HeadlessExtensionStore>,
    pub git_store: Entity<GitStore>,
}

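/// Everything `HeadlessProject::new` needs from the surrounding application:
/// the SSH channel, filesystem, HTTP client, runtimes, and registries.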
pub struct HeadlessAppState {
    pub session: Arc<ChannelClient>,
    pub fs: Arc<dyn Fs>,
    pub http_client: Arc<dyn HttpClient>,
    pub node_runtime: NodeRuntime,
    pub languages: Arc<LanguageRegistry>,
    pub debug_adapters: Arc<DapRegistry>,
    pub extension_host_proxy: Arc<ExtensionHostProxy>,
}

impl HeadlessProject {
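    /// Initializes the global state (settings, languages, and project settings)
    /// that a headless project relies on.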
    pub fn init(cx: &mut App) {
        settings::init(cx);
        language::init(cx);
        project::Project::init_settings(cx);
    }

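    /// Builds the headless project: constructs the local stores (worktrees,
    /// buffers, LSP, tasks, git, debugger, extensions), shares them over the
    /// SSH session, and registers handlers for the client's requests.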
    pub fn new(
        HeadlessAppState {
            session,
            fs,
            http_client,
            node_runtime,
            languages,
            debug_adapters,
            extension_host_proxy: proxy,
        }: HeadlessAppState,
        cx: &mut Context<Self>,
    ) -> Self {
        language_extension::init(proxy.clone(), languages.clone());
        languages::init(languages.clone(), node_runtime.clone(), cx);

        let worktree_store = cx.new(|cx| {
            let mut store = WorktreeStore::local(true, fs.clone());
            store.shared(SSH_PROJECT_ID, session.clone().into(), cx);
            store
        });

        let environment = project::ProjectEnvironment::new(&worktree_store, None, cx);

        let toolchain_store = cx.new(|cx| {
            ToolchainStore::local(
                languages.clone(),
                worktree_store.clone(),
                environment.clone(),
                cx,
            )
        });

        let buffer_store = cx.new(|cx| {
            let mut buffer_store = BufferStore::local(worktree_store.clone(), cx);
            buffer_store.shared(SSH_PROJECT_ID, session.clone().into(), cx);
            buffer_store
        });

        let breakpoint_store =
            cx.new(|_| BreakpointStore::local(worktree_store.clone(), buffer_store.clone()));

        let dap_store = cx.new(|cx| {
            DapStore::new_local(
                http_client.clone(),
                node_runtime.clone(),
                fs.clone(),
                languages.clone(),
                debug_adapters.clone(),
                environment.clone(),
                toolchain_store.read(cx).as_language_toolchain_store(),
                breakpoint_store.clone(),
                worktree_store.clone(),
                cx,
            )
        });

        let git_store = cx.new(|cx| {
            let mut store = GitStore::local(
                &worktree_store,
                buffer_store.clone(),
                environment.clone(),
                fs.clone(),
                cx,
            );
            store.shared(SSH_PROJECT_ID, session.clone().into(), cx);
            store
        });

        let prettier_store = cx.new(|cx| {
            PrettierStore::new(
                node_runtime.clone(),
                fs.clone(),
                languages.clone(),
                worktree_store.clone(),
                cx,
            )
        });

        let task_store = cx.new(|cx| {
            let mut task_store = TaskStore::local(
                buffer_store.downgrade(),
                worktree_store.clone(),
                toolchain_store.read(cx).as_language_toolchain_store(),
                environment.clone(),
                cx,
            );
            task_store.shared(SSH_PROJECT_ID, session.clone().into(), cx);
            task_store
        });
        let settings_observer = cx.new(|cx| {
            let mut observer = SettingsObserver::new_local(
                fs.clone(),
                worktree_store.clone(),
                task_store.clone(),
                cx,
            );
            observer.shared(SSH_PROJECT_ID, session.clone().into(), cx);
            observer
        });

        let lsp_store = cx.new(|cx| {
            let mut lsp_store = LspStore::new_local(
                buffer_store.clone(),
                worktree_store.clone(),
                prettier_store.clone(),
                toolchain_store.clone(),
                environment,
                languages.clone(),
                http_client.clone(),
                fs.clone(),
                cx,
            );
            lsp_store.shared(SSH_PROJECT_ID, session.clone().into(), cx);
            lsp_store
        });

        cx.subscribe(&lsp_store, Self::on_lsp_store_event).detach();

        cx.subscribe(
            &buffer_store,
            |_this, _buffer_store, event, cx| match event {
                BufferStoreEvent::BufferAdded(buffer) => {
                    cx.subscribe(buffer, Self::on_buffer_event).detach();
                }
                _ => {}
            },
        )
        .detach();

        let extensions = HeadlessExtensionStore::new(
            fs.clone(),
            http_client.clone(),
            paths::remote_extensions_dir().to_path_buf(),
            proxy,
            node_runtime,
            cx,
        );

        let client: AnyProtoClient = session.clone().into();

        // local_machine -> ssh handlers
        session.subscribe_to_entity(SSH_PROJECT_ID, &worktree_store);
        session.subscribe_to_entity(SSH_PROJECT_ID, &buffer_store);
        session.subscribe_to_entity(SSH_PROJECT_ID, &cx.entity());
        session.subscribe_to_entity(SSH_PROJECT_ID, &lsp_store);
        session.subscribe_to_entity(SSH_PROJECT_ID, &task_store);
        session.subscribe_to_entity(SSH_PROJECT_ID, &toolchain_store);
        session.subscribe_to_entity(SSH_PROJECT_ID, &dap_store);
        session.subscribe_to_entity(SSH_PROJECT_ID, &settings_observer);
        session.subscribe_to_entity(SSH_PROJECT_ID, &git_store);

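        // Requests sent by the client that are answered directly by this
        // project or by one of its stores.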
        client.add_request_handler(cx.weak_entity(), Self::handle_list_remote_directory);
        client.add_request_handler(cx.weak_entity(), Self::handle_get_path_metadata);
        client.add_request_handler(cx.weak_entity(), Self::handle_shutdown_remote_server);
        client.add_request_handler(cx.weak_entity(), Self::handle_ping);

        client.add_entity_request_handler(Self::handle_add_worktree);
        client.add_request_handler(cx.weak_entity(), Self::handle_remove_worktree);

        client.add_entity_request_handler(Self::handle_open_buffer_by_path);
        client.add_entity_request_handler(Self::handle_open_new_buffer);
        client.add_entity_request_handler(Self::handle_find_search_candidates);
        client.add_entity_request_handler(Self::handle_open_server_settings);

        client.add_entity_request_handler(BufferStore::handle_update_buffer);
        client.add_entity_message_handler(BufferStore::handle_close_buffer);

        client.add_request_handler(
            extensions.clone().downgrade(),
            HeadlessExtensionStore::handle_sync_extensions,
        );
        client.add_request_handler(
            extensions.clone().downgrade(),
            HeadlessExtensionStore::handle_install_extension,
        );

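        // Let each store register its remaining message handlers on the
        // session client.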
        BufferStore::init(&client);
        WorktreeStore::init(&client);
        SettingsObserver::init(&client);
        LspStore::init(&client);
        TaskStore::init(Some(&client));
        ToolchainStore::init(&client);
        DapStore::init(&client);
        // todo(debugger): Re init breakpoint store when we set it up for collab
        // BreakpointStore::init(&client);
        GitStore::init(&client);

        HeadlessProject {
            session: client,
            settings_observer,
            fs,
            worktree_store,
            buffer_store,
            lsp_store,
            task_store,
            next_entry_id: Default::default(),
            languages,
            extensions,
            git_store,
        }
    }

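    /// Forwards locally-generated buffer operations to the client so its copy
    /// of the buffer stays in sync.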
    fn on_buffer_event(
        &mut self,
        buffer: Entity<Buffer>,
        event: &BufferEvent,
        cx: &mut Context<Self>,
    ) {
        match event {
            BufferEvent::Operation {
                operation,
                is_local: true,
            } => cx
                .background_spawn(self.session.request(proto::UpdateBuffer {
                    project_id: SSH_PROJECT_ID,
                    buffer_id: buffer.read(cx).remote_id().to_proto(),
                    operations: vec![serialize_operation(operation)],
                }))
                .detach(),
            _ => {}
        }
    }

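    /// Relays language-server activity (status updates, logs, toasts, and
    /// prompts) from the local `LspStore` to the client over the session.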
    fn on_lsp_store_event(
        &mut self,
        _lsp_store: Entity<LspStore>,
        event: &LspStoreEvent,
        cx: &mut Context<Self>,
    ) {
        match event {
            LspStoreEvent::LanguageServerUpdate {
                language_server_id,
                message,
            } => {
                self.session
                    .send(proto::UpdateLanguageServer {
                        project_id: SSH_PROJECT_ID,
                        language_server_id: language_server_id.to_proto(),
                        variant: Some(message.clone()),
                    })
                    .log_err();
            }
            LspStoreEvent::Notification(message) => {
                self.session
                    .send(proto::Toast {
                        project_id: SSH_PROJECT_ID,
                        notification_id: "lsp".to_string(),
                        message: message.clone(),
                    })
                    .log_err();
            }
            LspStoreEvent::LanguageServerLog(language_server_id, log_type, message) => {
                self.session
                    .send(proto::LanguageServerLog {
                        project_id: SSH_PROJECT_ID,
                        language_server_id: language_server_id.to_proto(),
                        message: message.clone(),
                        log_type: Some(log_type.to_proto()),
                    })
                    .log_err();
            }
            LspStoreEvent::LanguageServerPrompt(prompt) => {
                let request = self.session.request(proto::LanguageServerPromptRequest {
                    project_id: SSH_PROJECT_ID,
                    actions: prompt
                        .actions
                        .iter()
                        .map(|action| action.title.to_string())
                        .collect(),
                    level: Some(prompt_to_proto(&prompt)),
                    lsp_name: prompt.lsp_name.clone(),
                    message: prompt.message.clone(),
                });
                let prompt = prompt.clone();
                cx.background_spawn(async move {
                    let response = request.await?;
                    if let Some(action_response) = response.action_response {
                        prompt.respond(action_response as usize).await;
                    }
                    anyhow::Ok(())
                })
                .detach();
            }
            _ => {}
        }
    }

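    /// Creates a local worktree for the requested path (canonicalizing it, and
    /// falling back to the parent directory when the path does not exist yet),
    /// and replies with its id and canonicalized path.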
    pub async fn handle_add_worktree(
        this: Entity<Self>,
        message: TypedEnvelope<proto::AddWorktree>,
        mut cx: AsyncApp,
    ) -> Result<proto::AddWorktreeResponse> {
        use client::ErrorCodeExt;
        let fs = this.read_with(&mut cx, |this, _| this.fs.clone())?;
        let path = PathBuf::from_proto(shellexpand::tilde(&message.payload.path).to_string());

        let canonicalized = match fs.canonicalize(&path).await {
            Ok(path) => path,
            Err(e) => {
                let mut parent = path
                    .parent()
                    .ok_or(e)
                    .map_err(|_| anyhow!("{:?} does not exist", path))?;
                if parent == Path::new("") {
                    parent = util::paths::home_dir();
                }
                let parent = fs.canonicalize(parent).await.map_err(|_| {
                    anyhow!(proto::ErrorCode::DevServerProjectPathDoesNotExist
                        .with_tag("path", &path.to_string_lossy().as_ref()))
                })?;
                parent.join(path.file_name().unwrap())
            }
        };

        let worktree = this
            .update(&mut cx.clone(), |this, _| {
                Worktree::local(
                    Arc::from(canonicalized.as_path()),
                    message.payload.visible,
                    this.fs.clone(),
                    this.next_entry_id.clone(),
                    &mut cx,
                )
            })?
            .await?;

        let response = this.update(&mut cx, |_, cx| {
            worktree.update(cx, |worktree, _| proto::AddWorktreeResponse {
                worktree_id: worktree.id().to_proto(),
                canonicalized_path: canonicalized.to_proto(),
            })
        })?;

        // We spawn this asynchronously, so that we can send the response back
        // *before* `worktree_store.add()` can send out UpdateProject requests
        // to the client about the new worktree.
        //
        // That lets the client manage the references/handles of the newly-added
        // worktree before getting interrupted by an UpdateProject request.
        //
        // This fixes the problem of the client sending the AddWorktree request,
        // the headless project sending out a project update, the client receiving
        // it and immediately dropping the reference to the new worktree, causing
        // it to be dropped on the headless project, with the client only then
        // receiving a response to AddWorktree.
        cx.spawn(async move |cx| {
            this.update(cx, |this, cx| {
                this.worktree_store.update(cx, |worktree_store, cx| {
                    worktree_store.add(&worktree, cx);
                });
            })
            .log_err();
        })
        .detach();

        Ok(response)
    }

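    /// Removes the worktree with the given id from the worktree store.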
    pub async fn handle_remove_worktree(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::RemoveWorktree>,
        mut cx: AsyncApp,
    ) -> Result<proto::Ack> {
        let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
        this.update(&mut cx, |this, cx| {
            this.worktree_store.update(cx, |worktree_store, cx| {
                worktree_store.remove_worktree(worktree_id, cx);
            });
        })?;
        Ok(proto::Ack {})
    }

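    /// Opens the buffer at the given project path and registers it with the
    /// client peer before returning its id.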
    pub async fn handle_open_buffer_by_path(
        this: Entity<Self>,
        message: TypedEnvelope<proto::OpenBufferByPath>,
        mut cx: AsyncApp,
    ) -> Result<proto::OpenBufferResponse> {
        let worktree_id = WorktreeId::from_proto(message.payload.worktree_id);
        let (buffer_store, buffer) = this.update(&mut cx, |this, cx| {
            let buffer_store = this.buffer_store.clone();
            let buffer = this.buffer_store.update(cx, |buffer_store, cx| {
                buffer_store.open_buffer(
                    ProjectPath {
                        worktree_id,
                        path: Arc::<Path>::from_proto(message.payload.path),
                    },
                    cx,
                )
            });
            anyhow::Ok((buffer_store, buffer))
        })??;

        let buffer = buffer.await?;
        let buffer_id = buffer.read_with(&cx, |b, _| b.remote_id())?;
        buffer_store.update(&mut cx, |buffer_store, cx| {
            buffer_store
                .create_buffer_for_peer(&buffer, SSH_PEER_ID, cx)
                .detach_and_log_err(cx);
        })?;

        Ok(proto::OpenBufferResponse {
            buffer_id: buffer_id.to_proto(),
        })
    }

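    /// Creates an empty buffer, registers it with the client peer, and returns
    /// its id.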
    pub async fn handle_open_new_buffer(
        this: Entity<Self>,
        _message: TypedEnvelope<proto::OpenNewBuffer>,
        mut cx: AsyncApp,
    ) -> Result<proto::OpenBufferResponse> {
        let (buffer_store, buffer) = this.update(&mut cx, |this, cx| {
            let buffer_store = this.buffer_store.clone();
            let buffer = this
                .buffer_store
                .update(cx, |buffer_store, cx| buffer_store.create_buffer(cx));
            anyhow::Ok((buffer_store, buffer))
        })??;

        let buffer = buffer.await?;
        let buffer_id = buffer.read_with(&cx, |b, _| b.remote_id())?;
        buffer_store.update(&mut cx, |buffer_store, cx| {
            buffer_store
                .create_buffer_for_peer(&buffer, SSH_PEER_ID, cx)
                .detach_and_log_err(cx);
        })?;

        Ok(proto::OpenBufferResponse {
            buffer_id: buffer_id.to_proto(),
        })
    }

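    /// Opens the remote server's settings file in a non-visible worktree,
    /// seeding it with the initial settings content when it is empty.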
    pub async fn handle_open_server_settings(
        this: Entity<Self>,
        _: TypedEnvelope<proto::OpenServerSettings>,
        mut cx: AsyncApp,
    ) -> Result<proto::OpenBufferResponse> {
        let settings_path = paths::settings_file();
        let (worktree, path) = this
            .update(&mut cx, |this, cx| {
                this.worktree_store.update(cx, |worktree_store, cx| {
                    worktree_store.find_or_create_worktree(settings_path, false, cx)
                })
            })?
            .await?;

        let (buffer, buffer_store) = this.update(&mut cx, |this, cx| {
            let buffer = this.buffer_store.update(cx, |buffer_store, cx| {
                buffer_store.open_buffer(
                    ProjectPath {
                        worktree_id: worktree.read(cx).id(),
                        path: path.into(),
                    },
                    cx,
                )
            });

            (buffer, this.buffer_store.clone())
        })?;

        let buffer = buffer.await?;

        let buffer_id = cx.update(|cx| {
            if buffer.read(cx).is_empty() {
                buffer.update(cx, |buffer, cx| {
                    buffer.edit([(0..0, initial_server_settings_content())], None, cx)
                });
            }

            let buffer_id = buffer.read_with(cx, |b, _| b.remote_id());

            buffer_store.update(cx, |buffer_store, cx| {
                buffer_store
                    .create_buffer_for_peer(&buffer, SSH_PEER_ID, cx)
                    .detach_and_log_err(cx);
            });

            buffer_id
        })?;

        Ok(proto::OpenBufferResponse {
            buffer_id: buffer_id.to_proto(),
        })
    }

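    /// Runs the search query against the buffer store, registering each
    /// candidate buffer with the client peer and returning their ids.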
    pub async fn handle_find_search_candidates(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::FindSearchCandidates>,
        mut cx: AsyncApp,
    ) -> Result<proto::FindSearchCandidatesResponse> {
        let message = envelope.payload;
        let query = SearchQuery::from_proto(
            message
                .query
                .ok_or_else(|| anyhow!("missing query field"))?,
        )?;
        let results = this.update(&mut cx, |this, cx| {
            this.buffer_store.update(cx, |buffer_store, cx| {
                buffer_store.find_search_candidates(&query, message.limit as _, this.fs.clone(), cx)
            })
        })?;

        let mut response = proto::FindSearchCandidatesResponse {
            buffer_ids: Vec::new(),
        };

        let buffer_store = this.read_with(&cx, |this, _| this.buffer_store.clone())?;

        while let Ok(buffer) = results.recv().await {
            let buffer_id = buffer.update(&mut cx, |this, _| this.remote_id())?;
            response.buffer_ids.push(buffer_id.to_proto());
            buffer_store
                .update(&mut cx, |buffer_store, cx| {
                    buffer_store.create_buffer_for_peer(&buffer, SSH_PEER_ID, cx)
                })?
                .await?;
        }

        Ok(response)
    }

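    /// Lists the entries of a directory on the remote machine, optionally
    /// reporting whether each entry is itself a directory.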
    pub async fn handle_list_remote_directory(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::ListRemoteDirectory>,
        cx: AsyncApp,
    ) -> Result<proto::ListRemoteDirectoryResponse> {
        let fs = cx.read_entity(&this, |this, _| this.fs.clone())?;
        let expanded = PathBuf::from_proto(shellexpand::tilde(&envelope.payload.path).to_string());
        let check_info = envelope
            .payload
            .config
            .as_ref()
            .is_some_and(|config| config.is_dir);

        let mut entries = Vec::new();
        let mut entry_info = Vec::new();
        let mut response = fs.read_dir(&expanded).await?;
        while let Some(path) = response.next().await {
            let path = path?;
            if let Some(file_name) = path.file_name() {
                entries.push(file_name.to_string_lossy().to_string());
                if check_info {
                    let is_dir = fs.is_dir(&path).await;
                    entry_info.push(proto::EntryInfo { is_dir });
                }
            }
        }
        Ok(proto::ListRemoteDirectoryResponse {
            entries,
            entry_info,
        })
    }

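    /// Reports whether the given path exists on the remote machine and whether
    /// it is a directory, along with its tilde-expanded form.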
    pub async fn handle_get_path_metadata(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::GetPathMetadata>,
        cx: AsyncApp,
    ) -> Result<proto::GetPathMetadataResponse> {
        let fs = cx.read_entity(&this, |this, _| this.fs.clone())?;
        let expanded = PathBuf::from_proto(shellexpand::tilde(&envelope.payload.path).to_string());

        let metadata = fs.metadata(&expanded).await?;
        let is_dir = metadata.map(|metadata| metadata.is_dir).unwrap_or(false);

        Ok(proto::GetPathMetadataResponse {
            exists: metadata.is_some(),
            is_dir,
            path: expanded.to_proto(),
        })
    }

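    /// Acknowledges the request, then schedules an application shutdown and
    /// quit on the event loop.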
    pub async fn handle_shutdown_remote_server(
        _this: Entity<Self>,
        _envelope: TypedEnvelope<proto::ShutdownRemoteServer>,
        cx: AsyncApp,
    ) -> Result<proto::Ack> {
        cx.spawn(async move |cx| {
            cx.update(|cx| {
                // TODO: This is a hack, because in a headless project, shutdown isn't executed
                // when calling quit, but it should be.
                cx.shutdown();
                cx.quit();
            })
        })
        .detach();

        Ok(proto::Ack {})
    }

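    /// Responds to keep-alive pings from the client.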
    pub async fn handle_ping(
        _this: Entity<Self>,
        _envelope: TypedEnvelope<proto::Ping>,
        _cx: AsyncApp,
    ) -> Result<proto::Ack> {
        log::debug!("Received ping from client");
        Ok(proto::Ack {})
    }
}

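/// Converts a language-server prompt level into its protobuf representation.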
fn prompt_to_proto(
    prompt: &project::LanguageServerPromptRequest,
) -> proto::language_server_prompt_request::Level {
    match prompt.level {
        PromptLevel::Info => proto::language_server_prompt_request::Level::Info(
            proto::language_server_prompt_request::Info {},
        ),
        PromptLevel::Warning => proto::language_server_prompt_request::Level::Warning(
            proto::language_server_prompt_request::Warning {},
        ),
        PromptLevel::Critical => proto::language_server_prompt_request::Level::Critical(
            proto::language_server_prompt_request::Critical {},
        ),
    }
}