1use anyhow::{anyhow, Result};
2use fs::Fs;
3use gpui::{AppContext, AsyncAppContext, Context as _, Model, ModelContext, PromptLevel};
4use http_client::HttpClient;
5use language::{proto::serialize_operation, Buffer, BufferEvent, LanguageRegistry};
6use node_runtime::NodeRuntime;
7use project::{
8 buffer_store::{BufferStore, BufferStoreEvent},
9 project_settings::SettingsObserver,
10 search::SearchQuery,
11 task_store::TaskStore,
12 worktree_store::WorktreeStore,
13 LspStore, LspStoreEvent, PrettierStore, ProjectPath, ToolchainStore, WorktreeId,
14};
15use remote::ssh_session::ChannelClient;
16use rpc::{
17 proto::{self, SSH_PEER_ID, SSH_PROJECT_ID},
18 AnyProtoClient, TypedEnvelope,
19};
20
21use settings::initial_server_settings_content;
22use smol::stream::StreamExt;
23use std::{
24 path::{Path, PathBuf},
25 sync::{atomic::AtomicUsize, Arc},
26};
27use util::ResultExt;
28use worktree::Worktree;
29
/// Server-side project state for a remote (SSH) session: the stores backing
/// the project plus the client handle used to talk to the peer.
pub struct HeadlessProject {
    /// Filesystem handle; the request handlers use it to canonicalize paths,
    /// list directories and read path metadata.
    pub fs: Arc<dyn Fs>,
    /// Proto client over which messages and requests are sent to the peer.
    pub session: AnyProtoClient,
    /// Store holding the worktrees of this project.
    pub worktree_store: Model<WorktreeStore>,
    /// Store holding the buffers opened on behalf of the peer.
    pub buffer_store: Model<BufferStore>,
    /// Language-server store whose events are forwarded to the peer.
    pub lsp_store: Model<LspStore>,
    /// Task store created in `new` alongside the other stores.
    pub task_store: Model<TaskStore>,
    /// Settings observer created in `new` on top of the worktree and task stores.
    pub settings_observer: Model<SettingsObserver>,
    /// Shared counter handed to every worktree created via `Worktree::local`.
    pub next_entry_id: Arc<AtomicUsize>,
    /// Language registry shared with the prettier, toolchain and LSP stores.
    pub languages: Arc<LanguageRegistry>,
}
41
/// Dependencies consumed by `HeadlessProject::new` when building a project.
pub struct HeadlessAppState {
    /// Channel to the client; `new` converts it into an `AnyProtoClient`.
    pub session: Arc<ChannelClient>,
    /// Filesystem implementation handed to the stores and kept on the project.
    pub fs: Arc<dyn Fs>,
    /// HTTP client passed to the LSP store.
    pub http_client: Arc<dyn HttpClient>,
    /// Node runtime passed to language initialization and the prettier store.
    pub node_runtime: NodeRuntime,
    /// Language registry shared by the stores and kept on the project.
    pub languages: Arc<LanguageRegistry>,
}
49
50impl HeadlessProject {
/// Runs the app-level initialization used by the headless project:
/// `settings::init`, `language::init` and `Project::init_settings`, in that
/// order.
pub fn init(cx: &mut AppContext) {
    settings::init(cx);
    language::init(cx);
    project::Project::init_settings(cx);
}
56
/// Builds a `HeadlessProject` from the given application state.
///
/// Creates the worktree, buffer, prettier, toolchain, task, settings and
/// LSP stores, marks the shareable ones as shared under `SSH_PROJECT_ID`,
/// subscribes the session to those entities, and registers the request and
/// message handlers implemented in this file and in the stores.
pub fn new(
    HeadlessAppState {
        session,
        fs,
        http_client,
        node_runtime,
        languages,
    }: HeadlessAppState,
    cx: &mut ModelContext<Self>,
) -> Self {
    languages::init(languages.clone(), node_runtime.clone(), cx);

    // The stores are created in dependency order: later stores receive
    // handles to the earlier ones.
    let worktree_store = cx.new_model(|cx| {
        let mut store = WorktreeStore::local(true, fs.clone());
        store.shared(SSH_PROJECT_ID, session.clone().into(), cx);
        store
    });
    let buffer_store = cx.new_model(|cx| {
        let mut buffer_store = BufferStore::local(worktree_store.clone(), cx);
        buffer_store.shared(SSH_PROJECT_ID, session.clone().into(), cx);
        buffer_store
    });
    let prettier_store = cx.new_model(|cx| {
        PrettierStore::new(
            node_runtime.clone(),
            fs.clone(),
            languages.clone(),
            worktree_store.clone(),
            cx,
        )
    });
    // The environment is shared by the toolchain, task and LSP stores.
    let environment = project::ProjectEnvironment::new(&worktree_store, None, cx);
    let toolchain_store = cx.new_model(|cx| {
        ToolchainStore::local(
            languages.clone(),
            worktree_store.clone(),
            environment.clone(),
            cx,
        )
    });

    let task_store = cx.new_model(|cx| {
        let mut task_store = TaskStore::local(
            fs.clone(),
            buffer_store.downgrade(),
            worktree_store.clone(),
            toolchain_store.read(cx).as_language_toolchain_store(),
            environment.clone(),
            cx,
        );
        task_store.shared(SSH_PROJECT_ID, session.clone().into(), cx);
        task_store
    });
    let settings_observer = cx.new_model(|cx| {
        let mut observer = SettingsObserver::new_local(
            fs.clone(),
            worktree_store.clone(),
            task_store.clone(),
            cx,
        );
        observer.shared(SSH_PROJECT_ID, session.clone().into(), cx);
        observer
    });

    let lsp_store = cx.new_model(|cx| {
        let mut lsp_store = LspStore::new_local(
            buffer_store.clone(),
            worktree_store.clone(),
            prettier_store.clone(),
            toolchain_store.clone(),
            environment,
            languages.clone(),
            http_client.clone(),
            fs.clone(),
            cx,
        );
        lsp_store.shared(SSH_PROJECT_ID, session.clone().into(), cx);
        lsp_store
    });

    // LSP store events are relayed to the client by `on_lsp_store_event`.
    cx.subscribe(&lsp_store, Self::on_lsp_store_event).detach();

    // Subscribe to each buffer as it is added so that `on_buffer_event`
    // sees its events; other buffer store events are ignored here.
    cx.subscribe(
        &buffer_store,
        |_this, _buffer_store, event, cx| match event {
            BufferStoreEvent::BufferAdded(buffer) => {
                cx.subscribe(buffer, Self::on_buffer_event).detach();
            }
            _ => {}
        },
    )
    .detach();

    let client: AnyProtoClient = session.clone().into();

    // Register this project and its stores with the session under the
    // SSH project id.
    session.subscribe_to_entity(SSH_PROJECT_ID, &worktree_store);
    session.subscribe_to_entity(SSH_PROJECT_ID, &buffer_store);
    session.subscribe_to_entity(SSH_PROJECT_ID, &cx.handle());
    session.subscribe_to_entity(SSH_PROJECT_ID, &lsp_store);
    session.subscribe_to_entity(SSH_PROJECT_ID, &task_store);
    session.subscribe_to_entity(SSH_PROJECT_ID, &toolchain_store);
    session.subscribe_to_entity(SSH_PROJECT_ID, &settings_observer);

    // Handlers registered against a weak handle to this project.
    client.add_request_handler(cx.weak_model(), Self::handle_list_remote_directory);
    client.add_request_handler(cx.weak_model(), Self::handle_get_path_metadata);
    client.add_request_handler(cx.weak_model(), Self::handle_shutdown_remote_server);
    client.add_request_handler(cx.weak_model(), Self::handle_ping);

    client.add_model_request_handler(Self::handle_add_worktree);
    client.add_request_handler(cx.weak_model(), Self::handle_remove_worktree);

    client.add_model_request_handler(Self::handle_open_buffer_by_path);
    client.add_model_request_handler(Self::handle_open_new_buffer);
    client.add_model_request_handler(Self::handle_find_search_candidates);
    client.add_model_request_handler(Self::handle_open_server_settings);

    client.add_model_request_handler(BufferStore::handle_update_buffer);
    client.add_model_message_handler(BufferStore::handle_close_buffer);

    // Let each store register its own handlers on the client.
    BufferStore::init(&client);
    WorktreeStore::init(&client);
    SettingsObserver::init(&client);
    LspStore::init(&client);
    TaskStore::init(Some(&client));
    ToolchainStore::init(&client);

    HeadlessProject {
        session: client,
        settings_observer,
        fs,
        worktree_store,
        buffer_store,
        lsp_store,
        task_store,
        next_entry_id: Default::default(),
        languages,
    }
}
195
196 fn on_buffer_event(
197 &mut self,
198 buffer: Model<Buffer>,
199 event: &BufferEvent,
200 cx: &mut ModelContext<Self>,
201 ) {
202 match event {
203 BufferEvent::Operation {
204 operation,
205 is_local: true,
206 } => cx
207 .background_executor()
208 .spawn(self.session.request(proto::UpdateBuffer {
209 project_id: SSH_PROJECT_ID,
210 buffer_id: buffer.read(cx).remote_id().to_proto(),
211 operations: vec![serialize_operation(operation)],
212 }))
213 .detach(),
214 _ => {}
215 }
216 }
217
218 fn on_lsp_store_event(
219 &mut self,
220 _lsp_store: Model<LspStore>,
221 event: &LspStoreEvent,
222 cx: &mut ModelContext<Self>,
223 ) {
224 match event {
225 LspStoreEvent::LanguageServerUpdate {
226 language_server_id,
227 message,
228 } => {
229 self.session
230 .send(proto::UpdateLanguageServer {
231 project_id: SSH_PROJECT_ID,
232 language_server_id: language_server_id.to_proto(),
233 variant: Some(message.clone()),
234 })
235 .log_err();
236 }
237 LspStoreEvent::Notification(message) => {
238 self.session
239 .send(proto::Toast {
240 project_id: SSH_PROJECT_ID,
241 notification_id: "lsp".to_string(),
242 message: message.clone(),
243 })
244 .log_err();
245 }
246 LspStoreEvent::LanguageServerLog(language_server_id, log_type, message) => {
247 self.session
248 .send(proto::LanguageServerLog {
249 project_id: SSH_PROJECT_ID,
250 language_server_id: language_server_id.to_proto(),
251 message: message.clone(),
252 log_type: Some(log_type.to_proto()),
253 })
254 .log_err();
255 }
256 LspStoreEvent::LanguageServerPrompt(prompt) => {
257 let request = self.session.request(proto::LanguageServerPromptRequest {
258 project_id: SSH_PROJECT_ID,
259 actions: prompt
260 .actions
261 .iter()
262 .map(|action| action.title.to_string())
263 .collect(),
264 level: Some(prompt_to_proto(&prompt)),
265 lsp_name: prompt.lsp_name.clone(),
266 message: prompt.message.clone(),
267 });
268 let prompt = prompt.clone();
269 cx.background_executor()
270 .spawn(async move {
271 let response = request.await?;
272 if let Some(action_response) = response.action_response {
273 prompt.respond(action_response as usize).await;
274 }
275 anyhow::Ok(())
276 })
277 .detach();
278 }
279 _ => {}
280 }
281 }
282
283 pub async fn handle_add_worktree(
284 this: Model<Self>,
285 message: TypedEnvelope<proto::AddWorktree>,
286 mut cx: AsyncAppContext,
287 ) -> Result<proto::AddWorktreeResponse> {
288 use client::ErrorCodeExt;
289 let path = shellexpand::tilde(&message.payload.path).to_string();
290
291 let fs = this.read_with(&mut cx, |this, _| this.fs.clone())?;
292 let path = PathBuf::from(path);
293
294 let canonicalized = match fs.canonicalize(&path).await {
295 Ok(path) => path,
296 Err(e) => {
297 let mut parent = path
298 .parent()
299 .ok_or(e)
300 .map_err(|_| anyhow!("{:?} does not exist", path))?;
301 if parent == Path::new("") {
302 parent = util::paths::home_dir();
303 }
304 let parent = fs.canonicalize(parent).await.map_err(|_| {
305 anyhow!(proto::ErrorCode::DevServerProjectPathDoesNotExist
306 .with_tag("path", &path.to_string_lossy().as_ref()))
307 })?;
308 parent.join(path.file_name().unwrap())
309 }
310 };
311
312 let worktree = this
313 .update(&mut cx.clone(), |this, _| {
314 Worktree::local(
315 Arc::from(canonicalized.as_path()),
316 message.payload.visible,
317 this.fs.clone(),
318 this.next_entry_id.clone(),
319 &mut cx,
320 )
321 })?
322 .await?;
323
324 let response = this.update(&mut cx, |_, cx| {
325 worktree.update(cx, |worktree, _| proto::AddWorktreeResponse {
326 worktree_id: worktree.id().to_proto(),
327 canonicalized_path: canonicalized.to_string_lossy().to_string(),
328 })
329 })?;
330
331 // We spawn this asynchronously, so that we can send the response back
332 // *before* `worktree_store.add()` can send out UpdateProject requests
333 // to the client about the new worktree.
334 //
335 // That lets the client manage the reference/handles of the newly-added
336 // worktree, before getting interrupted by an UpdateProject request.
337 //
338 // This fixes the problem of the client sending the AddWorktree request,
339 // headless project sending out a project update, client receiving it
340 // and immediately dropping the reference of the new client, causing it
341 // to be dropped on the headless project, and the client only then
342 // receiving a response to AddWorktree.
343 cx.spawn(|mut cx| async move {
344 this.update(&mut cx, |this, cx| {
345 this.worktree_store.update(cx, |worktree_store, cx| {
346 worktree_store.add(&worktree, cx);
347 });
348 })
349 .log_err();
350 })
351 .detach();
352
353 Ok(response)
354 }
355
356 pub async fn handle_remove_worktree(
357 this: Model<Self>,
358 envelope: TypedEnvelope<proto::RemoveWorktree>,
359 mut cx: AsyncAppContext,
360 ) -> Result<proto::Ack> {
361 let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
362 this.update(&mut cx, |this, cx| {
363 this.worktree_store.update(cx, |worktree_store, cx| {
364 worktree_store.remove_worktree(worktree_id, cx);
365 });
366 })?;
367 Ok(proto::Ack {})
368 }
369
370 pub async fn handle_open_buffer_by_path(
371 this: Model<Self>,
372 message: TypedEnvelope<proto::OpenBufferByPath>,
373 mut cx: AsyncAppContext,
374 ) -> Result<proto::OpenBufferResponse> {
375 let worktree_id = WorktreeId::from_proto(message.payload.worktree_id);
376 let (buffer_store, buffer) = this.update(&mut cx, |this, cx| {
377 let buffer_store = this.buffer_store.clone();
378 let buffer = this.buffer_store.update(cx, |buffer_store, cx| {
379 buffer_store.open_buffer(
380 ProjectPath {
381 worktree_id,
382 path: PathBuf::from(message.payload.path).into(),
383 },
384 cx,
385 )
386 });
387 anyhow::Ok((buffer_store, buffer))
388 })??;
389
390 let buffer = buffer.await?;
391 let buffer_id = buffer.read_with(&cx, |b, _| b.remote_id())?;
392 buffer_store.update(&mut cx, |buffer_store, cx| {
393 buffer_store
394 .create_buffer_for_peer(&buffer, SSH_PEER_ID, cx)
395 .detach_and_log_err(cx);
396 })?;
397
398 Ok(proto::OpenBufferResponse {
399 buffer_id: buffer_id.to_proto(),
400 })
401 }
402
403 pub async fn handle_open_new_buffer(
404 this: Model<Self>,
405 _message: TypedEnvelope<proto::OpenNewBuffer>,
406 mut cx: AsyncAppContext,
407 ) -> Result<proto::OpenBufferResponse> {
408 let (buffer_store, buffer) = this.update(&mut cx, |this, cx| {
409 let buffer_store = this.buffer_store.clone();
410 let buffer = this
411 .buffer_store
412 .update(cx, |buffer_store, cx| buffer_store.create_buffer(cx));
413 anyhow::Ok((buffer_store, buffer))
414 })??;
415
416 let buffer = buffer.await?;
417 let buffer_id = buffer.read_with(&cx, |b, _| b.remote_id())?;
418 buffer_store.update(&mut cx, |buffer_store, cx| {
419 buffer_store
420 .create_buffer_for_peer(&buffer, SSH_PEER_ID, cx)
421 .detach_and_log_err(cx);
422 })?;
423
424 Ok(proto::OpenBufferResponse {
425 buffer_id: buffer_id.to_proto(),
426 })
427 }
428
429 pub async fn handle_open_server_settings(
430 this: Model<Self>,
431 _: TypedEnvelope<proto::OpenServerSettings>,
432 mut cx: AsyncAppContext,
433 ) -> Result<proto::OpenBufferResponse> {
434 let settings_path = paths::settings_file();
435 let (worktree, path) = this
436 .update(&mut cx, |this, cx| {
437 this.worktree_store.update(cx, |worktree_store, cx| {
438 worktree_store.find_or_create_worktree(settings_path, false, cx)
439 })
440 })?
441 .await?;
442
443 let (buffer, buffer_store) = this.update(&mut cx, |this, cx| {
444 let buffer = this.buffer_store.update(cx, |buffer_store, cx| {
445 buffer_store.open_buffer(
446 ProjectPath {
447 worktree_id: worktree.read(cx).id(),
448 path: path.into(),
449 },
450 cx,
451 )
452 });
453
454 (buffer, this.buffer_store.clone())
455 })?;
456
457 let buffer = buffer.await?;
458
459 let buffer_id = cx.update(|cx| {
460 if buffer.read(cx).is_empty() {
461 buffer.update(cx, |buffer, cx| {
462 buffer.edit([(0..0, initial_server_settings_content())], None, cx)
463 });
464 }
465
466 let buffer_id = buffer.read_with(cx, |b, _| b.remote_id());
467
468 buffer_store.update(cx, |buffer_store, cx| {
469 buffer_store
470 .create_buffer_for_peer(&buffer, SSH_PEER_ID, cx)
471 .detach_and_log_err(cx);
472 });
473
474 buffer_id
475 })?;
476
477 Ok(proto::OpenBufferResponse {
478 buffer_id: buffer_id.to_proto(),
479 })
480 }
481
482 pub async fn handle_find_search_candidates(
483 this: Model<Self>,
484 envelope: TypedEnvelope<proto::FindSearchCandidates>,
485 mut cx: AsyncAppContext,
486 ) -> Result<proto::FindSearchCandidatesResponse> {
487 let message = envelope.payload;
488 let query = SearchQuery::from_proto(
489 message
490 .query
491 .ok_or_else(|| anyhow!("missing query field"))?,
492 )?;
493 let mut results = this.update(&mut cx, |this, cx| {
494 this.buffer_store.update(cx, |buffer_store, cx| {
495 buffer_store.find_search_candidates(&query, message.limit as _, this.fs.clone(), cx)
496 })
497 })?;
498
499 let mut response = proto::FindSearchCandidatesResponse {
500 buffer_ids: Vec::new(),
501 };
502
503 let buffer_store = this.read_with(&cx, |this, _| this.buffer_store.clone())?;
504
505 while let Some(buffer) = results.next().await {
506 let buffer_id = buffer.update(&mut cx, |this, _| this.remote_id())?;
507 response.buffer_ids.push(buffer_id.to_proto());
508 buffer_store
509 .update(&mut cx, |buffer_store, cx| {
510 buffer_store.create_buffer_for_peer(&buffer, SSH_PEER_ID, cx)
511 })?
512 .await?;
513 }
514
515 Ok(response)
516 }
517
518 pub async fn handle_list_remote_directory(
519 this: Model<Self>,
520 envelope: TypedEnvelope<proto::ListRemoteDirectory>,
521 cx: AsyncAppContext,
522 ) -> Result<proto::ListRemoteDirectoryResponse> {
523 let expanded = shellexpand::tilde(&envelope.payload.path).to_string();
524 let fs = cx.read_model(&this, |this, _| this.fs.clone())?;
525
526 let mut entries = Vec::new();
527 let mut response = fs.read_dir(Path::new(&expanded)).await?;
528 while let Some(path) = response.next().await {
529 if let Some(file_name) = path?.file_name() {
530 entries.push(file_name.to_string_lossy().to_string());
531 }
532 }
533 Ok(proto::ListRemoteDirectoryResponse { entries })
534 }
535
536 pub async fn handle_get_path_metadata(
537 this: Model<Self>,
538 envelope: TypedEnvelope<proto::GetPathMetadata>,
539 cx: AsyncAppContext,
540 ) -> Result<proto::GetPathMetadataResponse> {
541 let fs = cx.read_model(&this, |this, _| this.fs.clone())?;
542 let expanded = shellexpand::tilde(&envelope.payload.path).to_string();
543
544 let metadata = fs.metadata(&PathBuf::from(expanded.clone())).await?;
545 let is_dir = metadata.map(|metadata| metadata.is_dir).unwrap_or(false);
546
547 Ok(proto::GetPathMetadataResponse {
548 exists: metadata.is_some(),
549 is_dir,
550 path: expanded,
551 })
552 }
553
554 pub async fn handle_shutdown_remote_server(
555 _this: Model<Self>,
556 _envelope: TypedEnvelope<proto::ShutdownRemoteServer>,
557 cx: AsyncAppContext,
558 ) -> Result<proto::Ack> {
559 cx.spawn(|cx| async move {
560 cx.update(|cx| {
561 // TODO: This is a hack, because in a headless project, shutdown isn't executed
562 // when calling quit, but it should be.
563 cx.shutdown();
564 cx.quit();
565 })
566 })
567 .detach();
568
569 Ok(proto::Ack {})
570 }
571
572 pub async fn handle_ping(
573 _this: Model<Self>,
574 _envelope: TypedEnvelope<proto::Ping>,
575 _cx: AsyncAppContext,
576 ) -> Result<proto::Ack> {
577 log::debug!("Received ping from client");
578 Ok(proto::Ack {})
579 }
580}
581
582fn prompt_to_proto(
583 prompt: &project::LanguageServerPromptRequest,
584) -> proto::language_server_prompt_request::Level {
585 match prompt.level {
586 PromptLevel::Info => proto::language_server_prompt_request::Level::Info(
587 proto::language_server_prompt_request::Info {},
588 ),
589 PromptLevel::Warning => proto::language_server_prompt_request::Level::Warning(
590 proto::language_server_prompt_request::Warning {},
591 ),
592 PromptLevel::Critical => proto::language_server_prompt_request::Level::Critical(
593 proto::language_server_prompt_request::Critical {},
594 ),
595 }
596}