use ::proto::{FromProto, ToProto};
use anyhow::{Context as _, Result, anyhow};

use extension::ExtensionHostProxy;
use extension_host::headless_host::HeadlessExtensionStore;
use fs::Fs;
use gpui::{App, AppContext as _, AsyncApp, Context, Entity, PromptLevel};
use http_client::HttpClient;
use language::{Buffer, BufferEvent, LanguageRegistry, proto::serialize_operation};
use node_runtime::NodeRuntime;
use project::{
    LspStore, LspStoreEvent, PrettierStore, ProjectEnvironment, ProjectPath, ToolchainStore,
    WorktreeId,
    buffer_store::{BufferStore, BufferStoreEvent},
    debugger::{breakpoint_store::BreakpointStore, dap_store::DapStore},
    git_store::GitStore,
    project_settings::SettingsObserver,
    search::SearchQuery,
    task_store::TaskStore,
    worktree_store::WorktreeStore,
};
use remote::ssh_session::ChannelClient;
use rpc::{
    AnyProtoClient, TypedEnvelope,
    proto::{self, SSH_PEER_ID, SSH_PROJECT_ID},
};

use settings::initial_server_settings_content;
use smol::stream::StreamExt;
use std::{
    path::{Path, PathBuf},
    sync::{Arc, atomic::AtomicUsize},
};
use util::ResultExt;
use worktree::Worktree;

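/// Headless counterpart of a project, running on the remote side of an SSH
/// session. It owns the worktree, buffer, LSP, task, debug, settings, and git
/// stores and shares them with the connected client.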
pub struct HeadlessProject {
    pub fs: Arc<dyn Fs>,
    pub session: AnyProtoClient,
    pub worktree_store: Entity<WorktreeStore>,
    pub buffer_store: Entity<BufferStore>,
    pub lsp_store: Entity<LspStore>,
    pub task_store: Entity<TaskStore>,
    pub dap_store: Entity<DapStore>,
    pub settings_observer: Entity<SettingsObserver>,
    pub next_entry_id: Arc<AtomicUsize>,
    pub languages: Arc<LanguageRegistry>,
    pub extensions: Entity<HeadlessExtensionStore>,
    pub git_store: Entity<GitStore>,
}

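/// Dependencies needed to construct a [`HeadlessProject`]: the SSH channel
/// back to the client plus filesystem, HTTP, Node, language-registry, and
/// extension-host handles.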
pub struct HeadlessAppState {
    pub session: Arc<ChannelClient>,
    pub fs: Arc<dyn Fs>,
    pub http_client: Arc<dyn HttpClient>,
    pub node_runtime: NodeRuntime,
    pub languages: Arc<LanguageRegistry>,
    pub extension_host_proxy: Arc<ExtensionHostProxy>,
}

impl HeadlessProject {
    pub fn init(cx: &mut App) {
        settings::init(cx);
        language::init(cx);
        project::Project::init_settings(cx);
    }

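    /// Creates the headless project: builds the local stores, shares them over
    /// the SSH session, and registers the RPC message handlers.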
    pub fn new(
        HeadlessAppState {
            session,
            fs,
            http_client,
            node_runtime,
            languages,
            extension_host_proxy: proxy,
        }: HeadlessAppState,
        cx: &mut Context<Self>,
    ) -> Self {
        debug_adapter_extension::init(proxy.clone(), cx);
        language_extension::init(proxy.clone(), languages.clone());
        languages::init(languages.clone(), node_runtime.clone(), cx);

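        // Construct the local stores; most are immediately marked `shared` so
        // their state is replicated to the client under SSH_PROJECT_ID.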
        let worktree_store = cx.new(|cx| {
            let mut store = WorktreeStore::local(true, fs.clone());
            store.shared(SSH_PROJECT_ID, session.clone().into(), cx);
            store
        });

        let environment = cx.new(|_| ProjectEnvironment::new(None));

        let toolchain_store = cx.new(|cx| {
            ToolchainStore::local(
                languages.clone(),
                worktree_store.clone(),
                environment.clone(),
                cx,
            )
        });

        let buffer_store = cx.new(|cx| {
            let mut buffer_store = BufferStore::local(worktree_store.clone(), cx);
            buffer_store.shared(SSH_PROJECT_ID, session.clone().into(), cx);
            buffer_store
        });

        let breakpoint_store =
            cx.new(|_| BreakpointStore::local(worktree_store.clone(), buffer_store.clone()));

        let dap_store = cx.new(|cx| {
            let mut dap_store = DapStore::new_local(
                http_client.clone(),
                node_runtime.clone(),
                fs.clone(),
                environment.clone(),
                toolchain_store.read(cx).as_language_toolchain_store(),
                worktree_store.clone(),
                breakpoint_store.clone(),
                cx,
            );
            dap_store.shared(SSH_PROJECT_ID, session.clone().into(), cx);
            dap_store
        });

        let git_store = cx.new(|cx| {
            let mut store = GitStore::local(
                &worktree_store,
                buffer_store.clone(),
                environment.clone(),
                fs.clone(),
                cx,
            );
            store.shared(SSH_PROJECT_ID, session.clone().into(), cx);
            store
        });

        let prettier_store = cx.new(|cx| {
            PrettierStore::new(
                node_runtime.clone(),
                fs.clone(),
                languages.clone(),
                worktree_store.clone(),
                cx,
            )
        });

        let task_store = cx.new(|cx| {
            let mut task_store = TaskStore::local(
                buffer_store.downgrade(),
                worktree_store.clone(),
                toolchain_store.read(cx).as_language_toolchain_store(),
                environment.clone(),
                cx,
            );
            task_store.shared(SSH_PROJECT_ID, session.clone().into(), cx);
            task_store
        });
        let settings_observer = cx.new(|cx| {
            let mut observer = SettingsObserver::new_local(
                fs.clone(),
                worktree_store.clone(),
                task_store.clone(),
                cx,
            );
            observer.shared(SSH_PROJECT_ID, session.clone().into(), cx);
            observer
        });

        let lsp_store = cx.new(|cx| {
            let mut lsp_store = LspStore::new_local(
                buffer_store.clone(),
                worktree_store.clone(),
                prettier_store.clone(),
                toolchain_store.clone(),
                environment,
                languages.clone(),
                http_client.clone(),
                fs.clone(),
                cx,
            );
            lsp_store.shared(SSH_PROJECT_ID, session.clone().into(), cx);
            lsp_store
        });

        cx.subscribe(&lsp_store, Self::on_lsp_store_event).detach();

        cx.subscribe(
            &buffer_store,
            |_this, _buffer_store, event, cx| match event {
                BufferStoreEvent::BufferAdded(buffer) => {
                    cx.subscribe(buffer, Self::on_buffer_event).detach();
                }
                _ => {}
            },
        )
        .detach();

        let extensions = HeadlessExtensionStore::new(
            fs.clone(),
            http_client.clone(),
            paths::remote_extensions_dir().to_path_buf(),
            proxy,
            node_runtime,
            cx,
        );

        let client: AnyProtoClient = session.clone().into();

        // local_machine -> ssh handlers
        session.subscribe_to_entity(SSH_PROJECT_ID, &worktree_store);
        session.subscribe_to_entity(SSH_PROJECT_ID, &buffer_store);
        session.subscribe_to_entity(SSH_PROJECT_ID, &cx.entity());
        session.subscribe_to_entity(SSH_PROJECT_ID, &lsp_store);
        session.subscribe_to_entity(SSH_PROJECT_ID, &task_store);
        session.subscribe_to_entity(SSH_PROJECT_ID, &toolchain_store);
        session.subscribe_to_entity(SSH_PROJECT_ID, &dap_store);
        session.subscribe_to_entity(SSH_PROJECT_ID, &settings_observer);
        session.subscribe_to_entity(SSH_PROJECT_ID, &git_store);

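        // Handlers for requests the client sends to this headless project.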
        client.add_request_handler(cx.weak_entity(), Self::handle_list_remote_directory);
        client.add_request_handler(cx.weak_entity(), Self::handle_get_path_metadata);
        client.add_request_handler(cx.weak_entity(), Self::handle_shutdown_remote_server);
        client.add_request_handler(cx.weak_entity(), Self::handle_ping);

        client.add_entity_request_handler(Self::handle_add_worktree);
        client.add_request_handler(cx.weak_entity(), Self::handle_remove_worktree);

        client.add_entity_request_handler(Self::handle_open_buffer_by_path);
        client.add_entity_request_handler(Self::handle_open_new_buffer);
        client.add_entity_request_handler(Self::handle_find_search_candidates);
        client.add_entity_request_handler(Self::handle_open_server_settings);

        client.add_entity_request_handler(BufferStore::handle_update_buffer);
        client.add_entity_message_handler(BufferStore::handle_close_buffer);

        client.add_request_handler(
            extensions.clone().downgrade(),
            HeadlessExtensionStore::handle_sync_extensions,
        );
        client.add_request_handler(
            extensions.clone().downgrade(),
            HeadlessExtensionStore::handle_install_extension,
        );

        BufferStore::init(&client);
        WorktreeStore::init(&client);
        SettingsObserver::init(&client);
        LspStore::init(&client);
        TaskStore::init(Some(&client));
        ToolchainStore::init(&client);
        DapStore::init(&client, cx);
        // todo(debugger): Re init breakpoint store when we set it up for collab
        // BreakpointStore::init(&client);
        GitStore::init(&client);

        HeadlessProject {
            session: client,
            settings_observer,
            fs,
            worktree_store,
            buffer_store,
            lsp_store,
            task_store,
            dap_store,
            next_entry_id: Default::default(),
            languages,
            extensions,
            git_store,
        }
    }

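    /// Forwards buffer operations that originate locally to the client as
    /// `proto::UpdateBuffer` messages, keeping both replicas in sync.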
    fn on_buffer_event(
        &mut self,
        buffer: Entity<Buffer>,
        event: &BufferEvent,
        cx: &mut Context<Self>,
    ) {
        match event {
            BufferEvent::Operation {
                operation,
                is_local: true,
            } => cx
                .background_spawn(self.session.request(proto::UpdateBuffer {
                    project_id: SSH_PROJECT_ID,
                    buffer_id: buffer.read(cx).remote_id().to_proto(),
                    operations: vec![serialize_operation(operation)],
                }))
                .detach(),
            _ => {}
        }
    }

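    /// Relays LSP store events to the client: language server status updates,
    /// toast notifications, server logs, and interactive prompts (whose chosen
    /// action is sent back to the prompting server).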
    fn on_lsp_store_event(
        &mut self,
        _lsp_store: Entity<LspStore>,
        event: &LspStoreEvent,
        cx: &mut Context<Self>,
    ) {
        match event {
            LspStoreEvent::LanguageServerUpdate {
                language_server_id,
                message,
            } => {
                self.session
                    .send(proto::UpdateLanguageServer {
                        project_id: SSH_PROJECT_ID,
                        language_server_id: language_server_id.to_proto(),
                        variant: Some(message.clone()),
                    })
                    .log_err();
            }
            LspStoreEvent::Notification(message) => {
                self.session
                    .send(proto::Toast {
                        project_id: SSH_PROJECT_ID,
                        notification_id: "lsp".to_string(),
                        message: message.clone(),
                    })
                    .log_err();
            }
            LspStoreEvent::LanguageServerLog(language_server_id, log_type, message) => {
                self.session
                    .send(proto::LanguageServerLog {
                        project_id: SSH_PROJECT_ID,
                        language_server_id: language_server_id.to_proto(),
                        message: message.clone(),
                        log_type: Some(log_type.to_proto()),
                    })
                    .log_err();
            }
            LspStoreEvent::LanguageServerPrompt(prompt) => {
                let request = self.session.request(proto::LanguageServerPromptRequest {
                    project_id: SSH_PROJECT_ID,
                    actions: prompt
                        .actions
                        .iter()
                        .map(|action| action.title.to_string())
                        .collect(),
                    level: Some(prompt_to_proto(&prompt)),
                    lsp_name: prompt.lsp_name.clone(),
                    message: prompt.message.clone(),
                });
                let prompt = prompt.clone();
                cx.background_spawn(async move {
                    let response = request.await?;
                    if let Some(action_response) = response.action_response {
                        prompt.respond(action_response as usize).await;
                    }
                    anyhow::Ok(())
                })
                .detach();
            }
            _ => {}
        }
    }

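    /// Adds a worktree for the requested path, canonicalizing it first and
    /// falling back to the canonicalized parent joined with the file name when
    /// the path itself does not exist yet.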
    pub async fn handle_add_worktree(
        this: Entity<Self>,
        message: TypedEnvelope<proto::AddWorktree>,
        mut cx: AsyncApp,
    ) -> Result<proto::AddWorktreeResponse> {
        use client::ErrorCodeExt;
        let fs = this.read_with(&mut cx, |this, _| this.fs.clone())?;
        let path = PathBuf::from_proto(shellexpand::tilde(&message.payload.path).to_string());

        let canonicalized = match fs.canonicalize(&path).await {
            Ok(path) => path,
            Err(e) => {
                let mut parent = path
                    .parent()
                    .ok_or(e)
                    .with_context(|| format!("{path:?} does not exist"))?;
                if parent == Path::new("") {
                    parent = util::paths::home_dir();
                }
                let parent = fs.canonicalize(parent).await.map_err(|_| {
                    anyhow!(
                        proto::ErrorCode::DevServerProjectPathDoesNotExist
                            .with_tag("path", &path.to_string_lossy().as_ref())
                    )
                })?;
                parent.join(path.file_name().unwrap())
            }
        };

        let worktree = this
            .read_with(&mut cx.clone(), |this, _| {
                Worktree::local(
                    Arc::from(canonicalized.as_path()),
                    message.payload.visible,
                    this.fs.clone(),
                    this.next_entry_id.clone(),
                    &mut cx,
                )
            })?
            .await?;

        let response = this.read_with(&mut cx, |_, cx| {
            let worktree = worktree.read(cx);
            proto::AddWorktreeResponse {
                worktree_id: worktree.id().to_proto(),
                canonicalized_path: canonicalized.to_proto(),
            }
        })?;

        // We spawn this asynchronously, so that we can send the response back
        // *before* `worktree_store.add()` can send out UpdateProject requests
        // to the client about the new worktree.
        //
        // That lets the client manage the reference/handles of the newly-added
        // worktree, before getting interrupted by an UpdateProject request.
        //
        // This fixes the problem of the client sending the AddWorktree request,
        // headless project sending out a project update, client receiving it
        // and immediately dropping the reference of the new worktree, causing it
        // to be dropped on the headless project, and the client only then
        // receiving a response to AddWorktree.
        cx.spawn(async move |cx| {
            this.update(cx, |this, cx| {
                this.worktree_store.update(cx, |worktree_store, cx| {
                    worktree_store.add(&worktree, cx);
                });
            })
            .log_err();
        })
        .detach();

        Ok(response)
    }

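    /// Removes the worktree with the given id from the worktree store.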
    pub async fn handle_remove_worktree(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::RemoveWorktree>,
        mut cx: AsyncApp,
    ) -> Result<proto::Ack> {
        let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
        this.update(&mut cx, |this, cx| {
            this.worktree_store.update(cx, |worktree_store, cx| {
                worktree_store.remove_worktree(worktree_id, cx);
            });
        })?;
        Ok(proto::Ack {})
    }

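    /// Opens the buffer at the given project path, shares it with the client
    /// peer, and returns its remote buffer id.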
    pub async fn handle_open_buffer_by_path(
        this: Entity<Self>,
        message: TypedEnvelope<proto::OpenBufferByPath>,
        mut cx: AsyncApp,
    ) -> Result<proto::OpenBufferResponse> {
        let worktree_id = WorktreeId::from_proto(message.payload.worktree_id);
        let (buffer_store, buffer) = this.update(&mut cx, |this, cx| {
            let buffer_store = this.buffer_store.clone();
            let buffer = this.buffer_store.update(cx, |buffer_store, cx| {
                buffer_store.open_buffer(
                    ProjectPath {
                        worktree_id,
                        path: Arc::<Path>::from_proto(message.payload.path),
                    },
                    cx,
                )
            });
            anyhow::Ok((buffer_store, buffer))
        })??;

        let buffer = buffer.await?;
        let buffer_id = buffer.read_with(&cx, |b, _| b.remote_id())?;
        buffer_store.update(&mut cx, |buffer_store, cx| {
            buffer_store
                .create_buffer_for_peer(&buffer, SSH_PEER_ID, cx)
                .detach_and_log_err(cx);
        })?;

        Ok(proto::OpenBufferResponse {
            buffer_id: buffer_id.to_proto(),
        })
    }

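    /// Creates an empty, untitled buffer, shares it with the client peer, and
    /// returns its remote buffer id.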
    pub async fn handle_open_new_buffer(
        this: Entity<Self>,
        _message: TypedEnvelope<proto::OpenNewBuffer>,
        mut cx: AsyncApp,
    ) -> Result<proto::OpenBufferResponse> {
        let (buffer_store, buffer) = this.update(&mut cx, |this, cx| {
            let buffer_store = this.buffer_store.clone();
            let buffer = this
                .buffer_store
                .update(cx, |buffer_store, cx| buffer_store.create_buffer(cx));
            anyhow::Ok((buffer_store, buffer))
        })??;

        let buffer = buffer.await?;
        let buffer_id = buffer.read_with(&cx, |b, _| b.remote_id())?;
        buffer_store.update(&mut cx, |buffer_store, cx| {
            buffer_store
                .create_buffer_for_peer(&buffer, SSH_PEER_ID, cx)
                .detach_and_log_err(cx);
        })?;

        Ok(proto::OpenBufferResponse {
            buffer_id: buffer_id.to_proto(),
        })
    }

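    /// Opens the server's settings file as a buffer, seeding it with the
    /// initial server settings content if it is empty, and shares it with the
    /// client peer.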
    pub async fn handle_open_server_settings(
        this: Entity<Self>,
        _: TypedEnvelope<proto::OpenServerSettings>,
        mut cx: AsyncApp,
    ) -> Result<proto::OpenBufferResponse> {
        let settings_path = paths::settings_file();
        let (worktree, path) = this
            .update(&mut cx, |this, cx| {
                this.worktree_store.update(cx, |worktree_store, cx| {
                    worktree_store.find_or_create_worktree(settings_path, false, cx)
                })
            })?
            .await?;

        let (buffer, buffer_store) = this.update(&mut cx, |this, cx| {
            let buffer = this.buffer_store.update(cx, |buffer_store, cx| {
                buffer_store.open_buffer(
                    ProjectPath {
                        worktree_id: worktree.read(cx).id(),
                        path: path.into(),
                    },
                    cx,
                )
            });

            (buffer, this.buffer_store.clone())
        })?;

        let buffer = buffer.await?;

        let buffer_id = cx.update(|cx| {
            if buffer.read(cx).is_empty() {
                buffer.update(cx, |buffer, cx| {
                    buffer.edit([(0..0, initial_server_settings_content())], None, cx)
                });
            }

            let buffer_id = buffer.read(cx).remote_id();

            buffer_store.update(cx, |buffer_store, cx| {
                buffer_store
                    .create_buffer_for_peer(&buffer, SSH_PEER_ID, cx)
                    .detach_and_log_err(cx);
            });

            buffer_id
        })?;

        Ok(proto::OpenBufferResponse {
            buffer_id: buffer_id.to_proto(),
        })
    }

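    /// Runs a project-wide search and streams each candidate buffer to the
    /// client, returning the ids of all buffers that may contain matches.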
    pub async fn handle_find_search_candidates(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::FindSearchCandidates>,
        mut cx: AsyncApp,
    ) -> Result<proto::FindSearchCandidatesResponse> {
        let message = envelope.payload;
        let query = SearchQuery::from_proto(message.query.context("missing query field")?)?;
        let results = this.update(&mut cx, |this, cx| {
            this.buffer_store.update(cx, |buffer_store, cx| {
                buffer_store.find_search_candidates(&query, message.limit as _, this.fs.clone(), cx)
            })
        })?;

        let mut response = proto::FindSearchCandidatesResponse {
            buffer_ids: Vec::new(),
        };

        let buffer_store = this.read_with(&cx, |this, _| this.buffer_store.clone())?;

        while let Ok(buffer) = results.recv().await {
            let buffer_id = buffer.read_with(&mut cx, |this, _| this.remote_id())?;
            response.buffer_ids.push(buffer_id.to_proto());
            buffer_store
                .update(&mut cx, |buffer_store, cx| {
                    buffer_store.create_buffer_for_peer(&buffer, SSH_PEER_ID, cx)
                })?
                .await?;
        }

        Ok(response)
    }

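    /// Lists the entries of a directory on the remote filesystem, optionally
    /// reporting whether each entry is itself a directory.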
    pub async fn handle_list_remote_directory(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::ListRemoteDirectory>,
        cx: AsyncApp,
    ) -> Result<proto::ListRemoteDirectoryResponse> {
        let fs = cx.read_entity(&this, |this, _| this.fs.clone())?;
        let expanded = PathBuf::from_proto(shellexpand::tilde(&envelope.payload.path).to_string());
        let check_info = envelope
            .payload
            .config
            .as_ref()
            .is_some_and(|config| config.is_dir);

        let mut entries = Vec::new();
        let mut entry_info = Vec::new();
        let mut response = fs.read_dir(&expanded).await?;
        while let Some(path) = response.next().await {
            let path = path?;
            if let Some(file_name) = path.file_name() {
                entries.push(file_name.to_string_lossy().to_string());
                if check_info {
                    let is_dir = fs.is_dir(&path).await;
                    entry_info.push(proto::EntryInfo { is_dir });
                }
            }
        }
        Ok(proto::ListRemoteDirectoryResponse {
            entries,
            entry_info,
        })
    }

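    /// Reports whether a path exists on the remote filesystem (after tilde
    /// expansion) and whether it is a directory.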
    pub async fn handle_get_path_metadata(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::GetPathMetadata>,
        cx: AsyncApp,
    ) -> Result<proto::GetPathMetadataResponse> {
        let fs = cx.read_entity(&this, |this, _| this.fs.clone())?;
        let expanded = PathBuf::from_proto(shellexpand::tilde(&envelope.payload.path).to_string());

        let metadata = fs.metadata(&expanded).await?;
        let is_dir = metadata.map(|metadata| metadata.is_dir).unwrap_or(false);

        Ok(proto::GetPathMetadataResponse {
            exists: metadata.is_some(),
            is_dir,
            path: expanded.to_proto(),
        })
    }

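    /// Acknowledges the request, then shuts down and quits the headless app
    /// from a detached task.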
    pub async fn handle_shutdown_remote_server(
        _this: Entity<Self>,
        _envelope: TypedEnvelope<proto::ShutdownRemoteServer>,
        cx: AsyncApp,
    ) -> Result<proto::Ack> {
        cx.spawn(async move |cx| {
            cx.update(|cx| {
                // TODO: This is a hack, because in a headless project, shutdown isn't executed
                // when calling quit, but it should be.
                cx.shutdown();
                cx.quit();
            })
        })
        .detach();

        Ok(proto::Ack {})
    }

    pub async fn handle_ping(
        _this: Entity<Self>,
        _envelope: TypedEnvelope<proto::Ping>,
        _cx: AsyncApp,
    ) -> Result<proto::Ack> {
        log::debug!("Received ping from client");
        Ok(proto::Ack {})
    }
}

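/// Maps a GPUI [`PromptLevel`] to its protobuf representation.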
fn prompt_to_proto(
    prompt: &project::LanguageServerPromptRequest,
) -> proto::language_server_prompt_request::Level {
    match prompt.level {
        PromptLevel::Info => proto::language_server_prompt_request::Level::Info(
            proto::language_server_prompt_request::Info {},
        ),
        PromptLevel::Warning => proto::language_server_prompt_request::Level::Warning(
            proto::language_server_prompt_request::Warning {},
        ),
        PromptLevel::Critical => proto::language_server_prompt_request::Level::Critical(
            proto::language_server_prompt_request::Critical {},
        ),
    }
}