1use anyhow::{anyhow, Result};
2use extension::ExtensionHostProxy;
3use extension_host::headless_host::HeadlessExtensionStore;
4use fs::Fs;
5use gpui::{AppContext, AsyncAppContext, Context as _, Model, ModelContext, PromptLevel};
6use http_client::HttpClient;
7use language::{proto::serialize_operation, Buffer, BufferEvent, LanguageRegistry};
8use node_runtime::NodeRuntime;
9use project::{
10 buffer_store::{BufferStore, BufferStoreEvent},
11 project_settings::SettingsObserver,
12 search::SearchQuery,
13 task_store::TaskStore,
14 worktree_store::WorktreeStore,
15 LspStore, LspStoreEvent, PrettierStore, ProjectPath, ToolchainStore, WorktreeId,
16};
17use remote::ssh_session::ChannelClient;
18use rpc::{
19 proto::{self, SSH_PEER_ID, SSH_PROJECT_ID},
20 AnyProtoClient, TypedEnvelope,
21};
22
23use settings::initial_server_settings_content;
24use smol::stream::StreamExt;
25use std::{
26 path::{Path, PathBuf},
27 sync::{atomic::AtomicUsize, Arc},
28};
29use util::ResultExt;
30use worktree::Worktree;
31
32pub struct HeadlessProject {
33 pub fs: Arc<dyn Fs>,
34 pub session: AnyProtoClient,
35 pub worktree_store: Model<WorktreeStore>,
36 pub buffer_store: Model<BufferStore>,
37 pub lsp_store: Model<LspStore>,
38 pub task_store: Model<TaskStore>,
39 pub settings_observer: Model<SettingsObserver>,
40 pub next_entry_id: Arc<AtomicUsize>,
41 pub languages: Arc<LanguageRegistry>,
42 pub extensions: Model<HeadlessExtensionStore>,
43}
44
45pub struct HeadlessAppState {
46 pub session: Arc<ChannelClient>,
47 pub fs: Arc<dyn Fs>,
48 pub http_client: Arc<dyn HttpClient>,
49 pub node_runtime: NodeRuntime,
50 pub languages: Arc<LanguageRegistry>,
51 pub extension_host_proxy: Arc<ExtensionHostProxy>,
52}
53
54impl HeadlessProject {
55 pub fn init(cx: &mut AppContext) {
56 settings::init(cx);
57 language::init(cx);
58 project::Project::init_settings(cx);
59 }
60
61 pub fn new(
62 HeadlessAppState {
63 session,
64 fs,
65 http_client,
66 node_runtime,
67 languages,
68 extension_host_proxy: proxy,
69 }: HeadlessAppState,
70 cx: &mut ModelContext<Self>,
71 ) -> Self {
72 language_extension::init(proxy.clone(), languages.clone());
73 languages::init(languages.clone(), node_runtime.clone(), cx);
74
75 let worktree_store = cx.new_model(|cx| {
76 let mut store = WorktreeStore::local(true, fs.clone());
77 store.shared(SSH_PROJECT_ID, session.clone().into(), cx);
78 store
79 });
80 let buffer_store = cx.new_model(|cx| {
81 let mut buffer_store = BufferStore::local(worktree_store.clone(), cx);
82 buffer_store.shared(SSH_PROJECT_ID, session.clone().into(), cx);
83 buffer_store
84 });
85 let prettier_store = cx.new_model(|cx| {
86 PrettierStore::new(
87 node_runtime.clone(),
88 fs.clone(),
89 languages.clone(),
90 worktree_store.clone(),
91 cx,
92 )
93 });
94 let environment = project::ProjectEnvironment::new(&worktree_store, None, cx);
95 let toolchain_store = cx.new_model(|cx| {
96 ToolchainStore::local(
97 languages.clone(),
98 worktree_store.clone(),
99 environment.clone(),
100 cx,
101 )
102 });
103
104 let task_store = cx.new_model(|cx| {
105 let mut task_store = TaskStore::local(
106 fs.clone(),
107 buffer_store.downgrade(),
108 worktree_store.clone(),
109 toolchain_store.read(cx).as_language_toolchain_store(),
110 environment.clone(),
111 cx,
112 );
113 task_store.shared(SSH_PROJECT_ID, session.clone().into(), cx);
114 task_store
115 });
116 let settings_observer = cx.new_model(|cx| {
117 let mut observer = SettingsObserver::new_local(
118 fs.clone(),
119 worktree_store.clone(),
120 task_store.clone(),
121 cx,
122 );
123 observer.shared(SSH_PROJECT_ID, session.clone().into(), cx);
124 observer
125 });
126
127 let lsp_store = cx.new_model(|cx| {
128 let mut lsp_store = LspStore::new_local(
129 buffer_store.clone(),
130 worktree_store.clone(),
131 prettier_store.clone(),
132 toolchain_store.clone(),
133 environment,
134 languages.clone(),
135 http_client.clone(),
136 fs.clone(),
137 cx,
138 );
139 lsp_store.shared(SSH_PROJECT_ID, session.clone().into(), cx);
140 lsp_store
141 });
142
143 cx.subscribe(&lsp_store, Self::on_lsp_store_event).detach();
144
145 cx.subscribe(
146 &buffer_store,
147 |_this, _buffer_store, event, cx| match event {
148 BufferStoreEvent::BufferAdded(buffer) => {
149 cx.subscribe(buffer, Self::on_buffer_event).detach();
150 }
151 _ => {}
152 },
153 )
154 .detach();
155
156 let extensions = HeadlessExtensionStore::new(
157 fs.clone(),
158 http_client.clone(),
159 paths::remote_extensions_dir().to_path_buf(),
160 proxy,
161 node_runtime,
162 cx,
163 );
164
165 let client: AnyProtoClient = session.clone().into();
166
167 session.subscribe_to_entity(SSH_PROJECT_ID, &worktree_store);
168 session.subscribe_to_entity(SSH_PROJECT_ID, &buffer_store);
169 session.subscribe_to_entity(SSH_PROJECT_ID, &cx.handle());
170 session.subscribe_to_entity(SSH_PROJECT_ID, &lsp_store);
171 session.subscribe_to_entity(SSH_PROJECT_ID, &task_store);
172 session.subscribe_to_entity(SSH_PROJECT_ID, &toolchain_store);
173 session.subscribe_to_entity(SSH_PROJECT_ID, &settings_observer);
174
175 client.add_request_handler(cx.weak_model(), Self::handle_list_remote_directory);
176 client.add_request_handler(cx.weak_model(), Self::handle_get_path_metadata);
177 client.add_request_handler(cx.weak_model(), Self::handle_shutdown_remote_server);
178 client.add_request_handler(cx.weak_model(), Self::handle_ping);
179
180 client.add_model_request_handler(Self::handle_add_worktree);
181 client.add_request_handler(cx.weak_model(), Self::handle_remove_worktree);
182
183 client.add_model_request_handler(Self::handle_open_buffer_by_path);
184 client.add_model_request_handler(Self::handle_open_new_buffer);
185 client.add_model_request_handler(Self::handle_find_search_candidates);
186 client.add_model_request_handler(Self::handle_open_server_settings);
187
188 client.add_model_request_handler(BufferStore::handle_update_buffer);
189 client.add_model_message_handler(BufferStore::handle_close_buffer);
190
191 client.add_request_handler(
192 extensions.clone().downgrade(),
193 HeadlessExtensionStore::handle_sync_extensions,
194 );
195 client.add_request_handler(
196 extensions.clone().downgrade(),
197 HeadlessExtensionStore::handle_install_extension,
198 );
199
200 BufferStore::init(&client);
201 WorktreeStore::init(&client);
202 SettingsObserver::init(&client);
203 LspStore::init(&client);
204 TaskStore::init(Some(&client));
205 ToolchainStore::init(&client);
206
207 HeadlessProject {
208 session: client,
209 settings_observer,
210 fs,
211 worktree_store,
212 buffer_store,
213 lsp_store,
214 task_store,
215 next_entry_id: Default::default(),
216 languages,
217 extensions,
218 }
219 }
220
221 fn on_buffer_event(
222 &mut self,
223 buffer: Model<Buffer>,
224 event: &BufferEvent,
225 cx: &mut ModelContext<Self>,
226 ) {
227 match event {
228 BufferEvent::Operation {
229 operation,
230 is_local: true,
231 } => cx
232 .background_executor()
233 .spawn(self.session.request(proto::UpdateBuffer {
234 project_id: SSH_PROJECT_ID,
235 buffer_id: buffer.read(cx).remote_id().to_proto(),
236 operations: vec![serialize_operation(operation)],
237 }))
238 .detach(),
239 _ => {}
240 }
241 }
242
243 fn on_lsp_store_event(
244 &mut self,
245 _lsp_store: Model<LspStore>,
246 event: &LspStoreEvent,
247 cx: &mut ModelContext<Self>,
248 ) {
249 match event {
250 LspStoreEvent::LanguageServerUpdate {
251 language_server_id,
252 message,
253 } => {
254 self.session
255 .send(proto::UpdateLanguageServer {
256 project_id: SSH_PROJECT_ID,
257 language_server_id: language_server_id.to_proto(),
258 variant: Some(message.clone()),
259 })
260 .log_err();
261 }
262 LspStoreEvent::Notification(message) => {
263 self.session
264 .send(proto::Toast {
265 project_id: SSH_PROJECT_ID,
266 notification_id: "lsp".to_string(),
267 message: message.clone(),
268 })
269 .log_err();
270 }
271 LspStoreEvent::LanguageServerLog(language_server_id, log_type, message) => {
272 self.session
273 .send(proto::LanguageServerLog {
274 project_id: SSH_PROJECT_ID,
275 language_server_id: language_server_id.to_proto(),
276 message: message.clone(),
277 log_type: Some(log_type.to_proto()),
278 })
279 .log_err();
280 }
281 LspStoreEvent::LanguageServerPrompt(prompt) => {
282 let request = self.session.request(proto::LanguageServerPromptRequest {
283 project_id: SSH_PROJECT_ID,
284 actions: prompt
285 .actions
286 .iter()
287 .map(|action| action.title.to_string())
288 .collect(),
289 level: Some(prompt_to_proto(&prompt)),
290 lsp_name: prompt.lsp_name.clone(),
291 message: prompt.message.clone(),
292 });
293 let prompt = prompt.clone();
294 cx.background_executor()
295 .spawn(async move {
296 let response = request.await?;
297 if let Some(action_response) = response.action_response {
298 prompt.respond(action_response as usize).await;
299 }
300 anyhow::Ok(())
301 })
302 .detach();
303 }
304 _ => {}
305 }
306 }
307
308 pub async fn handle_add_worktree(
309 this: Model<Self>,
310 message: TypedEnvelope<proto::AddWorktree>,
311 mut cx: AsyncAppContext,
312 ) -> Result<proto::AddWorktreeResponse> {
313 use client::ErrorCodeExt;
314 let path = shellexpand::tilde(&message.payload.path).to_string();
315
316 let fs = this.read_with(&mut cx, |this, _| this.fs.clone())?;
317 let path = PathBuf::from(path);
318
319 let canonicalized = match fs.canonicalize(&path).await {
320 Ok(path) => path,
321 Err(e) => {
322 let mut parent = path
323 .parent()
324 .ok_or(e)
325 .map_err(|_| anyhow!("{:?} does not exist", path))?;
326 if parent == Path::new("") {
327 parent = util::paths::home_dir();
328 }
329 let parent = fs.canonicalize(parent).await.map_err(|_| {
330 anyhow!(proto::ErrorCode::DevServerProjectPathDoesNotExist
331 .with_tag("path", &path.to_string_lossy().as_ref()))
332 })?;
333 parent.join(path.file_name().unwrap())
334 }
335 };
336
337 let worktree = this
338 .update(&mut cx.clone(), |this, _| {
339 Worktree::local(
340 Arc::from(canonicalized.as_path()),
341 message.payload.visible,
342 this.fs.clone(),
343 this.next_entry_id.clone(),
344 &mut cx,
345 )
346 })?
347 .await?;
348
349 let response = this.update(&mut cx, |_, cx| {
350 worktree.update(cx, |worktree, _| proto::AddWorktreeResponse {
351 worktree_id: worktree.id().to_proto(),
352 canonicalized_path: canonicalized.to_string_lossy().to_string(),
353 })
354 })?;
355
356 // We spawn this asynchronously, so that we can send the response back
357 // *before* `worktree_store.add()` can send out UpdateProject requests
358 // to the client about the new worktree.
359 //
360 // That lets the client manage the reference/handles of the newly-added
361 // worktree, before getting interrupted by an UpdateProject request.
362 //
363 // This fixes the problem of the client sending the AddWorktree request,
364 // headless project sending out a project update, client receiving it
365 // and immediately dropping the reference of the new client, causing it
366 // to be dropped on the headless project, and the client only then
367 // receiving a response to AddWorktree.
368 cx.spawn(|mut cx| async move {
369 this.update(&mut cx, |this, cx| {
370 this.worktree_store.update(cx, |worktree_store, cx| {
371 worktree_store.add(&worktree, cx);
372 });
373 })
374 .log_err();
375 })
376 .detach();
377
378 Ok(response)
379 }
380
381 pub async fn handle_remove_worktree(
382 this: Model<Self>,
383 envelope: TypedEnvelope<proto::RemoveWorktree>,
384 mut cx: AsyncAppContext,
385 ) -> Result<proto::Ack> {
386 let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
387 this.update(&mut cx, |this, cx| {
388 this.worktree_store.update(cx, |worktree_store, cx| {
389 worktree_store.remove_worktree(worktree_id, cx);
390 });
391 })?;
392 Ok(proto::Ack {})
393 }
394
395 pub async fn handle_open_buffer_by_path(
396 this: Model<Self>,
397 message: TypedEnvelope<proto::OpenBufferByPath>,
398 mut cx: AsyncAppContext,
399 ) -> Result<proto::OpenBufferResponse> {
400 let worktree_id = WorktreeId::from_proto(message.payload.worktree_id);
401 let (buffer_store, buffer) = this.update(&mut cx, |this, cx| {
402 let buffer_store = this.buffer_store.clone();
403 let buffer = this.buffer_store.update(cx, |buffer_store, cx| {
404 buffer_store.open_buffer(
405 ProjectPath {
406 worktree_id,
407 path: PathBuf::from(message.payload.path).into(),
408 },
409 cx,
410 )
411 });
412 anyhow::Ok((buffer_store, buffer))
413 })??;
414
415 let buffer = buffer.await?;
416 let buffer_id = buffer.read_with(&cx, |b, _| b.remote_id())?;
417 buffer_store.update(&mut cx, |buffer_store, cx| {
418 buffer_store
419 .create_buffer_for_peer(&buffer, SSH_PEER_ID, cx)
420 .detach_and_log_err(cx);
421 })?;
422
423 Ok(proto::OpenBufferResponse {
424 buffer_id: buffer_id.to_proto(),
425 })
426 }
427
428 pub async fn handle_open_new_buffer(
429 this: Model<Self>,
430 _message: TypedEnvelope<proto::OpenNewBuffer>,
431 mut cx: AsyncAppContext,
432 ) -> Result<proto::OpenBufferResponse> {
433 let (buffer_store, buffer) = this.update(&mut cx, |this, cx| {
434 let buffer_store = this.buffer_store.clone();
435 let buffer = this
436 .buffer_store
437 .update(cx, |buffer_store, cx| buffer_store.create_buffer(cx));
438 anyhow::Ok((buffer_store, buffer))
439 })??;
440
441 let buffer = buffer.await?;
442 let buffer_id = buffer.read_with(&cx, |b, _| b.remote_id())?;
443 buffer_store.update(&mut cx, |buffer_store, cx| {
444 buffer_store
445 .create_buffer_for_peer(&buffer, SSH_PEER_ID, cx)
446 .detach_and_log_err(cx);
447 })?;
448
449 Ok(proto::OpenBufferResponse {
450 buffer_id: buffer_id.to_proto(),
451 })
452 }
453
454 pub async fn handle_open_server_settings(
455 this: Model<Self>,
456 _: TypedEnvelope<proto::OpenServerSettings>,
457 mut cx: AsyncAppContext,
458 ) -> Result<proto::OpenBufferResponse> {
459 let settings_path = paths::settings_file();
460 let (worktree, path) = this
461 .update(&mut cx, |this, cx| {
462 this.worktree_store.update(cx, |worktree_store, cx| {
463 worktree_store.find_or_create_worktree(settings_path, false, cx)
464 })
465 })?
466 .await?;
467
468 let (buffer, buffer_store) = this.update(&mut cx, |this, cx| {
469 let buffer = this.buffer_store.update(cx, |buffer_store, cx| {
470 buffer_store.open_buffer(
471 ProjectPath {
472 worktree_id: worktree.read(cx).id(),
473 path: path.into(),
474 },
475 cx,
476 )
477 });
478
479 (buffer, this.buffer_store.clone())
480 })?;
481
482 let buffer = buffer.await?;
483
484 let buffer_id = cx.update(|cx| {
485 if buffer.read(cx).is_empty() {
486 buffer.update(cx, |buffer, cx| {
487 buffer.edit([(0..0, initial_server_settings_content())], None, cx)
488 });
489 }
490
491 let buffer_id = buffer.read_with(cx, |b, _| b.remote_id());
492
493 buffer_store.update(cx, |buffer_store, cx| {
494 buffer_store
495 .create_buffer_for_peer(&buffer, SSH_PEER_ID, cx)
496 .detach_and_log_err(cx);
497 });
498
499 buffer_id
500 })?;
501
502 Ok(proto::OpenBufferResponse {
503 buffer_id: buffer_id.to_proto(),
504 })
505 }
506
507 pub async fn handle_find_search_candidates(
508 this: Model<Self>,
509 envelope: TypedEnvelope<proto::FindSearchCandidates>,
510 mut cx: AsyncAppContext,
511 ) -> Result<proto::FindSearchCandidatesResponse> {
512 let message = envelope.payload;
513 let query = SearchQuery::from_proto(
514 message
515 .query
516 .ok_or_else(|| anyhow!("missing query field"))?,
517 )?;
518 let results = this.update(&mut cx, |this, cx| {
519 this.buffer_store.update(cx, |buffer_store, cx| {
520 buffer_store.find_search_candidates(&query, message.limit as _, this.fs.clone(), cx)
521 })
522 })?;
523
524 let mut response = proto::FindSearchCandidatesResponse {
525 buffer_ids: Vec::new(),
526 };
527
528 let buffer_store = this.read_with(&cx, |this, _| this.buffer_store.clone())?;
529
530 while let Ok(buffer) = results.recv().await {
531 let buffer_id = buffer.update(&mut cx, |this, _| this.remote_id())?;
532 response.buffer_ids.push(buffer_id.to_proto());
533 buffer_store
534 .update(&mut cx, |buffer_store, cx| {
535 buffer_store.create_buffer_for_peer(&buffer, SSH_PEER_ID, cx)
536 })?
537 .await?;
538 }
539
540 Ok(response)
541 }
542
543 pub async fn handle_list_remote_directory(
544 this: Model<Self>,
545 envelope: TypedEnvelope<proto::ListRemoteDirectory>,
546 cx: AsyncAppContext,
547 ) -> Result<proto::ListRemoteDirectoryResponse> {
548 let expanded = shellexpand::tilde(&envelope.payload.path).to_string();
549 let fs = cx.read_model(&this, |this, _| this.fs.clone())?;
550
551 let mut entries = Vec::new();
552 let mut response = fs.read_dir(Path::new(&expanded)).await?;
553 while let Some(path) = response.next().await {
554 if let Some(file_name) = path?.file_name() {
555 entries.push(file_name.to_string_lossy().to_string());
556 }
557 }
558 Ok(proto::ListRemoteDirectoryResponse { entries })
559 }
560
561 pub async fn handle_get_path_metadata(
562 this: Model<Self>,
563 envelope: TypedEnvelope<proto::GetPathMetadata>,
564 cx: AsyncAppContext,
565 ) -> Result<proto::GetPathMetadataResponse> {
566 let fs = cx.read_model(&this, |this, _| this.fs.clone())?;
567 let expanded = shellexpand::tilde(&envelope.payload.path).to_string();
568
569 let metadata = fs.metadata(&PathBuf::from(expanded.clone())).await?;
570 let is_dir = metadata.map(|metadata| metadata.is_dir).unwrap_or(false);
571
572 Ok(proto::GetPathMetadataResponse {
573 exists: metadata.is_some(),
574 is_dir,
575 path: expanded,
576 })
577 }
578
579 pub async fn handle_shutdown_remote_server(
580 _this: Model<Self>,
581 _envelope: TypedEnvelope<proto::ShutdownRemoteServer>,
582 cx: AsyncAppContext,
583 ) -> Result<proto::Ack> {
584 cx.spawn(|cx| async move {
585 cx.update(|cx| {
586 // TODO: This is a hack, because in a headless project, shutdown isn't executed
587 // when calling quit, but it should be.
588 cx.shutdown();
589 cx.quit();
590 })
591 })
592 .detach();
593
594 Ok(proto::Ack {})
595 }
596
597 pub async fn handle_ping(
598 _this: Model<Self>,
599 _envelope: TypedEnvelope<proto::Ping>,
600 _cx: AsyncAppContext,
601 ) -> Result<proto::Ack> {
602 log::debug!("Received ping from client");
603 Ok(proto::Ack {})
604 }
605}
606
607fn prompt_to_proto(
608 prompt: &project::LanguageServerPromptRequest,
609) -> proto::language_server_prompt_request::Level {
610 match prompt.level {
611 PromptLevel::Info => proto::language_server_prompt_request::Level::Info(
612 proto::language_server_prompt_request::Info {},
613 ),
614 PromptLevel::Warning => proto::language_server_prompt_request::Level::Warning(
615 proto::language_server_prompt_request::Warning {},
616 ),
617 PromptLevel::Critical => proto::language_server_prompt_request::Level::Critical(
618 proto::language_server_prompt_request::Critical {},
619 ),
620 }
621}