use ::proto::{FromProto, ToProto};
use anyhow::{Context as _, Result, anyhow};

use extension::ExtensionHostProxy;
use extension_host::headless_host::HeadlessExtensionStore;
use fs::Fs;
use gpui::{App, AppContext as _, AsyncApp, Context, Entity, PromptLevel};
use http_client::HttpClient;
use language::{Buffer, BufferEvent, LanguageRegistry, proto::serialize_operation};
use node_runtime::NodeRuntime;
use project::{
    LspStore, LspStoreEvent, PrettierStore, ProjectEnvironment, ProjectPath, ToolchainStore,
    WorktreeId,
    buffer_store::{BufferStore, BufferStoreEvent},
    debugger::{breakpoint_store::BreakpointStore, dap_store::DapStore},
    git_store::GitStore,
    project_settings::SettingsObserver,
    search::SearchQuery,
    task_store::TaskStore,
    worktree_store::WorktreeStore,
};
use remote::ssh_session::ChannelClient;
use rpc::{
    AnyProtoClient, TypedEnvelope,
    proto::{self, SSH_PEER_ID, SSH_PROJECT_ID},
};

use settings::initial_server_settings_content;
use smol::stream::StreamExt;
use std::{
    path::{Path, PathBuf},
    sync::{Arc, atomic::AtomicUsize},
};
use util::ResultExt;
use worktree::Worktree;

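/// The server-side counterpart of a remote project: it owns the worktree, buffer,
/// LSP, task, debug, settings, extension, and git stores for a project opened over
/// SSH, and keeps them in sync with the connected client.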
pub struct HeadlessProject {
    pub fs: Arc<dyn Fs>,
    pub session: AnyProtoClient,
    pub worktree_store: Entity<WorktreeStore>,
    pub buffer_store: Entity<BufferStore>,
    pub lsp_store: Entity<LspStore>,
    pub task_store: Entity<TaskStore>,
    pub dap_store: Entity<DapStore>,
    pub settings_observer: Entity<SettingsObserver>,
    pub next_entry_id: Arc<AtomicUsize>,
    pub languages: Arc<LanguageRegistry>,
    pub extensions: Entity<HeadlessExtensionStore>,
    pub git_store: Entity<GitStore>,
}

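/// The dependencies needed to construct a [`HeadlessProject`]: the SSH channel back to
/// the client, plus the filesystem, HTTP, Node, language, and extension services it uses.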
pub struct HeadlessAppState {
    pub session: Arc<ChannelClient>,
    pub fs: Arc<dyn Fs>,
    pub http_client: Arc<dyn HttpClient>,
    pub node_runtime: NodeRuntime,
    pub languages: Arc<LanguageRegistry>,
    pub extension_host_proxy: Arc<ExtensionHostProxy>,
}

impl HeadlessProject {
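    /// Registers the settings, language, and project-settings globals that must be in
    /// place before a `HeadlessProject` is constructed.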
    pub fn init(cx: &mut App) {
        settings::init(cx);
        language::init(cx);
        project::Project::init_settings(cx);
    }

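    /// Builds the headless project: each store is created in local mode, shared with the
    /// client under `SSH_PROJECT_ID`, and wired up to the RPC handlers the client invokes
    /// over the SSH channel.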
    pub fn new(
        HeadlessAppState {
            session,
            fs,
            http_client,
            node_runtime,
            languages,
            extension_host_proxy: proxy,
        }: HeadlessAppState,
        cx: &mut Context<Self>,
    ) -> Self {
        debug_adapter_extension::init(proxy.clone(), cx);
        language_extension::init(proxy.clone(), languages.clone());
        languages::init(languages.clone(), node_runtime.clone(), cx);

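        // Construct each store in local mode; most are then marked as shared so their
        // updates are streamed to the client under `SSH_PROJECT_ID`.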
        let worktree_store = cx.new(|cx| {
            let mut store = WorktreeStore::local(true, fs.clone());
            store.shared(SSH_PROJECT_ID, session.clone().into(), cx);
            store
        });

        let environment = cx.new(|_| ProjectEnvironment::new(None));

        let toolchain_store = cx.new(|cx| {
            ToolchainStore::local(
                languages.clone(),
                worktree_store.clone(),
                environment.clone(),
                cx,
            )
        });

        let buffer_store = cx.new(|cx| {
            let mut buffer_store = BufferStore::local(worktree_store.clone(), cx);
            buffer_store.shared(SSH_PROJECT_ID, session.clone().into(), cx);
            buffer_store
        });

        let breakpoint_store =
            cx.new(|_| BreakpointStore::local(worktree_store.clone(), buffer_store.clone()));

        let dap_store = cx.new(|cx| {
            let mut dap_store = DapStore::new_local(
                http_client.clone(),
                node_runtime.clone(),
                fs.clone(),
                environment.clone(),
                toolchain_store.read(cx).as_language_toolchain_store(),
                worktree_store.clone(),
                breakpoint_store.clone(),
                cx,
            );
            dap_store.shared(SSH_PROJECT_ID, session.clone().into(), cx);
            dap_store
        });

        let git_store = cx.new(|cx| {
            let mut store = GitStore::local(
                &worktree_store,
                buffer_store.clone(),
                environment.clone(),
                fs.clone(),
                cx,
            );
            store.shared(SSH_PROJECT_ID, session.clone().into(), cx);
            store
        });

        let prettier_store = cx.new(|cx| {
            PrettierStore::new(
                node_runtime.clone(),
                fs.clone(),
                languages.clone(),
                worktree_store.clone(),
                cx,
            )
        });

        let task_store = cx.new(|cx| {
            let mut task_store = TaskStore::local(
                buffer_store.downgrade(),
                worktree_store.clone(),
                toolchain_store.read(cx).as_language_toolchain_store(),
                environment.clone(),
                cx,
            );
            task_store.shared(SSH_PROJECT_ID, session.clone().into(), cx);
            task_store
        });
        let settings_observer = cx.new(|cx| {
            let mut observer = SettingsObserver::new_local(
                fs.clone(),
                worktree_store.clone(),
                task_store.clone(),
                cx,
            );
            observer.shared(SSH_PROJECT_ID, session.clone().into(), cx);
            observer
        });

        let lsp_store = cx.new(|cx| {
            let mut lsp_store = LspStore::new_local(
                buffer_store.clone(),
                worktree_store.clone(),
                prettier_store.clone(),
                toolchain_store.clone(),
                environment,
                languages.clone(),
                http_client.clone(),
                fs.clone(),
                cx,
            );
            lsp_store.shared(SSH_PROJECT_ID, session.clone().into(), cx);
            lsp_store
        });

        cx.subscribe(&lsp_store, Self::on_lsp_store_event).detach();

        cx.subscribe(
            &buffer_store,
            |_this, _buffer_store, event, cx| match event {
                BufferStoreEvent::BufferAdded(buffer) => {
                    cx.subscribe(buffer, Self::on_buffer_event).detach();
                }
                _ => {}
            },
        )
        .detach();

        let extensions = HeadlessExtensionStore::new(
            fs.clone(),
            http_client.clone(),
            paths::remote_extensions_dir().to_path_buf(),
            proxy,
            node_runtime,
            cx,
        );

        let client: AnyProtoClient = session.clone().into();

        // local_machine -> ssh handlers
        session.subscribe_to_entity(SSH_PROJECT_ID, &worktree_store);
        session.subscribe_to_entity(SSH_PROJECT_ID, &buffer_store);
        session.subscribe_to_entity(SSH_PROJECT_ID, &cx.entity());
        session.subscribe_to_entity(SSH_PROJECT_ID, &lsp_store);
        session.subscribe_to_entity(SSH_PROJECT_ID, &task_store);
        session.subscribe_to_entity(SSH_PROJECT_ID, &toolchain_store);
        session.subscribe_to_entity(SSH_PROJECT_ID, &dap_store);
        session.subscribe_to_entity(SSH_PROJECT_ID, &settings_observer);
        session.subscribe_to_entity(SSH_PROJECT_ID, &git_store);

        client.add_request_handler(cx.weak_entity(), Self::handle_list_remote_directory);
        client.add_request_handler(cx.weak_entity(), Self::handle_get_path_metadata);
        client.add_request_handler(cx.weak_entity(), Self::handle_shutdown_remote_server);
        client.add_request_handler(cx.weak_entity(), Self::handle_ping);

        client.add_entity_request_handler(Self::handle_add_worktree);
        client.add_request_handler(cx.weak_entity(), Self::handle_remove_worktree);

        client.add_entity_request_handler(Self::handle_open_buffer_by_path);
        client.add_entity_request_handler(Self::handle_open_new_buffer);
        client.add_entity_request_handler(Self::handle_find_search_candidates);
        client.add_entity_request_handler(Self::handle_open_server_settings);

        client.add_entity_request_handler(BufferStore::handle_update_buffer);
        client.add_entity_message_handler(BufferStore::handle_close_buffer);

        client.add_request_handler(
            extensions.clone().downgrade(),
            HeadlessExtensionStore::handle_sync_extensions,
        );
        client.add_request_handler(
            extensions.clone().downgrade(),
            HeadlessExtensionStore::handle_install_extension,
        );

        BufferStore::init(&client);
        WorktreeStore::init(&client);
        SettingsObserver::init(&client);
        LspStore::init(&client);
        TaskStore::init(Some(&client));
        ToolchainStore::init(&client);
        DapStore::init(&client, cx);
        // todo(debugger): Re init breakpoint store when we set it up for collab
        // BreakpointStore::init(&client);
        GitStore::init(&client);

        HeadlessProject {
            session: client,
            settings_observer,
            fs,
            worktree_store,
            buffer_store,
            lsp_store,
            task_store,
            dap_store,
            next_entry_id: Default::default(),
            languages,
            extensions,
            git_store,
        }
    }

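    /// Forwards locally-generated buffer operations to the client as `UpdateBuffer`
    /// requests so both sides stay in sync.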
    fn on_buffer_event(
        &mut self,
        buffer: Entity<Buffer>,
        event: &BufferEvent,
        cx: &mut Context<Self>,
    ) {
        match event {
            BufferEvent::Operation {
                operation,
                is_local: true,
            } => cx
                .background_spawn(self.session.request(proto::UpdateBuffer {
                    project_id: SSH_PROJECT_ID,
                    buffer_id: buffer.read(cx).remote_id().to_proto(),
                    operations: vec![serialize_operation(operation)],
                }))
                .detach(),
            _ => {}
        }
    }

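    /// Relays language server activity (status updates, notifications, logs, and prompts)
    /// from the local `LspStore` to the client over the SSH session.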
    fn on_lsp_store_event(
        &mut self,
        _lsp_store: Entity<LspStore>,
        event: &LspStoreEvent,
        cx: &mut Context<Self>,
    ) {
        match event {
            LspStoreEvent::LanguageServerUpdate {
                language_server_id,
                message,
            } => {
                self.session
                    .send(proto::UpdateLanguageServer {
                        project_id: SSH_PROJECT_ID,
                        language_server_id: language_server_id.to_proto(),
                        variant: Some(message.clone()),
                    })
                    .log_err();
            }
            LspStoreEvent::Notification(message) => {
                self.session
                    .send(proto::Toast {
                        project_id: SSH_PROJECT_ID,
                        notification_id: "lsp".to_string(),
                        message: message.clone(),
                    })
                    .log_err();
            }
            LspStoreEvent::LanguageServerLog(language_server_id, log_type, message) => {
                self.session
                    .send(proto::LanguageServerLog {
                        project_id: SSH_PROJECT_ID,
                        language_server_id: language_server_id.to_proto(),
                        message: message.clone(),
                        log_type: Some(log_type.to_proto()),
                    })
                    .log_err();
            }
            LspStoreEvent::LanguageServerPrompt(prompt) => {
                let request = self.session.request(proto::LanguageServerPromptRequest {
                    project_id: SSH_PROJECT_ID,
                    actions: prompt
                        .actions
                        .iter()
                        .map(|action| action.title.to_string())
                        .collect(),
                    level: Some(prompt_to_proto(&prompt)),
                    lsp_name: prompt.lsp_name.clone(),
                    message: prompt.message.clone(),
                });
                let prompt = prompt.clone();
                cx.background_spawn(async move {
                    let response = request.await?;
                    if let Some(action_response) = response.action_response {
                        prompt.respond(action_response as usize).await;
                    }
                    anyhow::Ok(())
                })
                .detach();
            }
            _ => {}
        }
    }

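    /// Creates a local worktree for the requested path (expanding `~`, and resolving
    /// against the canonicalized parent directory when the path itself does not exist
    /// yet), replies with the worktree id, and adds the worktree to the store
    /// asynchronously so the response reaches the client first.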
    pub async fn handle_add_worktree(
        this: Entity<Self>,
        message: TypedEnvelope<proto::AddWorktree>,
        mut cx: AsyncApp,
    ) -> Result<proto::AddWorktreeResponse> {
        use client::ErrorCodeExt;
        let fs = this.read_with(&mut cx, |this, _| this.fs.clone())?;
        let path = PathBuf::from_proto(shellexpand::tilde(&message.payload.path).to_string());

        let canonicalized = match fs.canonicalize(&path).await {
            Ok(path) => path,
            Err(e) => {
                let mut parent = path
                    .parent()
                    .ok_or(e)
                    .with_context(|| format!("{path:?} does not exist"))?;
                if parent == Path::new("") {
                    parent = util::paths::home_dir();
                }
                let parent = fs.canonicalize(parent).await.map_err(|_| {
                    anyhow!(
                        proto::ErrorCode::DevServerProjectPathDoesNotExist
                            .with_tag("path", &path.to_string_lossy().as_ref())
                    )
                })?;
                parent.join(path.file_name().unwrap())
            }
        };

        let worktree = this
            .update(&mut cx.clone(), |this, _| {
                Worktree::local(
                    Arc::from(canonicalized.as_path()),
                    message.payload.visible,
                    this.fs.clone(),
                    this.next_entry_id.clone(),
                    &mut cx,
                )
            })?
            .await?;

        let response = this.update(&mut cx, |_, cx| {
            worktree.update(cx, |worktree, _| proto::AddWorktreeResponse {
                worktree_id: worktree.id().to_proto(),
                canonicalized_path: canonicalized.to_proto(),
            })
        })?;

        // We spawn this asynchronously, so that we can send the response back
        // *before* `worktree_store.add()` can send out UpdateProject requests
        // to the client about the new worktree.
        //
        // That lets the client manage the reference/handles of the newly-added
        // worktree, before getting interrupted by an UpdateProject request.
        //
        // This fixes the problem of the client sending the AddWorktree request,
        // the headless project sending out a project update, the client receiving
        // it and immediately dropping its reference to the new worktree, causing
        // it to be dropped on the headless project, and the client only then
        // receiving a response to AddWorktree.
        cx.spawn(async move |cx| {
            this.update(cx, |this, cx| {
                this.worktree_store.update(cx, |worktree_store, cx| {
                    worktree_store.add(&worktree, cx);
                });
            })
            .log_err();
        })
        .detach();

        Ok(response)
    }

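    /// Removes the worktree identified in the request from the worktree store.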
    pub async fn handle_remove_worktree(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::RemoveWorktree>,
        mut cx: AsyncApp,
    ) -> Result<proto::Ack> {
        let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
        this.update(&mut cx, |this, cx| {
            this.worktree_store.update(cx, |worktree_store, cx| {
                worktree_store.remove_worktree(worktree_id, cx);
            });
        })?;
        Ok(proto::Ack {})
    }

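    /// Opens the buffer at the given project path, registers it with the requesting peer,
    /// and returns its remote buffer id.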
    pub async fn handle_open_buffer_by_path(
        this: Entity<Self>,
        message: TypedEnvelope<proto::OpenBufferByPath>,
        mut cx: AsyncApp,
    ) -> Result<proto::OpenBufferResponse> {
        let worktree_id = WorktreeId::from_proto(message.payload.worktree_id);
        let (buffer_store, buffer) = this.update(&mut cx, |this, cx| {
            let buffer_store = this.buffer_store.clone();
            let buffer = this.buffer_store.update(cx, |buffer_store, cx| {
                buffer_store.open_buffer(
                    ProjectPath {
                        worktree_id,
                        path: Arc::<Path>::from_proto(message.payload.path),
                    },
                    cx,
                )
            });
            anyhow::Ok((buffer_store, buffer))
        })??;

        let buffer = buffer.await?;
        let buffer_id = buffer.read_with(&cx, |b, _| b.remote_id())?;
        buffer_store.update(&mut cx, |buffer_store, cx| {
            buffer_store
                .create_buffer_for_peer(&buffer, SSH_PEER_ID, cx)
                .detach_and_log_err(cx);
        })?;

        Ok(proto::OpenBufferResponse {
            buffer_id: buffer_id.to_proto(),
        })
    }

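    /// Creates an empty, untitled buffer, registers it with the requesting peer, and
    /// returns its remote buffer id.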
    pub async fn handle_open_new_buffer(
        this: Entity<Self>,
        _message: TypedEnvelope<proto::OpenNewBuffer>,
        mut cx: AsyncApp,
    ) -> Result<proto::OpenBufferResponse> {
        let (buffer_store, buffer) = this.update(&mut cx, |this, cx| {
            let buffer_store = this.buffer_store.clone();
            let buffer = this
                .buffer_store
                .update(cx, |buffer_store, cx| buffer_store.create_buffer(cx));
            anyhow::Ok((buffer_store, buffer))
        })??;

        let buffer = buffer.await?;
        let buffer_id = buffer.read_with(&cx, |b, _| b.remote_id())?;
        buffer_store.update(&mut cx, |buffer_store, cx| {
            buffer_store
                .create_buffer_for_peer(&buffer, SSH_PEER_ID, cx)
                .detach_and_log_err(cx);
        })?;

        Ok(proto::OpenBufferResponse {
            buffer_id: buffer_id.to_proto(),
        })
    }

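    /// Opens the server's settings file in a buffer, seeding it with the initial server
    /// settings content if it is empty, and returns the buffer's remote id.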
    pub async fn handle_open_server_settings(
        this: Entity<Self>,
        _: TypedEnvelope<proto::OpenServerSettings>,
        mut cx: AsyncApp,
    ) -> Result<proto::OpenBufferResponse> {
        let settings_path = paths::settings_file();
        let (worktree, path) = this
            .update(&mut cx, |this, cx| {
                this.worktree_store.update(cx, |worktree_store, cx| {
                    worktree_store.find_or_create_worktree(settings_path, false, cx)
                })
            })?
            .await?;

        let (buffer, buffer_store) = this.update(&mut cx, |this, cx| {
            let buffer = this.buffer_store.update(cx, |buffer_store, cx| {
                buffer_store.open_buffer(
                    ProjectPath {
                        worktree_id: worktree.read(cx).id(),
                        path: path.into(),
                    },
                    cx,
                )
            });

            (buffer, this.buffer_store.clone())
        })?;

        let buffer = buffer.await?;

        let buffer_id = cx.update(|cx| {
            if buffer.read(cx).is_empty() {
                buffer.update(cx, |buffer, cx| {
                    buffer.edit([(0..0, initial_server_settings_content())], None, cx)
                });
            }

            let buffer_id = buffer.read_with(cx, |b, _| b.remote_id());

            buffer_store.update(cx, |buffer_store, cx| {
                buffer_store
                    .create_buffer_for_peer(&buffer, SSH_PEER_ID, cx)
                    .detach_and_log_err(cx);
            });

            buffer_id
        })?;

        Ok(proto::OpenBufferResponse {
            buffer_id: buffer_id.to_proto(),
        })
    }

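    /// Runs the requested search against the buffer store, registering each candidate
    /// buffer with the requesting peer and returning the matching buffer ids.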
    pub async fn handle_find_search_candidates(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::FindSearchCandidates>,
        mut cx: AsyncApp,
    ) -> Result<proto::FindSearchCandidatesResponse> {
        let message = envelope.payload;
        let query = SearchQuery::from_proto(message.query.context("missing query field")?)?;
        let results = this.update(&mut cx, |this, cx| {
            this.buffer_store.update(cx, |buffer_store, cx| {
                buffer_store.find_search_candidates(&query, message.limit as _, this.fs.clone(), cx)
            })
        })?;

        let mut response = proto::FindSearchCandidatesResponse {
            buffer_ids: Vec::new(),
        };

        let buffer_store = this.read_with(&cx, |this, _| this.buffer_store.clone())?;

        while let Ok(buffer) = results.recv().await {
            let buffer_id = buffer.update(&mut cx, |this, _| this.remote_id())?;
            response.buffer_ids.push(buffer_id.to_proto());
            buffer_store
                .update(&mut cx, |buffer_store, cx| {
                    buffer_store.create_buffer_for_peer(&buffer, SSH_PEER_ID, cx)
                })?
                .await?;
        }

        Ok(response)
    }

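    /// Lists the entries of a directory on the remote machine, optionally reporting
    /// whether each entry is itself a directory.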
    pub async fn handle_list_remote_directory(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::ListRemoteDirectory>,
        cx: AsyncApp,
    ) -> Result<proto::ListRemoteDirectoryResponse> {
        let fs = cx.read_entity(&this, |this, _| this.fs.clone())?;
        let expanded = PathBuf::from_proto(shellexpand::tilde(&envelope.payload.path).to_string());
        let check_info = envelope
            .payload
            .config
            .as_ref()
            .is_some_and(|config| config.is_dir);

        let mut entries = Vec::new();
        let mut entry_info = Vec::new();
        let mut response = fs.read_dir(&expanded).await?;
        while let Some(path) = response.next().await {
            let path = path?;
            if let Some(file_name) = path.file_name() {
                entries.push(file_name.to_string_lossy().to_string());
                if check_info {
                    let is_dir = fs.is_dir(&path).await;
                    entry_info.push(proto::EntryInfo { is_dir });
                }
            }
        }
        Ok(proto::ListRemoteDirectoryResponse {
            entries,
            entry_info,
        })
    }

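    /// Reports whether the given path exists on the remote machine and whether it is a
    /// directory, along with its tilde-expanded form.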
    pub async fn handle_get_path_metadata(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::GetPathMetadata>,
        cx: AsyncApp,
    ) -> Result<proto::GetPathMetadataResponse> {
        let fs = cx.read_entity(&this, |this, _| this.fs.clone())?;
        let expanded = PathBuf::from_proto(shellexpand::tilde(&envelope.payload.path).to_string());

        let metadata = fs.metadata(&expanded).await?;
        let is_dir = metadata.map(|metadata| metadata.is_dir).unwrap_or(false);

        Ok(proto::GetPathMetadataResponse {
            exists: metadata.is_some(),
            is_dir,
            path: expanded.to_proto(),
        })
    }

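    /// Schedules shutdown and quit of the headless server and acknowledges the request.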
    pub async fn handle_shutdown_remote_server(
        _this: Entity<Self>,
        _envelope: TypedEnvelope<proto::ShutdownRemoteServer>,
        cx: AsyncApp,
    ) -> Result<proto::Ack> {
        cx.spawn(async move |cx| {
            cx.update(|cx| {
                // TODO: This is a hack, because in a headless project, shutdown isn't executed
                // when calling quit, but it should be.
                cx.shutdown();
                cx.quit();
            })
        })
        .detach();

        Ok(proto::Ack {})
    }

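    /// Responds to keep-alive pings from the client.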
    pub async fn handle_ping(
        _this: Entity<Self>,
        _envelope: TypedEnvelope<proto::Ping>,
        _cx: AsyncApp,
    ) -> Result<proto::Ack> {
        log::debug!("Received ping from client");
        Ok(proto::Ack {})
    }
}

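/// Converts a language server prompt's severity level into its protobuf representation.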
fn prompt_to_proto(
    prompt: &project::LanguageServerPromptRequest,
) -> proto::language_server_prompt_request::Level {
    match prompt.level {
        PromptLevel::Info => proto::language_server_prompt_request::Level::Info(
            proto::language_server_prompt_request::Info {},
        ),
        PromptLevel::Warning => proto::language_server_prompt_request::Level::Warning(
            proto::language_server_prompt_request::Warning {},
        ),
        PromptLevel::Critical => proto::language_server_prompt_request::Level::Critical(
            proto::language_server_prompt_request::Critical {},
        ),
    }
}