headless_project.rs

  1use anyhow::{Context as _, Result, anyhow};
  2use language::File;
  3use lsp::LanguageServerId;
  4
  5use extension::ExtensionHostProxy;
  6use extension_host::headless_host::HeadlessExtensionStore;
  7use fs::Fs;
  8use gpui::{App, AppContext as _, AsyncApp, Context, Entity, PromptLevel};
  9use http_client::HttpClient;
 10use language::{Buffer, BufferEvent, LanguageRegistry, proto::serialize_operation};
 11use node_runtime::NodeRuntime;
 12use project::{
 13    LspStore, LspStoreEvent, ManifestTree, PrettierStore, ProjectEnvironment, ProjectPath,
 14    ToolchainStore, WorktreeId,
 15    agent_server_store::AgentServerStore,
 16    buffer_store::{BufferStore, BufferStoreEvent},
 17    debugger::{breakpoint_store::BreakpointStore, dap_store::DapStore},
 18    git_store::GitStore,
 19    image_store::ImageId,
 20    lsp_store::log_store::{self, GlobalLogStore, LanguageServerKind},
 21    project_settings::SettingsObserver,
 22    search::SearchQuery,
 23    task_store::TaskStore,
 24    worktree_store::WorktreeStore,
 25};
 26use rpc::{
 27    AnyProtoClient, TypedEnvelope,
 28    proto::{self, REMOTE_SERVER_PEER_ID, REMOTE_SERVER_PROJECT_ID},
 29};
 30
 31use settings::{Settings as _, initial_server_settings_content};
 32use smol::stream::StreamExt;
 33use std::{
 34    num::NonZeroU64,
 35    path::{Path, PathBuf},
 36    sync::{
 37        Arc,
 38        atomic::{AtomicU64, AtomicUsize, Ordering},
 39    },
 40};
 41use sysinfo::{ProcessRefreshKind, RefreshKind, System, UpdateKind};
 42use util::{ResultExt, paths::PathStyle, rel_path::RelPath};
 43use worktree::Worktree;
 44
/// Server-side ("headless") project state: the RPC session to the client plus
/// the entity stores that `HeadlessProject::new` creates and wires to it.
pub struct HeadlessProject {
    /// File system handle supplied through `HeadlessAppState`.
    pub fs: Arc<dyn Fs>,
    /// RPC client used to send messages to, and register handlers for, the client.
    pub session: AnyProtoClient,
    pub worktree_store: Entity<WorktreeStore>,
    pub buffer_store: Entity<BufferStore>,
    pub lsp_store: Entity<LspStore>,
    pub task_store: Entity<TaskStore>,
    pub dap_store: Entity<DapStore>,
    pub agent_server_store: Entity<AgentServerStore>,
    pub settings_observer: Entity<SettingsObserver>,
    /// Shared counter handed to `Worktree::local` when a worktree is added.
    pub next_entry_id: Arc<AtomicUsize>,
    pub languages: Arc<LanguageRegistry>,
    pub extensions: Entity<HeadlessExtensionStore>,
    pub git_store: Entity<GitStore>,
    pub environment: Entity<ProjectEnvironment>,
    // Used mostly to keep alive the toolchain store for RPC handlers.
    // Local variant is used within LSP store, but that's a separate entity.
    pub _toolchain_store: Entity<ToolchainStore>,
}
 64
/// Dependencies consumed by `HeadlessProject::new`.
pub struct HeadlessAppState {
    /// RPC client connecting this server to the client.
    pub session: AnyProtoClient,
    pub fs: Arc<dyn Fs>,
    pub http_client: Arc<dyn HttpClient>,
    pub node_runtime: NodeRuntime,
    pub languages: Arc<LanguageRegistry>,
    /// Proxy passed to the debug-adapter, language and headless extension setup.
    pub extension_host_proxy: Arc<ExtensionHostProxy>,
}
 73
 74impl HeadlessProject {
    /// Performs the app-wide initialization the headless project relies on:
    /// settings, language support, project settings, extension settings and
    /// the LSP log store.
    pub fn init(cx: &mut App) {
        settings::init(cx);
        language::init(cx);
        project::Project::init_settings(cx);
        extension_host::ExtensionSettings::register(cx);
        log_store::init(true, cx);
    }
 82
 83    pub fn new(
 84        HeadlessAppState {
 85            session,
 86            fs,
 87            http_client,
 88            node_runtime,
 89            languages,
 90            extension_host_proxy: proxy,
 91        }: HeadlessAppState,
 92        cx: &mut Context<Self>,
 93    ) -> Self {
 94        debug_adapter_extension::init(proxy.clone(), cx);
 95        languages::init(languages.clone(), fs.clone(), node_runtime.clone(), cx);
 96
 97        let worktree_store = cx.new(|cx| {
 98            let mut store = WorktreeStore::local(true, fs.clone());
 99            store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
100            store
101        });
102
103        let environment =
104            cx.new(|cx| ProjectEnvironment::new(None, worktree_store.downgrade(), None, true, cx));
105        let manifest_tree = ManifestTree::new(worktree_store.clone(), cx);
106        let toolchain_store = cx.new(|cx| {
107            ToolchainStore::local(
108                languages.clone(),
109                worktree_store.clone(),
110                environment.clone(),
111                manifest_tree.clone(),
112                fs.clone(),
113                cx,
114            )
115        });
116
117        let buffer_store = cx.new(|cx| {
118            let mut buffer_store = BufferStore::local(worktree_store.clone(), cx);
119            buffer_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
120            buffer_store
121        });
122
123        let breakpoint_store =
124            cx.new(|_| BreakpointStore::local(worktree_store.clone(), buffer_store.clone()));
125
126        let dap_store = cx.new(|cx| {
127            let mut dap_store = DapStore::new_local(
128                http_client.clone(),
129                node_runtime.clone(),
130                fs.clone(),
131                environment.clone(),
132                toolchain_store.read(cx).as_language_toolchain_store(),
133                worktree_store.clone(),
134                breakpoint_store.clone(),
135                true,
136                cx,
137            );
138            dap_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
139            dap_store
140        });
141
142        let git_store = cx.new(|cx| {
143            let mut store = GitStore::local(
144                &worktree_store,
145                buffer_store.clone(),
146                environment.clone(),
147                fs.clone(),
148                cx,
149            );
150            store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
151            store
152        });
153
154        let prettier_store = cx.new(|cx| {
155            PrettierStore::new(
156                node_runtime.clone(),
157                fs.clone(),
158                languages.clone(),
159                worktree_store.clone(),
160                cx,
161            )
162        });
163
164        let task_store = cx.new(|cx| {
165            let mut task_store = TaskStore::local(
166                buffer_store.downgrade(),
167                worktree_store.clone(),
168                toolchain_store.read(cx).as_language_toolchain_store(),
169                environment.clone(),
170                cx,
171            );
172            task_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
173            task_store
174        });
175        let settings_observer = cx.new(|cx| {
176            let mut observer = SettingsObserver::new_local(
177                fs.clone(),
178                worktree_store.clone(),
179                task_store.clone(),
180                cx,
181            );
182            observer.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
183            observer
184        });
185
186        let lsp_store = cx.new(|cx| {
187            let mut lsp_store = LspStore::new_local(
188                buffer_store.clone(),
189                worktree_store.clone(),
190                prettier_store.clone(),
191                toolchain_store
192                    .read(cx)
193                    .as_local_store()
194                    .expect("Toolchain store to be local")
195                    .clone(),
196                environment.clone(),
197                manifest_tree,
198                languages.clone(),
199                http_client.clone(),
200                fs.clone(),
201                cx,
202            );
203            lsp_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
204            lsp_store
205        });
206
207        let agent_server_store = cx.new(|cx| {
208            let mut agent_server_store = AgentServerStore::local(
209                node_runtime.clone(),
210                fs.clone(),
211                environment.clone(),
212                http_client.clone(),
213                cx,
214            );
215            agent_server_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
216            agent_server_store
217        });
218
219        cx.subscribe(&lsp_store, Self::on_lsp_store_event).detach();
220        language_extension::init(
221            language_extension::LspAccess::ViaLspStore(lsp_store.clone()),
222            proxy.clone(),
223            languages.clone(),
224        );
225
226        cx.subscribe(&buffer_store, |_this, _buffer_store, event, cx| {
227            if let BufferStoreEvent::BufferAdded(buffer) = event {
228                cx.subscribe(buffer, Self::on_buffer_event).detach();
229            }
230        })
231        .detach();
232
233        let extensions = HeadlessExtensionStore::new(
234            fs.clone(),
235            http_client.clone(),
236            paths::remote_extensions_dir().to_path_buf(),
237            proxy,
238            node_runtime,
239            cx,
240        );
241
242        // local_machine -> ssh handlers
243        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &worktree_store);
244        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &buffer_store);
245        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &cx.entity());
246        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &lsp_store);
247        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &task_store);
248        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &toolchain_store);
249        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &dap_store);
250        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &settings_observer);
251        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &git_store);
252        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &agent_server_store);
253
254        session.add_request_handler(cx.weak_entity(), Self::handle_list_remote_directory);
255        session.add_request_handler(cx.weak_entity(), Self::handle_get_path_metadata);
256        session.add_request_handler(cx.weak_entity(), Self::handle_shutdown_remote_server);
257        session.add_request_handler(cx.weak_entity(), Self::handle_ping);
258        session.add_request_handler(cx.weak_entity(), Self::handle_get_processes);
259
260        session.add_entity_request_handler(Self::handle_add_worktree);
261        session.add_request_handler(cx.weak_entity(), Self::handle_remove_worktree);
262
263        session.add_entity_request_handler(Self::handle_open_buffer_by_path);
264        session.add_entity_request_handler(Self::handle_open_new_buffer);
265        session.add_entity_request_handler(Self::handle_find_search_candidates);
266        session.add_entity_request_handler(Self::handle_open_server_settings);
267        session.add_entity_request_handler(Self::handle_get_directory_environment);
268        session.add_entity_message_handler(Self::handle_toggle_lsp_logs);
269        session.add_entity_request_handler(Self::handle_open_image_by_path);
270
271        session.add_entity_request_handler(BufferStore::handle_update_buffer);
272        session.add_entity_message_handler(BufferStore::handle_close_buffer);
273
274        session.add_request_handler(
275            extensions.downgrade(),
276            HeadlessExtensionStore::handle_sync_extensions,
277        );
278        session.add_request_handler(
279            extensions.downgrade(),
280            HeadlessExtensionStore::handle_install_extension,
281        );
282
283        BufferStore::init(&session);
284        WorktreeStore::init(&session);
285        SettingsObserver::init(&session);
286        LspStore::init(&session);
287        TaskStore::init(Some(&session));
288        ToolchainStore::init(&session);
289        DapStore::init(&session, cx);
290        // todo(debugger): Re init breakpoint store when we set it up for collab
291        // BreakpointStore::init(&client);
292        GitStore::init(&session);
293        AgentServerStore::init_headless(&session);
294
295        HeadlessProject {
296            next_entry_id: Default::default(),
297            session,
298            settings_observer,
299            fs,
300            worktree_store,
301            buffer_store,
302            lsp_store,
303            task_store,
304            dap_store,
305            agent_server_store,
306            languages,
307            extensions,
308            git_store,
309            environment,
310            _toolchain_store: toolchain_store,
311        }
312    }
313
314    fn on_buffer_event(
315        &mut self,
316        buffer: Entity<Buffer>,
317        event: &BufferEvent,
318        cx: &mut Context<Self>,
319    ) {
320        if let BufferEvent::Operation {
321            operation,
322            is_local: true,
323        } = event
324        {
325            cx.background_spawn(self.session.request(proto::UpdateBuffer {
326                project_id: REMOTE_SERVER_PROJECT_ID,
327                buffer_id: buffer.read(cx).remote_id().to_proto(),
328                operations: vec![serialize_operation(operation)],
329            }))
330            .detach()
331        }
332    }
333
334    fn on_lsp_store_event(
335        &mut self,
336        lsp_store: Entity<LspStore>,
337        event: &LspStoreEvent,
338        cx: &mut Context<Self>,
339    ) {
340        match event {
341            LspStoreEvent::LanguageServerAdded(id, name, worktree_id) => {
342                let log_store = cx
343                    .try_global::<GlobalLogStore>()
344                    .map(|lsp_logs| lsp_logs.0.clone());
345                if let Some(log_store) = log_store {
346                    log_store.update(cx, |log_store, cx| {
347                        log_store.add_language_server(
348                            LanguageServerKind::LocalSsh {
349                                lsp_store: self.lsp_store.downgrade(),
350                            },
351                            *id,
352                            Some(name.clone()),
353                            *worktree_id,
354                            lsp_store.read(cx).language_server_for_id(*id),
355                            cx,
356                        );
357                    });
358                }
359            }
360            LspStoreEvent::LanguageServerRemoved(id) => {
361                let log_store = cx
362                    .try_global::<GlobalLogStore>()
363                    .map(|lsp_logs| lsp_logs.0.clone());
364                if let Some(log_store) = log_store {
365                    log_store.update(cx, |log_store, cx| {
366                        log_store.remove_language_server(*id, cx);
367                    });
368                }
369            }
370            LspStoreEvent::LanguageServerUpdate {
371                language_server_id,
372                name,
373                message,
374            } => {
375                self.session
376                    .send(proto::UpdateLanguageServer {
377                        project_id: REMOTE_SERVER_PROJECT_ID,
378                        server_name: name.as_ref().map(|name| name.to_string()),
379                        language_server_id: language_server_id.to_proto(),
380                        variant: Some(message.clone()),
381                    })
382                    .log_err();
383            }
384            LspStoreEvent::Notification(message) => {
385                self.session
386                    .send(proto::Toast {
387                        project_id: REMOTE_SERVER_PROJECT_ID,
388                        notification_id: "lsp".to_string(),
389                        message: message.clone(),
390                    })
391                    .log_err();
392            }
393            LspStoreEvent::LanguageServerPrompt(prompt) => {
394                let request = self.session.request(proto::LanguageServerPromptRequest {
395                    project_id: REMOTE_SERVER_PROJECT_ID,
396                    actions: prompt
397                        .actions
398                        .iter()
399                        .map(|action| action.title.to_string())
400                        .collect(),
401                    level: Some(prompt_to_proto(prompt)),
402                    lsp_name: prompt.lsp_name.clone(),
403                    message: prompt.message.clone(),
404                });
405                let prompt = prompt.clone();
406                cx.background_spawn(async move {
407                    let response = request.await?;
408                    if let Some(action_response) = response.action_response {
409                        prompt.respond(action_response as usize).await;
410                    }
411                    anyhow::Ok(())
412                })
413                .detach();
414            }
415            _ => {}
416        }
417    }
418
419    pub async fn handle_add_worktree(
420        this: Entity<Self>,
421        message: TypedEnvelope<proto::AddWorktree>,
422        mut cx: AsyncApp,
423    ) -> Result<proto::AddWorktreeResponse> {
424        use client::ErrorCodeExt;
425        let fs = this.read_with(&cx, |this, _| this.fs.clone())?;
426        let path = PathBuf::from(shellexpand::tilde(&message.payload.path).to_string());
427
428        let canonicalized = match fs.canonicalize(&path).await {
429            Ok(path) => path,
430            Err(e) => {
431                let mut parent = path
432                    .parent()
433                    .ok_or(e)
434                    .with_context(|| format!("{path:?} does not exist"))?;
435                if parent == Path::new("") {
436                    parent = util::paths::home_dir();
437                }
438                let parent = fs.canonicalize(parent).await.map_err(|_| {
439                    anyhow!(
440                        proto::ErrorCode::DevServerProjectPathDoesNotExist
441                            .with_tag("path", path.to_string_lossy().as_ref())
442                    )
443                })?;
444                parent.join(path.file_name().unwrap())
445            }
446        };
447
448        let worktree = this
449            .read_with(&cx.clone(), |this, _| {
450                Worktree::local(
451                    Arc::from(canonicalized.as_path()),
452                    message.payload.visible,
453                    this.fs.clone(),
454                    this.next_entry_id.clone(),
455                    &mut cx,
456                )
457            })?
458            .await?;
459
460        let response = this.read_with(&cx, |_, cx| {
461            let worktree = worktree.read(cx);
462            proto::AddWorktreeResponse {
463                worktree_id: worktree.id().to_proto(),
464                canonicalized_path: canonicalized.to_string_lossy().into_owned(),
465            }
466        })?;
467
468        // We spawn this asynchronously, so that we can send the response back
469        // *before* `worktree_store.add()` can send out UpdateProject requests
470        // to the client about the new worktree.
471        //
472        // That lets the client manage the reference/handles of the newly-added
473        // worktree, before getting interrupted by an UpdateProject request.
474        //
475        // This fixes the problem of the client sending the AddWorktree request,
476        // headless project sending out a project update, client receiving it
477        // and immediately dropping the reference of the new client, causing it
478        // to be dropped on the headless project, and the client only then
479        // receiving a response to AddWorktree.
480        cx.spawn(async move |cx| {
481            this.update(cx, |this, cx| {
482                this.worktree_store.update(cx, |worktree_store, cx| {
483                    worktree_store.add(&worktree, cx);
484                });
485            })
486            .log_err();
487        })
488        .detach();
489
490        Ok(response)
491    }
492
493    pub async fn handle_remove_worktree(
494        this: Entity<Self>,
495        envelope: TypedEnvelope<proto::RemoveWorktree>,
496        mut cx: AsyncApp,
497    ) -> Result<proto::Ack> {
498        let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
499        this.update(&mut cx, |this, cx| {
500            this.worktree_store.update(cx, |worktree_store, cx| {
501                worktree_store.remove_worktree(worktree_id, cx);
502            });
503        })?;
504        Ok(proto::Ack {})
505    }
506
507    pub async fn handle_open_buffer_by_path(
508        this: Entity<Self>,
509        message: TypedEnvelope<proto::OpenBufferByPath>,
510        mut cx: AsyncApp,
511    ) -> Result<proto::OpenBufferResponse> {
512        let worktree_id = WorktreeId::from_proto(message.payload.worktree_id);
513        let path = RelPath::from_proto(&message.payload.path)?;
514        let (buffer_store, buffer) = this.update(&mut cx, |this, cx| {
515            let buffer_store = this.buffer_store.clone();
516            let buffer = this.buffer_store.update(cx, |buffer_store, cx| {
517                buffer_store.open_buffer(ProjectPath { worktree_id, path }, cx)
518            });
519            anyhow::Ok((buffer_store, buffer))
520        })??;
521
522        let buffer = buffer.await?;
523        let buffer_id = buffer.read_with(&cx, |b, _| b.remote_id())?;
524        buffer_store.update(&mut cx, |buffer_store, cx| {
525            buffer_store
526                .create_buffer_for_peer(&buffer, REMOTE_SERVER_PEER_ID, cx)
527                .detach_and_log_err(cx);
528        })?;
529
530        Ok(proto::OpenBufferResponse {
531            buffer_id: buffer_id.to_proto(),
532        })
533    }
534
535    pub async fn handle_open_image_by_path(
536        this: Entity<Self>,
537        message: TypedEnvelope<proto::OpenImageByPath>,
538        mut cx: AsyncApp,
539    ) -> Result<proto::OpenImageResponse> {
540        static NEXT_ID: AtomicU64 = AtomicU64::new(1);
541        let worktree_id = WorktreeId::from_proto(message.payload.worktree_id);
542        let path = RelPath::from_proto(&message.payload.path)?;
543        let project_id = message.payload.project_id;
544        use proto::create_image_for_peer::Variant;
545
546        let (worktree_store, session) = this.read_with(&cx, |this, _| {
547            (this.worktree_store.clone(), this.session.clone())
548        })?;
549
550        let worktree = worktree_store
551            .read_with(&cx, |store, cx| store.worktree_for_id(worktree_id, cx))?
552            .context("worktree not found")?;
553
554        let load_task = worktree.update(&mut cx, |worktree, cx| {
555            worktree.load_binary_file(path.as_ref(), cx)
556        })?;
557
558        let loaded_file = load_task.await?;
559        let content = loaded_file.content;
560        let file = loaded_file.file;
561
562        let proto_file = worktree.read_with(&cx, |_worktree, cx| file.to_proto(cx))?;
563        let image_id =
564            ImageId::from(NonZeroU64::new(NEXT_ID.fetch_add(1, Ordering::Relaxed)).unwrap());
565
566        let format = image::guess_format(&content)
567            .map(|f| format!("{:?}", f).to_lowercase())
568            .unwrap_or_else(|_| "unknown".to_string());
569
570        let state = proto::ImageState {
571            id: image_id.to_proto(),
572            file: Some(proto_file),
573            content_size: content.len() as u64,
574            format,
575        };
576
577        session.send(proto::CreateImageForPeer {
578            project_id,
579            peer_id: Some(REMOTE_SERVER_PEER_ID),
580            variant: Some(Variant::State(state)),
581        })?;
582
583        const CHUNK_SIZE: usize = 1024 * 1024; // 1MB chunks
584        for chunk in content.chunks(CHUNK_SIZE) {
585            session.send(proto::CreateImageForPeer {
586                project_id,
587                peer_id: Some(REMOTE_SERVER_PEER_ID),
588                variant: Some(Variant::Chunk(proto::ImageChunk {
589                    image_id: image_id.to_proto(),
590                    data: chunk.to_vec(),
591                })),
592            })?;
593        }
594
595        Ok(proto::OpenImageResponse {
596            image_id: image_id.to_proto(),
597        })
598    }
599
600    pub async fn handle_open_new_buffer(
601        this: Entity<Self>,
602        _message: TypedEnvelope<proto::OpenNewBuffer>,
603        mut cx: AsyncApp,
604    ) -> Result<proto::OpenBufferResponse> {
605        let (buffer_store, buffer) = this.update(&mut cx, |this, cx| {
606            let buffer_store = this.buffer_store.clone();
607            let buffer = this
608                .buffer_store
609                .update(cx, |buffer_store, cx| buffer_store.create_buffer(true, cx));
610            anyhow::Ok((buffer_store, buffer))
611        })??;
612
613        let buffer = buffer.await?;
614        let buffer_id = buffer.read_with(&cx, |b, _| b.remote_id())?;
615        buffer_store.update(&mut cx, |buffer_store, cx| {
616            buffer_store
617                .create_buffer_for_peer(&buffer, REMOTE_SERVER_PEER_ID, cx)
618                .detach_and_log_err(cx);
619        })?;
620
621        Ok(proto::OpenBufferResponse {
622            buffer_id: buffer_id.to_proto(),
623        })
624    }
625
626    async fn handle_toggle_lsp_logs(
627        _: Entity<Self>,
628        envelope: TypedEnvelope<proto::ToggleLspLogs>,
629        mut cx: AsyncApp,
630    ) -> Result<()> {
631        let server_id = LanguageServerId::from_proto(envelope.payload.server_id);
632        let lsp_logs = cx
633            .update(|cx| {
634                cx.try_global::<GlobalLogStore>()
635                    .map(|lsp_logs| lsp_logs.0.clone())
636            })?
637            .context("lsp logs store is missing")?;
638
639        lsp_logs.update(&mut cx, |lsp_logs, _| {
640            // RPC logs are very noisy and we need to toggle it on the headless server too.
641            // The rest of the logs for the ssh project are very important to have toggled always,
642            // to e.g. send language server error logs to the client before anything is toggled.
643            if envelope.payload.enabled {
644                lsp_logs.enable_rpc_trace_for_language_server(server_id);
645            } else {
646                lsp_logs.disable_rpc_trace_for_language_server(server_id);
647            }
648        })?;
649        Ok(())
650    }
651
652    async fn handle_open_server_settings(
653        this: Entity<Self>,
654        _: TypedEnvelope<proto::OpenServerSettings>,
655        mut cx: AsyncApp,
656    ) -> Result<proto::OpenBufferResponse> {
657        let settings_path = paths::settings_file();
658        let (worktree, path) = this
659            .update(&mut cx, |this, cx| {
660                this.worktree_store.update(cx, |worktree_store, cx| {
661                    worktree_store.find_or_create_worktree(settings_path, false, cx)
662                })
663            })?
664            .await?;
665
666        let (buffer, buffer_store) = this.update(&mut cx, |this, cx| {
667            let buffer = this.buffer_store.update(cx, |buffer_store, cx| {
668                buffer_store.open_buffer(
669                    ProjectPath {
670                        worktree_id: worktree.read(cx).id(),
671                        path: path,
672                    },
673                    cx,
674                )
675            });
676
677            (buffer, this.buffer_store.clone())
678        })?;
679
680        let buffer = buffer.await?;
681
682        let buffer_id = cx.update(|cx| {
683            if buffer.read(cx).is_empty() {
684                buffer.update(cx, |buffer, cx| {
685                    buffer.edit([(0..0, initial_server_settings_content())], None, cx)
686                });
687            }
688
689            let buffer_id = buffer.read(cx).remote_id();
690
691            buffer_store.update(cx, |buffer_store, cx| {
692                buffer_store
693                    .create_buffer_for_peer(&buffer, REMOTE_SERVER_PEER_ID, cx)
694                    .detach_and_log_err(cx);
695            });
696
697            buffer_id
698        })?;
699
700        Ok(proto::OpenBufferResponse {
701            buffer_id: buffer_id.to_proto(),
702        })
703    }
704
705    async fn handle_find_search_candidates(
706        this: Entity<Self>,
707        envelope: TypedEnvelope<proto::FindSearchCandidates>,
708        mut cx: AsyncApp,
709    ) -> Result<proto::FindSearchCandidatesResponse> {
710        let message = envelope.payload;
711        let query = SearchQuery::from_proto(
712            message.query.context("missing query field")?,
713            PathStyle::local(),
714        )?;
715        let results = this.update(&mut cx, |this, cx| {
716            this.buffer_store.update(cx, |buffer_store, cx| {
717                buffer_store.find_search_candidates(&query, message.limit as _, this.fs.clone(), cx)
718            })
719        })?;
720
721        let mut response = proto::FindSearchCandidatesResponse {
722            buffer_ids: Vec::new(),
723        };
724
725        let buffer_store = this.read_with(&cx, |this, _| this.buffer_store.clone())?;
726
727        while let Ok(buffer) = results.recv().await {
728            let buffer_id = buffer.read_with(&cx, |this, _| this.remote_id())?;
729            response.buffer_ids.push(buffer_id.to_proto());
730            buffer_store
731                .update(&mut cx, |buffer_store, cx| {
732                    buffer_store.create_buffer_for_peer(&buffer, REMOTE_SERVER_PEER_ID, cx)
733                })?
734                .await?;
735        }
736
737        Ok(response)
738    }
739
740    async fn handle_list_remote_directory(
741        this: Entity<Self>,
742        envelope: TypedEnvelope<proto::ListRemoteDirectory>,
743        cx: AsyncApp,
744    ) -> Result<proto::ListRemoteDirectoryResponse> {
745        let fs = cx.read_entity(&this, |this, _| this.fs.clone())?;
746        let expanded = PathBuf::from(shellexpand::tilde(&envelope.payload.path).to_string());
747        let check_info = envelope
748            .payload
749            .config
750            .as_ref()
751            .is_some_and(|config| config.is_dir);
752
753        let mut entries = Vec::new();
754        let mut entry_info = Vec::new();
755        let mut response = fs.read_dir(&expanded).await?;
756        while let Some(path) = response.next().await {
757            let path = path?;
758            if let Some(file_name) = path.file_name() {
759                entries.push(file_name.to_string_lossy().into_owned());
760                if check_info {
761                    let is_dir = fs.is_dir(&path).await;
762                    entry_info.push(proto::EntryInfo { is_dir });
763                }
764            }
765        }
766        Ok(proto::ListRemoteDirectoryResponse {
767            entries,
768            entry_info,
769        })
770    }
771
772    async fn handle_get_path_metadata(
773        this: Entity<Self>,
774        envelope: TypedEnvelope<proto::GetPathMetadata>,
775        cx: AsyncApp,
776    ) -> Result<proto::GetPathMetadataResponse> {
777        let fs = cx.read_entity(&this, |this, _| this.fs.clone())?;
778        let expanded = PathBuf::from(shellexpand::tilde(&envelope.payload.path).to_string());
779
780        let metadata = fs.metadata(&expanded).await?;
781        let is_dir = metadata.map(|metadata| metadata.is_dir).unwrap_or(false);
782
783        Ok(proto::GetPathMetadataResponse {
784            exists: metadata.is_some(),
785            is_dir,
786            path: expanded.to_string_lossy().into_owned(),
787        })
788    }
789
790    async fn handle_shutdown_remote_server(
791        _this: Entity<Self>,
792        _envelope: TypedEnvelope<proto::ShutdownRemoteServer>,
793        cx: AsyncApp,
794    ) -> Result<proto::Ack> {
795        cx.spawn(async move |cx| {
796            cx.update(|cx| {
797                // TODO: This is a hack, because in a headless project, shutdown isn't executed
798                // when calling quit, but it should be.
799                cx.shutdown();
800                cx.quit();
801            })
802        })
803        .detach();
804
805        Ok(proto::Ack {})
806    }
807
    /// Handles `proto::Ping`: logs the ping at debug level and acknowledges it.
    pub async fn handle_ping(
        _this: Entity<Self>,
        _envelope: TypedEnvelope<proto::Ping>,
        _cx: AsyncApp,
    ) -> Result<proto::Ack> {
        log::debug!("Received ping from client");
        Ok(proto::Ack {})
    }
816
817    async fn handle_get_processes(
818        _this: Entity<Self>,
819        _envelope: TypedEnvelope<proto::GetProcesses>,
820        _cx: AsyncApp,
821    ) -> Result<proto::GetProcessesResponse> {
822        let mut processes = Vec::new();
823        let refresh_kind = RefreshKind::nothing().with_processes(
824            ProcessRefreshKind::nothing()
825                .without_tasks()
826                .with_cmd(UpdateKind::Always),
827        );
828
829        for process in System::new_with_specifics(refresh_kind)
830            .processes()
831            .values()
832        {
833            let name = process.name().to_string_lossy().into_owned();
834            let command = process
835                .cmd()
836                .iter()
837                .map(|s| s.to_string_lossy().into_owned())
838                .collect::<Vec<_>>();
839
840            processes.push(proto::ProcessInfo {
841                pid: process.pid().as_u32(),
842                name,
843                command,
844            });
845        }
846
847        processes.sort_by_key(|p| p.name.clone());
848
849        Ok(proto::GetProcessesResponse { processes })
850    }
851
852    async fn handle_get_directory_environment(
853        this: Entity<Self>,
854        envelope: TypedEnvelope<proto::GetDirectoryEnvironment>,
855        mut cx: AsyncApp,
856    ) -> Result<proto::DirectoryEnvironment> {
857        let shell = task::shell_from_proto(envelope.payload.shell.context("missing shell")?)?;
858        let directory = PathBuf::from(envelope.payload.directory);
859        let environment = this
860            .update(&mut cx, |this, cx| {
861                this.environment.update(cx, |environment, cx| {
862                    environment.local_directory_environment(&shell, directory.into(), cx)
863                })
864            })?
865            .await
866            .context("failed to get directory environment")?
867            .into_iter()
868            .collect();
869        Ok(proto::DirectoryEnvironment { environment })
870    }
871}
872
873fn prompt_to_proto(
874    prompt: &project::LanguageServerPromptRequest,
875) -> proto::language_server_prompt_request::Level {
876    match prompt.level {
877        PromptLevel::Info => proto::language_server_prompt_request::Level::Info(
878            proto::language_server_prompt_request::Info {},
879        ),
880        PromptLevel::Warning => proto::language_server_prompt_request::Level::Warning(
881            proto::language_server_prompt_request::Warning {},
882        ),
883        PromptLevel::Critical => proto::language_server_prompt_request::Level::Critical(
884            proto::language_server_prompt_request::Critical {},
885        ),
886    }
887}