headless_project.rs

  1use ::proto::{FromProto, ToProto};
  2use anyhow::{anyhow, Result};
  3use extension::ExtensionHostProxy;
  4use extension_host::headless_host::HeadlessExtensionStore;
  5use fs::Fs;
  6use gpui::{App, AppContext as _, AsyncApp, Context, Entity, PromptLevel};
  7use http_client::HttpClient;
  8use language::{proto::serialize_operation, Buffer, BufferEvent, LanguageRegistry};
  9use node_runtime::NodeRuntime;
 10use project::{
 11    buffer_store::{BufferStore, BufferStoreEvent},
 12    git::GitStore,
 13    project_settings::SettingsObserver,
 14    search::SearchQuery,
 15    task_store::TaskStore,
 16    worktree_store::WorktreeStore,
 17    LspStore, LspStoreEvent, PrettierStore, ProjectPath, ToolchainStore, WorktreeId,
 18};
 19use remote::ssh_session::ChannelClient;
 20use rpc::{
 21    proto::{self, SSH_PEER_ID, SSH_PROJECT_ID},
 22    AnyProtoClient, TypedEnvelope,
 23};
 24
 25use settings::initial_server_settings_content;
 26use smol::stream::StreamExt;
 27use std::{
 28    path::{Path, PathBuf},
 29    sync::{atomic::AtomicUsize, Arc},
 30};
 31use util::ResultExt;
 32use worktree::Worktree;
 33
 34pub struct HeadlessProject {
 35    pub fs: Arc<dyn Fs>,
 36    pub session: AnyProtoClient,
 37    pub worktree_store: Entity<WorktreeStore>,
 38    pub buffer_store: Entity<BufferStore>,
 39    pub lsp_store: Entity<LspStore>,
 40    pub task_store: Entity<TaskStore>,
 41    pub settings_observer: Entity<SettingsObserver>,
 42    pub next_entry_id: Arc<AtomicUsize>,
 43    pub languages: Arc<LanguageRegistry>,
 44    pub extensions: Entity<HeadlessExtensionStore>,
 45    pub git_store: Entity<GitStore>,
 46}
 47
 48pub struct HeadlessAppState {
 49    pub session: Arc<ChannelClient>,
 50    pub fs: Arc<dyn Fs>,
 51    pub http_client: Arc<dyn HttpClient>,
 52    pub node_runtime: NodeRuntime,
 53    pub languages: Arc<LanguageRegistry>,
 54    pub extension_host_proxy: Arc<ExtensionHostProxy>,
 55}
 56
 57impl HeadlessProject {
 58    pub fn init(cx: &mut App) {
 59        settings::init(cx);
 60        language::init(cx);
 61        project::Project::init_settings(cx);
 62    }
 63
 64    pub fn new(
 65        HeadlessAppState {
 66            session,
 67            fs,
 68            http_client,
 69            node_runtime,
 70            languages,
 71            extension_host_proxy: proxy,
 72        }: HeadlessAppState,
 73        cx: &mut Context<Self>,
 74    ) -> Self {
 75        language_extension::init(proxy.clone(), languages.clone());
 76        languages::init(languages.clone(), node_runtime.clone(), cx);
 77
 78        let worktree_store = cx.new(|cx| {
 79            let mut store = WorktreeStore::local(true, fs.clone());
 80            store.shared(SSH_PROJECT_ID, session.clone().into(), cx);
 81            store
 82        });
 83
 84        let buffer_store = cx.new(|cx| {
 85            let mut buffer_store = BufferStore::local(worktree_store.clone(), cx);
 86            buffer_store.shared(SSH_PROJECT_ID, session.clone().into(), cx);
 87            buffer_store
 88        });
 89
 90        let git_store =
 91            cx.new(|cx| GitStore::new(&worktree_store, buffer_store.clone(), None, None, cx));
 92        let prettier_store = cx.new(|cx| {
 93            PrettierStore::new(
 94                node_runtime.clone(),
 95                fs.clone(),
 96                languages.clone(),
 97                worktree_store.clone(),
 98                cx,
 99            )
100        });
101        let environment = project::ProjectEnvironment::new(&worktree_store, None, cx);
102        let toolchain_store = cx.new(|cx| {
103            ToolchainStore::local(
104                languages.clone(),
105                worktree_store.clone(),
106                environment.clone(),
107                cx,
108            )
109        });
110
111        let task_store = cx.new(|cx| {
112            let mut task_store = TaskStore::local(
113                fs.clone(),
114                buffer_store.downgrade(),
115                worktree_store.clone(),
116                toolchain_store.read(cx).as_language_toolchain_store(),
117                environment.clone(),
118                cx,
119            );
120            task_store.shared(SSH_PROJECT_ID, session.clone().into(), cx);
121            task_store
122        });
123        let settings_observer = cx.new(|cx| {
124            let mut observer = SettingsObserver::new_local(
125                fs.clone(),
126                worktree_store.clone(),
127                task_store.clone(),
128                cx,
129            );
130            observer.shared(SSH_PROJECT_ID, session.clone().into(), cx);
131            observer
132        });
133
134        let lsp_store = cx.new(|cx| {
135            let mut lsp_store = LspStore::new_local(
136                buffer_store.clone(),
137                worktree_store.clone(),
138                prettier_store.clone(),
139                toolchain_store.clone(),
140                environment,
141                languages.clone(),
142                http_client.clone(),
143                fs.clone(),
144                cx,
145            );
146            lsp_store.shared(SSH_PROJECT_ID, session.clone().into(), cx);
147            lsp_store
148        });
149
150        cx.subscribe(&lsp_store, Self::on_lsp_store_event).detach();
151
152        cx.subscribe(
153            &buffer_store,
154            |_this, _buffer_store, event, cx| match event {
155                BufferStoreEvent::BufferAdded(buffer) => {
156                    cx.subscribe(buffer, Self::on_buffer_event).detach();
157                }
158                _ => {}
159            },
160        )
161        .detach();
162
163        let extensions = HeadlessExtensionStore::new(
164            fs.clone(),
165            http_client.clone(),
166            paths::remote_extensions_dir().to_path_buf(),
167            proxy,
168            node_runtime,
169            cx,
170        );
171
172        let client: AnyProtoClient = session.clone().into();
173
174        // local_machine -> ssh handlers
175        session.subscribe_to_entity(SSH_PROJECT_ID, &worktree_store);
176        session.subscribe_to_entity(SSH_PROJECT_ID, &buffer_store);
177        session.subscribe_to_entity(SSH_PROJECT_ID, &cx.entity());
178        session.subscribe_to_entity(SSH_PROJECT_ID, &lsp_store);
179        session.subscribe_to_entity(SSH_PROJECT_ID, &task_store);
180        session.subscribe_to_entity(SSH_PROJECT_ID, &toolchain_store);
181        session.subscribe_to_entity(SSH_PROJECT_ID, &settings_observer);
182        session.subscribe_to_entity(SSH_PROJECT_ID, &git_store);
183
184        client.add_request_handler(cx.weak_entity(), Self::handle_list_remote_directory);
185        client.add_request_handler(cx.weak_entity(), Self::handle_get_path_metadata);
186        client.add_request_handler(cx.weak_entity(), Self::handle_shutdown_remote_server);
187        client.add_request_handler(cx.weak_entity(), Self::handle_ping);
188
189        client.add_entity_request_handler(Self::handle_add_worktree);
190        client.add_request_handler(cx.weak_entity(), Self::handle_remove_worktree);
191
192        client.add_entity_request_handler(Self::handle_open_buffer_by_path);
193        client.add_entity_request_handler(Self::handle_open_new_buffer);
194        client.add_entity_request_handler(Self::handle_find_search_candidates);
195        client.add_entity_request_handler(Self::handle_open_server_settings);
196
197        client.add_entity_request_handler(BufferStore::handle_update_buffer);
198        client.add_entity_message_handler(BufferStore::handle_close_buffer);
199
200        client.add_request_handler(
201            extensions.clone().downgrade(),
202            HeadlessExtensionStore::handle_sync_extensions,
203        );
204        client.add_request_handler(
205            extensions.clone().downgrade(),
206            HeadlessExtensionStore::handle_install_extension,
207        );
208
209        BufferStore::init(&client);
210        WorktreeStore::init(&client);
211        SettingsObserver::init(&client);
212        LspStore::init(&client);
213        TaskStore::init(Some(&client));
214        ToolchainStore::init(&client);
215        GitStore::init(&client);
216
217        HeadlessProject {
218            session: client,
219            settings_observer,
220            fs,
221            worktree_store,
222            buffer_store,
223            lsp_store,
224            task_store,
225            next_entry_id: Default::default(),
226            languages,
227            extensions,
228            git_store,
229        }
230    }
231
232    fn on_buffer_event(
233        &mut self,
234        buffer: Entity<Buffer>,
235        event: &BufferEvent,
236        cx: &mut Context<Self>,
237    ) {
238        match event {
239            BufferEvent::Operation {
240                operation,
241                is_local: true,
242            } => cx
243                .background_executor()
244                .spawn(self.session.request(proto::UpdateBuffer {
245                    project_id: SSH_PROJECT_ID,
246                    buffer_id: buffer.read(cx).remote_id().to_proto(),
247                    operations: vec![serialize_operation(operation)],
248                }))
249                .detach(),
250            _ => {}
251        }
252    }
253
254    fn on_lsp_store_event(
255        &mut self,
256        _lsp_store: Entity<LspStore>,
257        event: &LspStoreEvent,
258        cx: &mut Context<Self>,
259    ) {
260        match event {
261            LspStoreEvent::LanguageServerUpdate {
262                language_server_id,
263                message,
264            } => {
265                self.session
266                    .send(proto::UpdateLanguageServer {
267                        project_id: SSH_PROJECT_ID,
268                        language_server_id: language_server_id.to_proto(),
269                        variant: Some(message.clone()),
270                    })
271                    .log_err();
272            }
273            LspStoreEvent::Notification(message) => {
274                self.session
275                    .send(proto::Toast {
276                        project_id: SSH_PROJECT_ID,
277                        notification_id: "lsp".to_string(),
278                        message: message.clone(),
279                    })
280                    .log_err();
281            }
282            LspStoreEvent::LanguageServerLog(language_server_id, log_type, message) => {
283                self.session
284                    .send(proto::LanguageServerLog {
285                        project_id: SSH_PROJECT_ID,
286                        language_server_id: language_server_id.to_proto(),
287                        message: message.clone(),
288                        log_type: Some(log_type.to_proto()),
289                    })
290                    .log_err();
291            }
292            LspStoreEvent::LanguageServerPrompt(prompt) => {
293                let request = self.session.request(proto::LanguageServerPromptRequest {
294                    project_id: SSH_PROJECT_ID,
295                    actions: prompt
296                        .actions
297                        .iter()
298                        .map(|action| action.title.to_string())
299                        .collect(),
300                    level: Some(prompt_to_proto(&prompt)),
301                    lsp_name: prompt.lsp_name.clone(),
302                    message: prompt.message.clone(),
303                });
304                let prompt = prompt.clone();
305                cx.background_executor()
306                    .spawn(async move {
307                        let response = request.await?;
308                        if let Some(action_response) = response.action_response {
309                            prompt.respond(action_response as usize).await;
310                        }
311                        anyhow::Ok(())
312                    })
313                    .detach();
314            }
315            _ => {}
316        }
317    }
318
319    pub async fn handle_add_worktree(
320        this: Entity<Self>,
321        message: TypedEnvelope<proto::AddWorktree>,
322        mut cx: AsyncApp,
323    ) -> Result<proto::AddWorktreeResponse> {
324        use client::ErrorCodeExt;
325        let fs = this.read_with(&mut cx, |this, _| this.fs.clone())?;
326        let path = PathBuf::from_proto(shellexpand::tilde(&message.payload.path).to_string());
327
328        let canonicalized = match fs.canonicalize(&path).await {
329            Ok(path) => path,
330            Err(e) => {
331                let mut parent = path
332                    .parent()
333                    .ok_or(e)
334                    .map_err(|_| anyhow!("{:?} does not exist", path))?;
335                if parent == Path::new("") {
336                    parent = util::paths::home_dir();
337                }
338                let parent = fs.canonicalize(parent).await.map_err(|_| {
339                    anyhow!(proto::ErrorCode::DevServerProjectPathDoesNotExist
340                        .with_tag("path", &path.to_string_lossy().as_ref()))
341                })?;
342                parent.join(path.file_name().unwrap())
343            }
344        };
345
346        let worktree = this
347            .update(&mut cx.clone(), |this, _| {
348                Worktree::local(
349                    Arc::from(canonicalized.as_path()),
350                    message.payload.visible,
351                    this.fs.clone(),
352                    this.next_entry_id.clone(),
353                    &mut cx,
354                )
355            })?
356            .await?;
357
358        let response = this.update(&mut cx, |_, cx| {
359            worktree.update(cx, |worktree, _| proto::AddWorktreeResponse {
360                worktree_id: worktree.id().to_proto(),
361                canonicalized_path: canonicalized.to_proto(),
362            })
363        })?;
364
365        // We spawn this asynchronously, so that we can send the response back
366        // *before* `worktree_store.add()` can send out UpdateProject requests
367        // to the client about the new worktree.
368        //
369        // That lets the client manage the reference/handles of the newly-added
370        // worktree, before getting interrupted by an UpdateProject request.
371        //
372        // This fixes the problem of the client sending the AddWorktree request,
373        // headless project sending out a project update, client receiving it
374        // and immediately dropping the reference of the new client, causing it
375        // to be dropped on the headless project, and the client only then
376        // receiving a response to AddWorktree.
377        cx.spawn(|mut cx| async move {
378            this.update(&mut cx, |this, cx| {
379                this.worktree_store.update(cx, |worktree_store, cx| {
380                    worktree_store.add(&worktree, cx);
381                });
382            })
383            .log_err();
384        })
385        .detach();
386
387        Ok(response)
388    }
389
390    pub async fn handle_remove_worktree(
391        this: Entity<Self>,
392        envelope: TypedEnvelope<proto::RemoveWorktree>,
393        mut cx: AsyncApp,
394    ) -> Result<proto::Ack> {
395        let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
396        this.update(&mut cx, |this, cx| {
397            this.worktree_store.update(cx, |worktree_store, cx| {
398                worktree_store.remove_worktree(worktree_id, cx);
399            });
400        })?;
401        Ok(proto::Ack {})
402    }
403
404    pub async fn handle_open_buffer_by_path(
405        this: Entity<Self>,
406        message: TypedEnvelope<proto::OpenBufferByPath>,
407        mut cx: AsyncApp,
408    ) -> Result<proto::OpenBufferResponse> {
409        let worktree_id = WorktreeId::from_proto(message.payload.worktree_id);
410        let (buffer_store, buffer) = this.update(&mut cx, |this, cx| {
411            let buffer_store = this.buffer_store.clone();
412            let buffer = this.buffer_store.update(cx, |buffer_store, cx| {
413                buffer_store.open_buffer(
414                    ProjectPath {
415                        worktree_id,
416                        path: Arc::<Path>::from_proto(message.payload.path),
417                    },
418                    cx,
419                )
420            });
421            anyhow::Ok((buffer_store, buffer))
422        })??;
423
424        let buffer = buffer.await?;
425        let buffer_id = buffer.read_with(&cx, |b, _| b.remote_id())?;
426        buffer_store.update(&mut cx, |buffer_store, cx| {
427            buffer_store
428                .create_buffer_for_peer(&buffer, SSH_PEER_ID, cx)
429                .detach_and_log_err(cx);
430        })?;
431
432        Ok(proto::OpenBufferResponse {
433            buffer_id: buffer_id.to_proto(),
434        })
435    }
436
437    pub async fn handle_open_new_buffer(
438        this: Entity<Self>,
439        _message: TypedEnvelope<proto::OpenNewBuffer>,
440        mut cx: AsyncApp,
441    ) -> Result<proto::OpenBufferResponse> {
442        let (buffer_store, buffer) = this.update(&mut cx, |this, cx| {
443            let buffer_store = this.buffer_store.clone();
444            let buffer = this
445                .buffer_store
446                .update(cx, |buffer_store, cx| buffer_store.create_buffer(cx));
447            anyhow::Ok((buffer_store, buffer))
448        })??;
449
450        let buffer = buffer.await?;
451        let buffer_id = buffer.read_with(&cx, |b, _| b.remote_id())?;
452        buffer_store.update(&mut cx, |buffer_store, cx| {
453            buffer_store
454                .create_buffer_for_peer(&buffer, SSH_PEER_ID, cx)
455                .detach_and_log_err(cx);
456        })?;
457
458        Ok(proto::OpenBufferResponse {
459            buffer_id: buffer_id.to_proto(),
460        })
461    }
462
463    pub async fn handle_open_server_settings(
464        this: Entity<Self>,
465        _: TypedEnvelope<proto::OpenServerSettings>,
466        mut cx: AsyncApp,
467    ) -> Result<proto::OpenBufferResponse> {
468        let settings_path = paths::settings_file();
469        let (worktree, path) = this
470            .update(&mut cx, |this, cx| {
471                this.worktree_store.update(cx, |worktree_store, cx| {
472                    worktree_store.find_or_create_worktree(settings_path, false, cx)
473                })
474            })?
475            .await?;
476
477        let (buffer, buffer_store) = this.update(&mut cx, |this, cx| {
478            let buffer = this.buffer_store.update(cx, |buffer_store, cx| {
479                buffer_store.open_buffer(
480                    ProjectPath {
481                        worktree_id: worktree.read(cx).id(),
482                        path: path.into(),
483                    },
484                    cx,
485                )
486            });
487
488            (buffer, this.buffer_store.clone())
489        })?;
490
491        let buffer = buffer.await?;
492
493        let buffer_id = cx.update(|cx| {
494            if buffer.read(cx).is_empty() {
495                buffer.update(cx, |buffer, cx| {
496                    buffer.edit([(0..0, initial_server_settings_content())], None, cx)
497                });
498            }
499
500            let buffer_id = buffer.read_with(cx, |b, _| b.remote_id());
501
502            buffer_store.update(cx, |buffer_store, cx| {
503                buffer_store
504                    .create_buffer_for_peer(&buffer, SSH_PEER_ID, cx)
505                    .detach_and_log_err(cx);
506            });
507
508            buffer_id
509        })?;
510
511        Ok(proto::OpenBufferResponse {
512            buffer_id: buffer_id.to_proto(),
513        })
514    }
515
516    pub async fn handle_find_search_candidates(
517        this: Entity<Self>,
518        envelope: TypedEnvelope<proto::FindSearchCandidates>,
519        mut cx: AsyncApp,
520    ) -> Result<proto::FindSearchCandidatesResponse> {
521        let message = envelope.payload;
522        let query = SearchQuery::from_proto(
523            message
524                .query
525                .ok_or_else(|| anyhow!("missing query field"))?,
526        )?;
527        let results = this.update(&mut cx, |this, cx| {
528            this.buffer_store.update(cx, |buffer_store, cx| {
529                buffer_store.find_search_candidates(&query, message.limit as _, this.fs.clone(), cx)
530            })
531        })?;
532
533        let mut response = proto::FindSearchCandidatesResponse {
534            buffer_ids: Vec::new(),
535        };
536
537        let buffer_store = this.read_with(&cx, |this, _| this.buffer_store.clone())?;
538
539        while let Ok(buffer) = results.recv().await {
540            let buffer_id = buffer.update(&mut cx, |this, _| this.remote_id())?;
541            response.buffer_ids.push(buffer_id.to_proto());
542            buffer_store
543                .update(&mut cx, |buffer_store, cx| {
544                    buffer_store.create_buffer_for_peer(&buffer, SSH_PEER_ID, cx)
545                })?
546                .await?;
547        }
548
549        Ok(response)
550    }
551
552    pub async fn handle_list_remote_directory(
553        this: Entity<Self>,
554        envelope: TypedEnvelope<proto::ListRemoteDirectory>,
555        cx: AsyncApp,
556    ) -> Result<proto::ListRemoteDirectoryResponse> {
557        let fs = cx.read_entity(&this, |this, _| this.fs.clone())?;
558        let expanded = PathBuf::from_proto(shellexpand::tilde(&envelope.payload.path).to_string());
559
560        let mut entries = Vec::new();
561        let mut response = fs.read_dir(&expanded).await?;
562        while let Some(path) = response.next().await {
563            if let Some(file_name) = path?.file_name() {
564                entries.push(file_name.to_string_lossy().to_string());
565            }
566        }
567        Ok(proto::ListRemoteDirectoryResponse { entries })
568    }
569
570    pub async fn handle_get_path_metadata(
571        this: Entity<Self>,
572        envelope: TypedEnvelope<proto::GetPathMetadata>,
573        cx: AsyncApp,
574    ) -> Result<proto::GetPathMetadataResponse> {
575        let fs = cx.read_entity(&this, |this, _| this.fs.clone())?;
576        let expanded = PathBuf::from_proto(shellexpand::tilde(&envelope.payload.path).to_string());
577
578        let metadata = fs.metadata(&expanded).await?;
579        let is_dir = metadata.map(|metadata| metadata.is_dir).unwrap_or(false);
580
581        Ok(proto::GetPathMetadataResponse {
582            exists: metadata.is_some(),
583            is_dir,
584            path: expanded.to_proto(),
585        })
586    }
587
588    pub async fn handle_shutdown_remote_server(
589        _this: Entity<Self>,
590        _envelope: TypedEnvelope<proto::ShutdownRemoteServer>,
591        cx: AsyncApp,
592    ) -> Result<proto::Ack> {
593        cx.spawn(|cx| async move {
594            cx.update(|cx| {
595                // TODO: This is a hack, because in a headless project, shutdown isn't executed
596                // when calling quit, but it should be.
597                cx.shutdown();
598                cx.quit();
599            })
600        })
601        .detach();
602
603        Ok(proto::Ack {})
604    }
605
606    pub async fn handle_ping(
607        _this: Entity<Self>,
608        _envelope: TypedEnvelope<proto::Ping>,
609        _cx: AsyncApp,
610    ) -> Result<proto::Ack> {
611        log::debug!("Received ping from client");
612        Ok(proto::Ack {})
613    }
614}
615
616fn prompt_to_proto(
617    prompt: &project::LanguageServerPromptRequest,
618) -> proto::language_server_prompt_request::Level {
619    match prompt.level {
620        PromptLevel::Info => proto::language_server_prompt_request::Level::Info(
621            proto::language_server_prompt_request::Info {},
622        ),
623        PromptLevel::Warning => proto::language_server_prompt_request::Level::Warning(
624            proto::language_server_prompt_request::Warning {},
625        ),
626        PromptLevel::Critical => proto::language_server_prompt_request::Level::Critical(
627            proto::language_server_prompt_request::Critical {},
628        ),
629    }
630}