use ::proto::{FromProto, ToProto};
use anyhow::{Result, anyhow};
use dap::DapRegistry;
use extension::ExtensionHostProxy;
use extension_host::headless_host::HeadlessExtensionStore;
use fs::Fs;
use gpui::{App, AppContext as _, AsyncApp, Context, Entity, PromptLevel};
use http_client::HttpClient;
use language::{Buffer, BufferEvent, LanguageRegistry, proto::serialize_operation};
use node_runtime::NodeRuntime;
use project::{
    LspStore, LspStoreEvent, PrettierStore, ProjectEnvironment, ProjectPath, ToolchainStore,
    WorktreeId,
    buffer_store::{BufferStore, BufferStoreEvent},
    debugger::{breakpoint_store::BreakpointStore, dap_store::DapStore},
    git_store::GitStore,
    project_settings::SettingsObserver,
    search::SearchQuery,
    task_store::TaskStore,
    worktree_store::WorktreeStore,
};
use remote::ssh_session::ChannelClient;
use rpc::{
    AnyProtoClient, TypedEnvelope,
    proto::{self, SSH_PEER_ID, SSH_PROJECT_ID},
};

use settings::initial_server_settings_content;
use smol::stream::StreamExt;
use std::{
    path::{Path, PathBuf},
    sync::{Arc, atomic::AtomicUsize},
};
use util::ResultExt;
use worktree::Worktree;

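/// The remote-side counterpart of a `Project`. It owns the stores (worktrees,
/// buffers, LSP, tasks, settings, git, extensions) that run on the SSH host and
/// serves them to the local Zed client over the proto channel.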
pub struct HeadlessProject {
    pub fs: Arc<dyn Fs>,
    pub session: AnyProtoClient,
    pub worktree_store: Entity<WorktreeStore>,
    pub buffer_store: Entity<BufferStore>,
    pub lsp_store: Entity<LspStore>,
    pub task_store: Entity<TaskStore>,
    pub settings_observer: Entity<SettingsObserver>,
    pub next_entry_id: Arc<AtomicUsize>,
    pub languages: Arc<LanguageRegistry>,
    pub extensions: Entity<HeadlessExtensionStore>,
    pub git_store: Entity<GitStore>,
}

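/// Everything needed to construct a `HeadlessProject`: the SSH channel back to
/// the client plus the shared filesystem, HTTP, Node, language, debug-adapter,
/// and extension services.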
pub struct HeadlessAppState {
    pub session: Arc<ChannelClient>,
    pub fs: Arc<dyn Fs>,
    pub http_client: Arc<dyn HttpClient>,
    pub node_runtime: NodeRuntime,
    pub languages: Arc<LanguageRegistry>,
    pub debug_adapters: Arc<DapRegistry>,
    pub extension_host_proxy: Arc<ExtensionHostProxy>,
}

impl HeadlessProject {
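    /// Registers the settings, language, and project-settings globals that the
    /// headless stores rely on. Call once at startup, before constructing the project.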
    pub fn init(cx: &mut App) {
        settings::init(cx);
        language::init(cx);
        project::Project::init_settings(cx);
    }

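    /// Builds all remote-side stores (worktrees, buffers, LSP, DAP, git, tasks,
    /// settings, extensions), shares them over the SSH channel, and registers the
    /// RPC handlers the local client calls into.
    ///
    /// A minimal sketch of how a caller might construct this on the remote server
    /// (the `app_state` binding is illustrative, not taken from this file):
    ///
    /// ```ignore
    /// // `app_state` is a `HeadlessAppState` assembled by the server entry point.
    /// let project = cx.new(|cx| HeadlessProject::new(app_state, cx));
    /// ```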
    pub fn new(
        HeadlessAppState {
            session,
            fs,
            http_client,
            node_runtime,
            languages,
            debug_adapters: _debug_adapters,
            extension_host_proxy: proxy,
        }: HeadlessAppState,
        cx: &mut Context<Self>,
    ) -> Self {
        language_extension::init(proxy.clone(), languages.clone());
        languages::init(languages.clone(), node_runtime.clone(), cx);

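        // Stores are created in local mode; those that replicate state to the
        // client additionally call `shared` with the SSH project id.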
        let worktree_store = cx.new(|cx| {
            let mut store = WorktreeStore::local(true, fs.clone());
            store.shared(SSH_PROJECT_ID, session.clone().into(), cx);
            store
        });

        let environment = cx.new(|_| ProjectEnvironment::new(None));

        let toolchain_store = cx.new(|cx| {
            ToolchainStore::local(
                languages.clone(),
                worktree_store.clone(),
                environment.clone(),
                cx,
            )
        });

        let buffer_store = cx.new(|cx| {
            let mut buffer_store = BufferStore::local(worktree_store.clone(), cx);
            buffer_store.shared(SSH_PROJECT_ID, session.clone().into(), cx);
            buffer_store
        });

        let breakpoint_store =
            cx.new(|_| BreakpointStore::local(worktree_store.clone(), buffer_store.clone()));

        let dap_store = cx.new(|cx| {
            DapStore::new_local(
                http_client.clone(),
                node_runtime.clone(),
                fs.clone(),
                languages.clone(),
                environment.clone(),
                toolchain_store.read(cx).as_language_toolchain_store(),
                breakpoint_store.clone(),
                cx,
            )
        });

        let git_store = cx.new(|cx| {
            let mut store = GitStore::local(
                &worktree_store,
                buffer_store.clone(),
                environment.clone(),
                fs.clone(),
                cx,
            );
            store.shared(SSH_PROJECT_ID, session.clone().into(), cx);
            store
        });

        let prettier_store = cx.new(|cx| {
            PrettierStore::new(
                node_runtime.clone(),
                fs.clone(),
                languages.clone(),
                worktree_store.clone(),
                cx,
            )
        });

        let task_store = cx.new(|cx| {
            let mut task_store = TaskStore::local(
                buffer_store.downgrade(),
                worktree_store.clone(),
                toolchain_store.read(cx).as_language_toolchain_store(),
                environment.clone(),
                cx,
            );
            task_store.shared(SSH_PROJECT_ID, session.clone().into(), cx);
            task_store
        });
        let settings_observer = cx.new(|cx| {
            let mut observer = SettingsObserver::new_local(
                fs.clone(),
                worktree_store.clone(),
                task_store.clone(),
                cx,
            );
            observer.shared(SSH_PROJECT_ID, session.clone().into(), cx);
            observer
        });

        let lsp_store = cx.new(|cx| {
            let mut lsp_store = LspStore::new_local(
                buffer_store.clone(),
                worktree_store.clone(),
                prettier_store.clone(),
                toolchain_store.clone(),
                environment,
                languages.clone(),
                http_client.clone(),
                fs.clone(),
                cx,
            );
            lsp_store.shared(SSH_PROJECT_ID, session.clone().into(), cx);
            lsp_store
        });

        cx.subscribe(&lsp_store, Self::on_lsp_store_event).detach();

        cx.subscribe(
            &buffer_store,
            |_this, _buffer_store, event, cx| match event {
                BufferStoreEvent::BufferAdded(buffer) => {
                    cx.subscribe(buffer, Self::on_buffer_event).detach();
                }
                _ => {}
            },
        )
        .detach();

        let extensions = HeadlessExtensionStore::new(
            fs.clone(),
            http_client.clone(),
            paths::remote_extensions_dir().to_path_buf(),
            proxy,
            node_runtime,
            cx,
        );

        let client: AnyProtoClient = session.clone().into();

        // local_machine -> ssh handlers
        session.subscribe_to_entity(SSH_PROJECT_ID, &worktree_store);
        session.subscribe_to_entity(SSH_PROJECT_ID, &buffer_store);
        session.subscribe_to_entity(SSH_PROJECT_ID, &cx.entity());
        session.subscribe_to_entity(SSH_PROJECT_ID, &lsp_store);
        session.subscribe_to_entity(SSH_PROJECT_ID, &task_store);
        session.subscribe_to_entity(SSH_PROJECT_ID, &toolchain_store);
        session.subscribe_to_entity(SSH_PROJECT_ID, &dap_store);
        session.subscribe_to_entity(SSH_PROJECT_ID, &settings_observer);
        session.subscribe_to_entity(SSH_PROJECT_ID, &git_store);

        client.add_request_handler(cx.weak_entity(), Self::handle_list_remote_directory);
        client.add_request_handler(cx.weak_entity(), Self::handle_get_path_metadata);
        client.add_request_handler(cx.weak_entity(), Self::handle_shutdown_remote_server);
        client.add_request_handler(cx.weak_entity(), Self::handle_ping);

        client.add_entity_request_handler(Self::handle_add_worktree);
        client.add_request_handler(cx.weak_entity(), Self::handle_remove_worktree);

        client.add_entity_request_handler(Self::handle_open_buffer_by_path);
        client.add_entity_request_handler(Self::handle_open_new_buffer);
        client.add_entity_request_handler(Self::handle_find_search_candidates);
        client.add_entity_request_handler(Self::handle_open_server_settings);

        client.add_entity_request_handler(BufferStore::handle_update_buffer);
        client.add_entity_message_handler(BufferStore::handle_close_buffer);

        client.add_request_handler(
            extensions.clone().downgrade(),
            HeadlessExtensionStore::handle_sync_extensions,
        );
        client.add_request_handler(
            extensions.clone().downgrade(),
            HeadlessExtensionStore::handle_install_extension,
        );

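        // Let each store register the protocol message handlers it serves itself.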
        BufferStore::init(&client);
        WorktreeStore::init(&client);
        SettingsObserver::init(&client);
        LspStore::init(&client);
        TaskStore::init(Some(&client));
        ToolchainStore::init(&client);
        DapStore::init(&client);
        // todo(debugger): Re-init the breakpoint store when we set it up for collab.
        // BreakpointStore::init(&client);
        GitStore::init(&client);

        HeadlessProject {
            session: client,
            settings_observer,
            fs,
            worktree_store,
            buffer_store,
            lsp_store,
            task_store,
            next_entry_id: Default::default(),
            languages,
            extensions,
            git_store,
        }
    }

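    /// Forwards locally-produced buffer operations to the client as
    /// `UpdateBuffer` messages so the two replicas stay in sync.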
    fn on_buffer_event(
        &mut self,
        buffer: Entity<Buffer>,
        event: &BufferEvent,
        cx: &mut Context<Self>,
    ) {
        match event {
            BufferEvent::Operation {
                operation,
                is_local: true,
            } => cx
                .background_spawn(self.session.request(proto::UpdateBuffer {
                    project_id: SSH_PROJECT_ID,
                    buffer_id: buffer.read(cx).remote_id().to_proto(),
                    operations: vec![serialize_operation(operation)],
                }))
                .detach(),
            _ => {}
        }
    }

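    /// Relays LSP store events (server updates, notifications, logs, and prompts)
    /// to the client so they can be surfaced in the local UI.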
    fn on_lsp_store_event(
        &mut self,
        _lsp_store: Entity<LspStore>,
        event: &LspStoreEvent,
        cx: &mut Context<Self>,
    ) {
        match event {
            LspStoreEvent::LanguageServerUpdate {
                language_server_id,
                message,
            } => {
                self.session
                    .send(proto::UpdateLanguageServer {
                        project_id: SSH_PROJECT_ID,
                        language_server_id: language_server_id.to_proto(),
                        variant: Some(message.clone()),
                    })
                    .log_err();
            }
            LspStoreEvent::Notification(message) => {
                self.session
                    .send(proto::Toast {
                        project_id: SSH_PROJECT_ID,
                        notification_id: "lsp".to_string(),
                        message: message.clone(),
                    })
                    .log_err();
            }
            LspStoreEvent::LanguageServerLog(language_server_id, log_type, message) => {
                self.session
                    .send(proto::LanguageServerLog {
                        project_id: SSH_PROJECT_ID,
                        language_server_id: language_server_id.to_proto(),
                        message: message.clone(),
                        log_type: Some(log_type.to_proto()),
                    })
                    .log_err();
            }
            LspStoreEvent::LanguageServerPrompt(prompt) => {
                let request = self.session.request(proto::LanguageServerPromptRequest {
                    project_id: SSH_PROJECT_ID,
                    actions: prompt
                        .actions
                        .iter()
                        .map(|action| action.title.to_string())
                        .collect(),
                    level: Some(prompt_to_proto(&prompt)),
                    lsp_name: prompt.lsp_name.clone(),
                    message: prompt.message.clone(),
                });
                let prompt = prompt.clone();
                cx.background_spawn(async move {
                    let response = request.await?;
                    if let Some(action_response) = response.action_response {
                        prompt.respond(action_response as usize).await;
                    }
                    anyhow::Ok(())
                })
                .detach();
            }
            _ => {}
        }
    }

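    /// Handles `AddWorktree`: expands and canonicalizes the requested path,
    /// creates a local worktree for it, and responds with the worktree id before
    /// registering the worktree with the store (see the ordering comment below).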
    pub async fn handle_add_worktree(
        this: Entity<Self>,
        message: TypedEnvelope<proto::AddWorktree>,
        mut cx: AsyncApp,
    ) -> Result<proto::AddWorktreeResponse> {
        use client::ErrorCodeExt;
        let fs = this.read_with(&mut cx, |this, _| this.fs.clone())?;
        let path = PathBuf::from_proto(shellexpand::tilde(&message.payload.path).to_string());

        let canonicalized = match fs.canonicalize(&path).await {
            Ok(path) => path,
            Err(e) => {
                let mut parent = path
                    .parent()
                    .ok_or(e)
                    .map_err(|_| anyhow!("{:?} does not exist", path))?;
                if parent == Path::new("") {
                    parent = util::paths::home_dir();
                }
                let parent = fs.canonicalize(parent).await.map_err(|_| {
                    anyhow!(
                        proto::ErrorCode::DevServerProjectPathDoesNotExist
                            .with_tag("path", &path.to_string_lossy().as_ref())
                    )
                })?;
                parent.join(path.file_name().unwrap())
            }
        };

        let worktree = this
            .update(&mut cx.clone(), |this, _| {
                Worktree::local(
                    Arc::from(canonicalized.as_path()),
                    message.payload.visible,
                    this.fs.clone(),
                    this.next_entry_id.clone(),
                    &mut cx,
                )
            })?
            .await?;

        let response = this.update(&mut cx, |_, cx| {
            worktree.update(cx, |worktree, _| proto::AddWorktreeResponse {
                worktree_id: worktree.id().to_proto(),
                canonicalized_path: canonicalized.to_proto(),
            })
        })?;

        // We spawn this asynchronously, so that we can send the response back
        // *before* `worktree_store.add()` can send out UpdateProject requests
        // to the client about the new worktree.
        //
        // That lets the client manage the references/handles of the newly-added
        // worktree before getting interrupted by an UpdateProject request.
        //
        // This fixes the problem of the client sending the AddWorktree request,
        // the headless project sending out a project update, the client receiving
        // it and immediately dropping the reference to the new worktree, causing
        // it to be dropped on the headless project, and the client only then
        // receiving a response to AddWorktree.
        cx.spawn(async move |cx| {
            this.update(cx, |this, cx| {
                this.worktree_store.update(cx, |worktree_store, cx| {
                    worktree_store.add(&worktree, cx);
                });
            })
            .log_err();
        })
        .detach();

        Ok(response)
    }

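    /// Handles `RemoveWorktree`: drops the worktree with the given id from the
    /// worktree store.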
    pub async fn handle_remove_worktree(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::RemoveWorktree>,
        mut cx: AsyncApp,
    ) -> Result<proto::Ack> {
        let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
        this.update(&mut cx, |this, cx| {
            this.worktree_store.update(cx, |worktree_store, cx| {
                worktree_store.remove_worktree(worktree_id, cx);
            });
        })?;
        Ok(proto::Ack {})
    }

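    /// Handles `OpenBufferByPath`: opens the buffer at the given worktree-relative
    /// path, replicates it to the client, and returns its remote id.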
    pub async fn handle_open_buffer_by_path(
        this: Entity<Self>,
        message: TypedEnvelope<proto::OpenBufferByPath>,
        mut cx: AsyncApp,
    ) -> Result<proto::OpenBufferResponse> {
        let worktree_id = WorktreeId::from_proto(message.payload.worktree_id);
        let (buffer_store, buffer) = this.update(&mut cx, |this, cx| {
            let buffer_store = this.buffer_store.clone();
            let buffer = this.buffer_store.update(cx, |buffer_store, cx| {
                buffer_store.open_buffer(
                    ProjectPath {
                        worktree_id,
                        path: Arc::<Path>::from_proto(message.payload.path),
                    },
                    cx,
                )
            });
            anyhow::Ok((buffer_store, buffer))
        })??;

        let buffer = buffer.await?;
        let buffer_id = buffer.read_with(&cx, |b, _| b.remote_id())?;
        buffer_store.update(&mut cx, |buffer_store, cx| {
            buffer_store
                .create_buffer_for_peer(&buffer, SSH_PEER_ID, cx)
                .detach_and_log_err(cx);
        })?;

        Ok(proto::OpenBufferResponse {
            buffer_id: buffer_id.to_proto(),
        })
    }

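    /// Handles `OpenNewBuffer`: creates an empty untitled buffer, replicates it
    /// to the client, and returns its remote id.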
    pub async fn handle_open_new_buffer(
        this: Entity<Self>,
        _message: TypedEnvelope<proto::OpenNewBuffer>,
        mut cx: AsyncApp,
    ) -> Result<proto::OpenBufferResponse> {
        let (buffer_store, buffer) = this.update(&mut cx, |this, cx| {
            let buffer_store = this.buffer_store.clone();
            let buffer = this
                .buffer_store
                .update(cx, |buffer_store, cx| buffer_store.create_buffer(cx));
            anyhow::Ok((buffer_store, buffer))
        })??;

        let buffer = buffer.await?;
        let buffer_id = buffer.read_with(&cx, |b, _| b.remote_id())?;
        buffer_store.update(&mut cx, |buffer_store, cx| {
            buffer_store
                .create_buffer_for_peer(&buffer, SSH_PEER_ID, cx)
                .detach_and_log_err(cx);
        })?;

        Ok(proto::OpenBufferResponse {
            buffer_id: buffer_id.to_proto(),
        })
    }

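    /// Handles `OpenServerSettings`: opens the remote server's settings file in a
    /// buffer (seeding it with the default content when empty) and replicates it
    /// to the client.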
    pub async fn handle_open_server_settings(
        this: Entity<Self>,
        _: TypedEnvelope<proto::OpenServerSettings>,
        mut cx: AsyncApp,
    ) -> Result<proto::OpenBufferResponse> {
        let settings_path = paths::settings_file();
        let (worktree, path) = this
            .update(&mut cx, |this, cx| {
                this.worktree_store.update(cx, |worktree_store, cx| {
                    worktree_store.find_or_create_worktree(settings_path, false, cx)
                })
            })?
            .await?;

        let (buffer, buffer_store) = this.update(&mut cx, |this, cx| {
            let buffer = this.buffer_store.update(cx, |buffer_store, cx| {
                buffer_store.open_buffer(
                    ProjectPath {
                        worktree_id: worktree.read(cx).id(),
                        path: path.into(),
                    },
                    cx,
                )
            });

            (buffer, this.buffer_store.clone())
        })?;

        let buffer = buffer.await?;

        let buffer_id = cx.update(|cx| {
            if buffer.read(cx).is_empty() {
                buffer.update(cx, |buffer, cx| {
                    buffer.edit([(0..0, initial_server_settings_content())], None, cx)
                });
            }

            let buffer_id = buffer.read_with(cx, |b, _| b.remote_id());

            buffer_store.update(cx, |buffer_store, cx| {
                buffer_store
                    .create_buffer_for_peer(&buffer, SSH_PEER_ID, cx)
                    .detach_and_log_err(cx);
            });

            buffer_id
        })?;

        Ok(proto::OpenBufferResponse {
            buffer_id: buffer_id.to_proto(),
        })
    }

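    /// Handles `FindSearchCandidates`: streams back the ids of buffers that may
    /// contain matches for the query, replicating each candidate buffer to the
    /// client first.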
    pub async fn handle_find_search_candidates(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::FindSearchCandidates>,
        mut cx: AsyncApp,
    ) -> Result<proto::FindSearchCandidatesResponse> {
        let message = envelope.payload;
        let query = SearchQuery::from_proto(
            message
                .query
                .ok_or_else(|| anyhow!("missing query field"))?,
        )?;
        let results = this.update(&mut cx, |this, cx| {
            this.buffer_store.update(cx, |buffer_store, cx| {
                buffer_store.find_search_candidates(&query, message.limit as _, this.fs.clone(), cx)
            })
        })?;

        let mut response = proto::FindSearchCandidatesResponse {
            buffer_ids: Vec::new(),
        };

        let buffer_store = this.read_with(&cx, |this, _| this.buffer_store.clone())?;

        while let Ok(buffer) = results.recv().await {
            let buffer_id = buffer.update(&mut cx, |this, _| this.remote_id())?;
            response.buffer_ids.push(buffer_id.to_proto());
            buffer_store
                .update(&mut cx, |buffer_store, cx| {
                    buffer_store.create_buffer_for_peer(&buffer, SSH_PEER_ID, cx)
                })?
                .await?;
        }

        Ok(response)
    }

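    /// Handles `ListRemoteDirectory`: lists the entries of a directory on the
    /// remote host, optionally including whether each entry is itself a directory.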
    pub async fn handle_list_remote_directory(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::ListRemoteDirectory>,
        cx: AsyncApp,
    ) -> Result<proto::ListRemoteDirectoryResponse> {
        let fs = cx.read_entity(&this, |this, _| this.fs.clone())?;
        let expanded = PathBuf::from_proto(shellexpand::tilde(&envelope.payload.path).to_string());
        let check_info = envelope
            .payload
            .config
            .as_ref()
            .is_some_and(|config| config.is_dir);

        let mut entries = Vec::new();
        let mut entry_info = Vec::new();
        let mut response = fs.read_dir(&expanded).await?;
        while let Some(path) = response.next().await {
            let path = path?;
            if let Some(file_name) = path.file_name() {
                entries.push(file_name.to_string_lossy().to_string());
                if check_info {
                    let is_dir = fs.is_dir(&path).await;
                    entry_info.push(proto::EntryInfo { is_dir });
                }
            }
        }
        Ok(proto::ListRemoteDirectoryResponse {
            entries,
            entry_info,
        })
    }

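    /// Handles `GetPathMetadata`: reports whether the given remote path exists and
    /// whether it is a directory, along with its tilde-expanded form.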
    pub async fn handle_get_path_metadata(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::GetPathMetadata>,
        cx: AsyncApp,
    ) -> Result<proto::GetPathMetadataResponse> {
        let fs = cx.read_entity(&this, |this, _| this.fs.clone())?;
        let expanded = PathBuf::from_proto(shellexpand::tilde(&envelope.payload.path).to_string());

        let metadata = fs.metadata(&expanded).await?;
        let is_dir = metadata.map(|metadata| metadata.is_dir).unwrap_or(false);

        Ok(proto::GetPathMetadataResponse {
            exists: metadata.is_some(),
            is_dir,
            path: expanded.to_proto(),
        })
    }

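    /// Handles `ShutdownRemoteServer`: schedules an app shutdown and quit on the
    /// event loop and acknowledges the request.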
    pub async fn handle_shutdown_remote_server(
        _this: Entity<Self>,
        _envelope: TypedEnvelope<proto::ShutdownRemoteServer>,
        cx: AsyncApp,
    ) -> Result<proto::Ack> {
        cx.spawn(async move |cx| {
            cx.update(|cx| {
                // TODO: This is a hack, because in a headless project, shutdown isn't executed
                // when calling quit, but it should be.
                cx.shutdown();
                cx.quit();
            })
        })
        .detach();

        Ok(proto::Ack {})
    }

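    /// Handles `Ping`: a liveness check from the client; simply acknowledges.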
    pub async fn handle_ping(
        _this: Entity<Self>,
        _envelope: TypedEnvelope<proto::Ping>,
        _cx: AsyncApp,
    ) -> Result<proto::Ack> {
        log::debug!("Received ping from client");
        Ok(proto::Ack {})
    }
}

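/// Converts a language server prompt's `PromptLevel` into its protobuf
/// representation so it can be forwarded to the client.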
fn prompt_to_proto(
    prompt: &project::LanguageServerPromptRequest,
) -> proto::language_server_prompt_request::Level {
    match prompt.level {
        PromptLevel::Info => proto::language_server_prompt_request::Level::Info(
            proto::language_server_prompt_request::Info {},
        ),
        PromptLevel::Warning => proto::language_server_prompt_request::Level::Warning(
            proto::language_server_prompt_request::Warning {},
        ),
        PromptLevel::Critical => proto::language_server_prompt_request::Level::Critical(
            proto::language_server_prompt_request::Critical {},
        ),
    }
}