headless_project.rs

  1use ::proto::{FromProto, ToProto};
  2use anyhow::{anyhow, Result};
  3use extension::ExtensionHostProxy;
  4use extension_host::headless_host::HeadlessExtensionStore;
  5use fs::Fs;
  6use gpui::{App, AppContext as _, AsyncApp, Context, Entity, PromptLevel};
  7use http_client::HttpClient;
  8use language::{proto::serialize_operation, Buffer, BufferEvent, LanguageRegistry};
  9use node_runtime::NodeRuntime;
 10use project::{
 11    buffer_store::{BufferStore, BufferStoreEvent},
 12    git::GitStore,
 13    project_settings::SettingsObserver,
 14    search::SearchQuery,
 15    task_store::TaskStore,
 16    worktree_store::WorktreeStore,
 17    LspStore, LspStoreEvent, PrettierStore, ProjectPath, ToolchainStore, WorktreeId,
 18};
 19use remote::ssh_session::ChannelClient;
 20use rpc::{
 21    proto::{self, SSH_PEER_ID, SSH_PROJECT_ID},
 22    AnyProtoClient, TypedEnvelope,
 23};
 24
 25use settings::initial_server_settings_content;
 26use smol::stream::StreamExt;
 27use std::{
 28    path::{Path, PathBuf},
 29    sync::{atomic::AtomicUsize, Arc},
 30};
 31use util::ResultExt;
 32use worktree::Worktree;
 33
/// Server-side project state for a remote (SSH) session.
///
/// Owns the stores that `HeadlessProject::new` creates and shares with the
/// client under `SSH_PROJECT_ID`, together with the client handle used to
/// send messages back.
pub struct HeadlessProject {
    /// Filesystem handle handed to the stores and to newly created worktrees.
    pub fs: Arc<dyn Fs>,
    /// Proto client derived from the SSH channel session; used to send
    /// requests and messages to the connected client.
    pub session: AnyProtoClient,
    /// Store of worktrees opened on this host.
    pub worktree_store: Entity<WorktreeStore>,
    /// Store of buffers opened on this host.
    pub buffer_store: Entity<BufferStore>,
    /// Language-server store; its events are forwarded to the client by
    /// `on_lsp_store_event`.
    pub lsp_store: Entity<LspStore>,
    /// Task store shared with the client.
    pub task_store: Entity<TaskStore>,
    /// Settings observer shared with the client.
    pub settings_observer: Entity<SettingsObserver>,
    /// Shared counter passed to `Worktree::local` when a worktree is added.
    pub next_entry_id: Arc<AtomicUsize>,
    /// Language registry supplied through `HeadlessAppState`.
    pub languages: Arc<LanguageRegistry>,
    /// Extension store that serves the sync/install extension requests.
    pub extensions: Entity<HeadlessExtensionStore>,
    /// Git store built on top of the worktree and buffer stores.
    pub git_store: Entity<GitStore>,
}
 47
/// Dependencies consumed by `HeadlessProject::new`.
pub struct HeadlessAppState {
    /// SSH channel client; converted into an `AnyProtoClient` and used to
    /// subscribe the project's entities to incoming messages.
    pub session: Arc<ChannelClient>,
    /// Filesystem implementation passed to the stores.
    pub fs: Arc<dyn Fs>,
    /// HTTP client passed to the LSP store and the extension store.
    pub http_client: Arc<dyn HttpClient>,
    /// Node runtime passed to language initialization, the prettier store and
    /// the extension store.
    pub node_runtime: NodeRuntime,
    /// Language registry shared by the language-aware stores.
    pub languages: Arc<LanguageRegistry>,
    /// Proxy passed to `language_extension::init` and to the extension store.
    pub extension_host_proxy: Arc<ExtensionHostProxy>,
}
 56
 57impl HeadlessProject {
 58    pub fn init(cx: &mut App) {
 59        settings::init(cx);
 60        language::init(cx);
 61        project::Project::init_settings(cx);
 62    }
 63
 64    pub fn new(
 65        HeadlessAppState {
 66            session,
 67            fs,
 68            http_client,
 69            node_runtime,
 70            languages,
 71            extension_host_proxy: proxy,
 72        }: HeadlessAppState,
 73        cx: &mut Context<Self>,
 74    ) -> Self {
 75        language_extension::init(proxy.clone(), languages.clone());
 76        languages::init(languages.clone(), node_runtime.clone(), cx);
 77
 78        let worktree_store = cx.new(|cx| {
 79            let mut store = WorktreeStore::local(true, fs.clone());
 80            store.shared(SSH_PROJECT_ID, session.clone().into(), cx);
 81            store
 82        });
 83
 84        let buffer_store = cx.new(|cx| {
 85            let mut buffer_store = BufferStore::local(worktree_store.clone(), cx);
 86            buffer_store.shared(SSH_PROJECT_ID, session.clone().into(), cx);
 87            buffer_store
 88        });
 89
 90        let git_store =
 91            cx.new(|cx| GitStore::new(&worktree_store, buffer_store.clone(), None, None, cx));
 92        let prettier_store = cx.new(|cx| {
 93            PrettierStore::new(
 94                node_runtime.clone(),
 95                fs.clone(),
 96                languages.clone(),
 97                worktree_store.clone(),
 98                cx,
 99            )
100        });
101        let environment = project::ProjectEnvironment::new(&worktree_store, None, cx);
102        let toolchain_store = cx.new(|cx| {
103            ToolchainStore::local(
104                languages.clone(),
105                worktree_store.clone(),
106                environment.clone(),
107                cx,
108            )
109        });
110
111        let task_store = cx.new(|cx| {
112            let mut task_store = TaskStore::local(
113                fs.clone(),
114                buffer_store.downgrade(),
115                worktree_store.clone(),
116                toolchain_store.read(cx).as_language_toolchain_store(),
117                environment.clone(),
118                cx,
119            );
120            task_store.shared(SSH_PROJECT_ID, session.clone().into(), cx);
121            task_store
122        });
123        let settings_observer = cx.new(|cx| {
124            let mut observer = SettingsObserver::new_local(
125                fs.clone(),
126                worktree_store.clone(),
127                task_store.clone(),
128                cx,
129            );
130            observer.shared(SSH_PROJECT_ID, session.clone().into(), cx);
131            observer
132        });
133
134        let lsp_store = cx.new(|cx| {
135            let mut lsp_store = LspStore::new_local(
136                buffer_store.clone(),
137                worktree_store.clone(),
138                prettier_store.clone(),
139                toolchain_store.clone(),
140                environment,
141                languages.clone(),
142                http_client.clone(),
143                fs.clone(),
144                cx,
145            );
146            lsp_store.shared(SSH_PROJECT_ID, session.clone().into(), cx);
147            lsp_store
148        });
149
150        cx.subscribe(&lsp_store, Self::on_lsp_store_event).detach();
151
152        cx.subscribe(
153            &buffer_store,
154            |_this, _buffer_store, event, cx| match event {
155                BufferStoreEvent::BufferAdded(buffer) => {
156                    cx.subscribe(buffer, Self::on_buffer_event).detach();
157                }
158                _ => {}
159            },
160        )
161        .detach();
162
163        let extensions = HeadlessExtensionStore::new(
164            fs.clone(),
165            http_client.clone(),
166            paths::remote_extensions_dir().to_path_buf(),
167            proxy,
168            node_runtime,
169            cx,
170        );
171
172        let client: AnyProtoClient = session.clone().into();
173
174        // local_machine -> ssh handlers
175        session.subscribe_to_entity(SSH_PROJECT_ID, &worktree_store);
176        session.subscribe_to_entity(SSH_PROJECT_ID, &buffer_store);
177        session.subscribe_to_entity(SSH_PROJECT_ID, &cx.entity());
178        session.subscribe_to_entity(SSH_PROJECT_ID, &lsp_store);
179        session.subscribe_to_entity(SSH_PROJECT_ID, &task_store);
180        session.subscribe_to_entity(SSH_PROJECT_ID, &toolchain_store);
181        session.subscribe_to_entity(SSH_PROJECT_ID, &settings_observer);
182        session.subscribe_to_entity(SSH_PROJECT_ID, &git_store);
183
184        client.add_request_handler(cx.weak_entity(), Self::handle_list_remote_directory);
185        client.add_request_handler(cx.weak_entity(), Self::handle_get_path_metadata);
186        client.add_request_handler(cx.weak_entity(), Self::handle_shutdown_remote_server);
187        client.add_request_handler(cx.weak_entity(), Self::handle_ping);
188
189        client.add_entity_request_handler(Self::handle_add_worktree);
190        client.add_request_handler(cx.weak_entity(), Self::handle_remove_worktree);
191
192        client.add_entity_request_handler(Self::handle_open_buffer_by_path);
193        client.add_entity_request_handler(Self::handle_open_new_buffer);
194        client.add_entity_request_handler(Self::handle_find_search_candidates);
195        client.add_entity_request_handler(Self::handle_open_server_settings);
196
197        client.add_entity_request_handler(BufferStore::handle_update_buffer);
198        client.add_entity_message_handler(BufferStore::handle_close_buffer);
199
200        client.add_request_handler(
201            extensions.clone().downgrade(),
202            HeadlessExtensionStore::handle_sync_extensions,
203        );
204        client.add_request_handler(
205            extensions.clone().downgrade(),
206            HeadlessExtensionStore::handle_install_extension,
207        );
208
209        BufferStore::init(&client);
210        WorktreeStore::init(&client);
211        SettingsObserver::init(&client);
212        LspStore::init(&client);
213        TaskStore::init(Some(&client));
214        ToolchainStore::init(&client);
215        GitStore::init(&client);
216
217        HeadlessProject {
218            session: client,
219            settings_observer,
220            fs,
221            worktree_store,
222            buffer_store,
223            lsp_store,
224            task_store,
225            next_entry_id: Default::default(),
226            languages,
227            extensions,
228            git_store,
229        }
230    }
231
232    fn on_buffer_event(
233        &mut self,
234        buffer: Entity<Buffer>,
235        event: &BufferEvent,
236        cx: &mut Context<Self>,
237    ) {
238        match event {
239            BufferEvent::Operation {
240                operation,
241                is_local: true,
242            } => cx
243                .background_spawn(self.session.request(proto::UpdateBuffer {
244                    project_id: SSH_PROJECT_ID,
245                    buffer_id: buffer.read(cx).remote_id().to_proto(),
246                    operations: vec![serialize_operation(operation)],
247                }))
248                .detach(),
249            _ => {}
250        }
251    }
252
253    fn on_lsp_store_event(
254        &mut self,
255        _lsp_store: Entity<LspStore>,
256        event: &LspStoreEvent,
257        cx: &mut Context<Self>,
258    ) {
259        match event {
260            LspStoreEvent::LanguageServerUpdate {
261                language_server_id,
262                message,
263            } => {
264                self.session
265                    .send(proto::UpdateLanguageServer {
266                        project_id: SSH_PROJECT_ID,
267                        language_server_id: language_server_id.to_proto(),
268                        variant: Some(message.clone()),
269                    })
270                    .log_err();
271            }
272            LspStoreEvent::Notification(message) => {
273                self.session
274                    .send(proto::Toast {
275                        project_id: SSH_PROJECT_ID,
276                        notification_id: "lsp".to_string(),
277                        message: message.clone(),
278                    })
279                    .log_err();
280            }
281            LspStoreEvent::LanguageServerLog(language_server_id, log_type, message) => {
282                self.session
283                    .send(proto::LanguageServerLog {
284                        project_id: SSH_PROJECT_ID,
285                        language_server_id: language_server_id.to_proto(),
286                        message: message.clone(),
287                        log_type: Some(log_type.to_proto()),
288                    })
289                    .log_err();
290            }
291            LspStoreEvent::LanguageServerPrompt(prompt) => {
292                let request = self.session.request(proto::LanguageServerPromptRequest {
293                    project_id: SSH_PROJECT_ID,
294                    actions: prompt
295                        .actions
296                        .iter()
297                        .map(|action| action.title.to_string())
298                        .collect(),
299                    level: Some(prompt_to_proto(&prompt)),
300                    lsp_name: prompt.lsp_name.clone(),
301                    message: prompt.message.clone(),
302                });
303                let prompt = prompt.clone();
304                cx.background_spawn(async move {
305                    let response = request.await?;
306                    if let Some(action_response) = response.action_response {
307                        prompt.respond(action_response as usize).await;
308                    }
309                    anyhow::Ok(())
310                })
311                .detach();
312            }
313            _ => {}
314        }
315    }
316
317    pub async fn handle_add_worktree(
318        this: Entity<Self>,
319        message: TypedEnvelope<proto::AddWorktree>,
320        mut cx: AsyncApp,
321    ) -> Result<proto::AddWorktreeResponse> {
322        use client::ErrorCodeExt;
323        let fs = this.read_with(&mut cx, |this, _| this.fs.clone())?;
324        let path = PathBuf::from_proto(shellexpand::tilde(&message.payload.path).to_string());
325
326        let canonicalized = match fs.canonicalize(&path).await {
327            Ok(path) => path,
328            Err(e) => {
329                let mut parent = path
330                    .parent()
331                    .ok_or(e)
332                    .map_err(|_| anyhow!("{:?} does not exist", path))?;
333                if parent == Path::new("") {
334                    parent = util::paths::home_dir();
335                }
336                let parent = fs.canonicalize(parent).await.map_err(|_| {
337                    anyhow!(proto::ErrorCode::DevServerProjectPathDoesNotExist
338                        .with_tag("path", &path.to_string_lossy().as_ref()))
339                })?;
340                parent.join(path.file_name().unwrap())
341            }
342        };
343
344        let worktree = this
345            .update(&mut cx.clone(), |this, _| {
346                Worktree::local(
347                    Arc::from(canonicalized.as_path()),
348                    message.payload.visible,
349                    this.fs.clone(),
350                    this.next_entry_id.clone(),
351                    &mut cx,
352                )
353            })?
354            .await?;
355
356        let response = this.update(&mut cx, |_, cx| {
357            worktree.update(cx, |worktree, _| proto::AddWorktreeResponse {
358                worktree_id: worktree.id().to_proto(),
359                canonicalized_path: canonicalized.to_proto(),
360            })
361        })?;
362
363        // We spawn this asynchronously, so that we can send the response back
364        // *before* `worktree_store.add()` can send out UpdateProject requests
365        // to the client about the new worktree.
366        //
367        // That lets the client manage the reference/handles of the newly-added
368        // worktree, before getting interrupted by an UpdateProject request.
369        //
370        // This fixes the problem of the client sending the AddWorktree request,
371        // headless project sending out a project update, client receiving it
372        // and immediately dropping the reference of the new client, causing it
373        // to be dropped on the headless project, and the client only then
374        // receiving a response to AddWorktree.
375        cx.spawn(|mut cx| async move {
376            this.update(&mut cx, |this, cx| {
377                this.worktree_store.update(cx, |worktree_store, cx| {
378                    worktree_store.add(&worktree, cx);
379                });
380            })
381            .log_err();
382        })
383        .detach();
384
385        Ok(response)
386    }
387
388    pub async fn handle_remove_worktree(
389        this: Entity<Self>,
390        envelope: TypedEnvelope<proto::RemoveWorktree>,
391        mut cx: AsyncApp,
392    ) -> Result<proto::Ack> {
393        let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
394        this.update(&mut cx, |this, cx| {
395            this.worktree_store.update(cx, |worktree_store, cx| {
396                worktree_store.remove_worktree(worktree_id, cx);
397            });
398        })?;
399        Ok(proto::Ack {})
400    }
401
402    pub async fn handle_open_buffer_by_path(
403        this: Entity<Self>,
404        message: TypedEnvelope<proto::OpenBufferByPath>,
405        mut cx: AsyncApp,
406    ) -> Result<proto::OpenBufferResponse> {
407        let worktree_id = WorktreeId::from_proto(message.payload.worktree_id);
408        let (buffer_store, buffer) = this.update(&mut cx, |this, cx| {
409            let buffer_store = this.buffer_store.clone();
410            let buffer = this.buffer_store.update(cx, |buffer_store, cx| {
411                buffer_store.open_buffer(
412                    ProjectPath {
413                        worktree_id,
414                        path: Arc::<Path>::from_proto(message.payload.path),
415                    },
416                    cx,
417                )
418            });
419            anyhow::Ok((buffer_store, buffer))
420        })??;
421
422        let buffer = buffer.await?;
423        let buffer_id = buffer.read_with(&cx, |b, _| b.remote_id())?;
424        buffer_store.update(&mut cx, |buffer_store, cx| {
425            buffer_store
426                .create_buffer_for_peer(&buffer, SSH_PEER_ID, cx)
427                .detach_and_log_err(cx);
428        })?;
429
430        Ok(proto::OpenBufferResponse {
431            buffer_id: buffer_id.to_proto(),
432        })
433    }
434
435    pub async fn handle_open_new_buffer(
436        this: Entity<Self>,
437        _message: TypedEnvelope<proto::OpenNewBuffer>,
438        mut cx: AsyncApp,
439    ) -> Result<proto::OpenBufferResponse> {
440        let (buffer_store, buffer) = this.update(&mut cx, |this, cx| {
441            let buffer_store = this.buffer_store.clone();
442            let buffer = this
443                .buffer_store
444                .update(cx, |buffer_store, cx| buffer_store.create_buffer(cx));
445            anyhow::Ok((buffer_store, buffer))
446        })??;
447
448        let buffer = buffer.await?;
449        let buffer_id = buffer.read_with(&cx, |b, _| b.remote_id())?;
450        buffer_store.update(&mut cx, |buffer_store, cx| {
451            buffer_store
452                .create_buffer_for_peer(&buffer, SSH_PEER_ID, cx)
453                .detach_and_log_err(cx);
454        })?;
455
456        Ok(proto::OpenBufferResponse {
457            buffer_id: buffer_id.to_proto(),
458        })
459    }
460
461    pub async fn handle_open_server_settings(
462        this: Entity<Self>,
463        _: TypedEnvelope<proto::OpenServerSettings>,
464        mut cx: AsyncApp,
465    ) -> Result<proto::OpenBufferResponse> {
466        let settings_path = paths::settings_file();
467        let (worktree, path) = this
468            .update(&mut cx, |this, cx| {
469                this.worktree_store.update(cx, |worktree_store, cx| {
470                    worktree_store.find_or_create_worktree(settings_path, false, cx)
471                })
472            })?
473            .await?;
474
475        let (buffer, buffer_store) = this.update(&mut cx, |this, cx| {
476            let buffer = this.buffer_store.update(cx, |buffer_store, cx| {
477                buffer_store.open_buffer(
478                    ProjectPath {
479                        worktree_id: worktree.read(cx).id(),
480                        path: path.into(),
481                    },
482                    cx,
483                )
484            });
485
486            (buffer, this.buffer_store.clone())
487        })?;
488
489        let buffer = buffer.await?;
490
491        let buffer_id = cx.update(|cx| {
492            if buffer.read(cx).is_empty() {
493                buffer.update(cx, |buffer, cx| {
494                    buffer.edit([(0..0, initial_server_settings_content())], None, cx)
495                });
496            }
497
498            let buffer_id = buffer.read_with(cx, |b, _| b.remote_id());
499
500            buffer_store.update(cx, |buffer_store, cx| {
501                buffer_store
502                    .create_buffer_for_peer(&buffer, SSH_PEER_ID, cx)
503                    .detach_and_log_err(cx);
504            });
505
506            buffer_id
507        })?;
508
509        Ok(proto::OpenBufferResponse {
510            buffer_id: buffer_id.to_proto(),
511        })
512    }
513
514    pub async fn handle_find_search_candidates(
515        this: Entity<Self>,
516        envelope: TypedEnvelope<proto::FindSearchCandidates>,
517        mut cx: AsyncApp,
518    ) -> Result<proto::FindSearchCandidatesResponse> {
519        let message = envelope.payload;
520        let query = SearchQuery::from_proto(
521            message
522                .query
523                .ok_or_else(|| anyhow!("missing query field"))?,
524        )?;
525        let results = this.update(&mut cx, |this, cx| {
526            this.buffer_store.update(cx, |buffer_store, cx| {
527                buffer_store.find_search_candidates(&query, message.limit as _, this.fs.clone(), cx)
528            })
529        })?;
530
531        let mut response = proto::FindSearchCandidatesResponse {
532            buffer_ids: Vec::new(),
533        };
534
535        let buffer_store = this.read_with(&cx, |this, _| this.buffer_store.clone())?;
536
537        while let Ok(buffer) = results.recv().await {
538            let buffer_id = buffer.update(&mut cx, |this, _| this.remote_id())?;
539            response.buffer_ids.push(buffer_id.to_proto());
540            buffer_store
541                .update(&mut cx, |buffer_store, cx| {
542                    buffer_store.create_buffer_for_peer(&buffer, SSH_PEER_ID, cx)
543                })?
544                .await?;
545        }
546
547        Ok(response)
548    }
549
550    pub async fn handle_list_remote_directory(
551        this: Entity<Self>,
552        envelope: TypedEnvelope<proto::ListRemoteDirectory>,
553        cx: AsyncApp,
554    ) -> Result<proto::ListRemoteDirectoryResponse> {
555        let fs = cx.read_entity(&this, |this, _| this.fs.clone())?;
556        let expanded = PathBuf::from_proto(shellexpand::tilde(&envelope.payload.path).to_string());
557        let check_info = envelope
558            .payload
559            .config
560            .as_ref()
561            .is_some_and(|config| config.is_dir);
562
563        let mut entries = Vec::new();
564        let mut entry_info = Vec::new();
565        let mut response = fs.read_dir(&expanded).await?;
566        while let Some(path) = response.next().await {
567            let path = path?;
568            if let Some(file_name) = path.file_name() {
569                entries.push(file_name.to_string_lossy().to_string());
570                if check_info {
571                    let is_dir = fs.is_dir(&path).await;
572                    entry_info.push(proto::EntryInfo { is_dir });
573                }
574            }
575        }
576        Ok(proto::ListRemoteDirectoryResponse {
577            entries,
578            entry_info,
579        })
580    }
581
582    pub async fn handle_get_path_metadata(
583        this: Entity<Self>,
584        envelope: TypedEnvelope<proto::GetPathMetadata>,
585        cx: AsyncApp,
586    ) -> Result<proto::GetPathMetadataResponse> {
587        let fs = cx.read_entity(&this, |this, _| this.fs.clone())?;
588        let expanded = PathBuf::from_proto(shellexpand::tilde(&envelope.payload.path).to_string());
589
590        let metadata = fs.metadata(&expanded).await?;
591        let is_dir = metadata.map(|metadata| metadata.is_dir).unwrap_or(false);
592
593        Ok(proto::GetPathMetadataResponse {
594            exists: metadata.is_some(),
595            is_dir,
596            path: expanded.to_proto(),
597        })
598    }
599
600    pub async fn handle_shutdown_remote_server(
601        _this: Entity<Self>,
602        _envelope: TypedEnvelope<proto::ShutdownRemoteServer>,
603        cx: AsyncApp,
604    ) -> Result<proto::Ack> {
605        cx.spawn(|cx| async move {
606            cx.update(|cx| {
607                // TODO: This is a hack, because in a headless project, shutdown isn't executed
608                // when calling quit, but it should be.
609                cx.shutdown();
610                cx.quit();
611            })
612        })
613        .detach();
614
615        Ok(proto::Ack {})
616    }
617
618    pub async fn handle_ping(
619        _this: Entity<Self>,
620        _envelope: TypedEnvelope<proto::Ping>,
621        _cx: AsyncApp,
622    ) -> Result<proto::Ack> {
623        log::debug!("Received ping from client");
624        Ok(proto::Ack {})
625    }
626}
627
628fn prompt_to_proto(
629    prompt: &project::LanguageServerPromptRequest,
630) -> proto::language_server_prompt_request::Level {
631    match prompt.level {
632        PromptLevel::Info => proto::language_server_prompt_request::Level::Info(
633            proto::language_server_prompt_request::Info {},
634        ),
635        PromptLevel::Warning => proto::language_server_prompt_request::Level::Warning(
636            proto::language_server_prompt_request::Warning {},
637        ),
638        PromptLevel::Critical => proto::language_server_prompt_request::Level::Critical(
639            proto::language_server_prompt_request::Critical {},
640        ),
641    }
642}