headless_project.rs

  1use anyhow::{Context as _, Result, anyhow};
  2use language::File;
  3use lsp::LanguageServerId;
  4
  5use extension::ExtensionHostProxy;
  6use extension_host::headless_host::HeadlessExtensionStore;
  7use fs::Fs;
  8use gpui::{App, AppContext as _, AsyncApp, Context, Entity, PromptLevel};
  9use http_client::HttpClient;
 10use language::{Buffer, BufferEvent, LanguageRegistry, proto::serialize_operation};
 11use node_runtime::NodeRuntime;
 12use project::{
 13    LspStore, LspStoreEvent, ManifestTree, PrettierStore, ProjectEnvironment, ProjectPath,
 14    ToolchainStore, WorktreeId,
 15    agent_server_store::AgentServerStore,
 16    buffer_store::{BufferStore, BufferStoreEvent},
 17    debugger::{breakpoint_store::BreakpointStore, dap_store::DapStore},
 18    git_store::GitStore,
 19    image_store::ImageId,
 20    lsp_store::log_store::{self, GlobalLogStore, LanguageServerKind, LogKind},
 21    project_settings::SettingsObserver,
 22    search::SearchQuery,
 23    task_store::TaskStore,
 24    worktree_store::WorktreeStore,
 25};
 26use rpc::{
 27    AnyProtoClient, TypedEnvelope,
 28    proto::{self, REMOTE_SERVER_PEER_ID, REMOTE_SERVER_PROJECT_ID},
 29};
 30
 31use settings::initial_server_settings_content;
 32use smol::stream::StreamExt;
 33use std::{
 34    num::NonZeroU64,
 35    path::{Path, PathBuf},
 36    sync::{
 37        Arc,
 38        atomic::{AtomicU64, AtomicUsize, Ordering},
 39    },
 40};
 41use sysinfo::{ProcessRefreshKind, RefreshKind, System, UpdateKind};
 42use util::{ResultExt, paths::PathStyle, rel_path::RelPath};
 43use worktree::Worktree;
 44
 45pub struct HeadlessProject {
 46    pub fs: Arc<dyn Fs>,
 47    pub session: AnyProtoClient,
 48    pub worktree_store: Entity<WorktreeStore>,
 49    pub buffer_store: Entity<BufferStore>,
 50    pub lsp_store: Entity<LspStore>,
 51    pub task_store: Entity<TaskStore>,
 52    pub dap_store: Entity<DapStore>,
 53    pub agent_server_store: Entity<AgentServerStore>,
 54    pub settings_observer: Entity<SettingsObserver>,
 55    pub next_entry_id: Arc<AtomicUsize>,
 56    pub languages: Arc<LanguageRegistry>,
 57    pub extensions: Entity<HeadlessExtensionStore>,
 58    pub git_store: Entity<GitStore>,
 59    pub environment: Entity<ProjectEnvironment>,
 60    // Used mostly to keep alive the toolchain store for RPC handlers.
 61    // Local variant is used within LSP store, but that's a separate entity.
 62    pub _toolchain_store: Entity<ToolchainStore>,
 63}
 64
 65pub struct HeadlessAppState {
 66    pub session: AnyProtoClient,
 67    pub fs: Arc<dyn Fs>,
 68    pub http_client: Arc<dyn HttpClient>,
 69    pub node_runtime: NodeRuntime,
 70    pub languages: Arc<LanguageRegistry>,
 71    pub extension_host_proxy: Arc<ExtensionHostProxy>,
 72}
 73
 74impl HeadlessProject {
 75    pub fn init(cx: &mut App) {
 76        settings::init(cx);
 77        log_store::init(true, cx);
 78    }
 79
 80    pub fn new(
 81        HeadlessAppState {
 82            session,
 83            fs,
 84            http_client,
 85            node_runtime,
 86            languages,
 87            extension_host_proxy: proxy,
 88        }: HeadlessAppState,
 89        cx: &mut Context<Self>,
 90    ) -> Self {
 91        debug_adapter_extension::init(proxy.clone(), cx);
 92        languages::init(languages.clone(), fs.clone(), node_runtime.clone(), cx);
 93
 94        let worktree_store = cx.new(|cx| {
 95            let mut store = WorktreeStore::local(true, fs.clone());
 96            store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
 97            store
 98        });
 99
100        let environment =
101            cx.new(|cx| ProjectEnvironment::new(None, worktree_store.downgrade(), None, true, cx));
102        let manifest_tree = ManifestTree::new(worktree_store.clone(), cx);
103        let toolchain_store = cx.new(|cx| {
104            ToolchainStore::local(
105                languages.clone(),
106                worktree_store.clone(),
107                environment.clone(),
108                manifest_tree.clone(),
109                fs.clone(),
110                cx,
111            )
112        });
113
114        let buffer_store = cx.new(|cx| {
115            let mut buffer_store = BufferStore::local(worktree_store.clone(), cx);
116            buffer_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
117            buffer_store
118        });
119
120        let breakpoint_store =
121            cx.new(|_| BreakpointStore::local(worktree_store.clone(), buffer_store.clone()));
122
123        let dap_store = cx.new(|cx| {
124            let mut dap_store = DapStore::new_local(
125                http_client.clone(),
126                node_runtime.clone(),
127                fs.clone(),
128                environment.clone(),
129                toolchain_store.read(cx).as_language_toolchain_store(),
130                worktree_store.clone(),
131                breakpoint_store.clone(),
132                true,
133                cx,
134            );
135            dap_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
136            dap_store
137        });
138
139        let git_store = cx.new(|cx| {
140            let mut store = GitStore::local(
141                &worktree_store,
142                buffer_store.clone(),
143                environment.clone(),
144                fs.clone(),
145                cx,
146            );
147            store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
148            store
149        });
150
151        let prettier_store = cx.new(|cx| {
152            PrettierStore::new(
153                node_runtime.clone(),
154                fs.clone(),
155                languages.clone(),
156                worktree_store.clone(),
157                cx,
158            )
159        });
160
161        let task_store = cx.new(|cx| {
162            let mut task_store = TaskStore::local(
163                buffer_store.downgrade(),
164                worktree_store.clone(),
165                toolchain_store.read(cx).as_language_toolchain_store(),
166                environment.clone(),
167                cx,
168            );
169            task_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
170            task_store
171        });
172        let settings_observer = cx.new(|cx| {
173            let mut observer = SettingsObserver::new_local(
174                fs.clone(),
175                worktree_store.clone(),
176                task_store.clone(),
177                cx,
178            );
179            observer.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
180            observer
181        });
182
183        let lsp_store = cx.new(|cx| {
184            let mut lsp_store = LspStore::new_local(
185                buffer_store.clone(),
186                worktree_store.clone(),
187                prettier_store.clone(),
188                toolchain_store
189                    .read(cx)
190                    .as_local_store()
191                    .expect("Toolchain store to be local")
192                    .clone(),
193                environment.clone(),
194                manifest_tree,
195                languages.clone(),
196                http_client.clone(),
197                fs.clone(),
198                cx,
199            );
200            lsp_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
201            lsp_store
202        });
203
204        let agent_server_store = cx.new(|cx| {
205            let mut agent_server_store = AgentServerStore::local(
206                node_runtime.clone(),
207                fs.clone(),
208                environment.clone(),
209                http_client.clone(),
210                cx,
211            );
212            agent_server_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
213            agent_server_store
214        });
215
216        cx.subscribe(&lsp_store, Self::on_lsp_store_event).detach();
217        language_extension::init(
218            language_extension::LspAccess::ViaLspStore(lsp_store.clone()),
219            proxy.clone(),
220            languages.clone(),
221        );
222
223        cx.subscribe(&buffer_store, |_this, _buffer_store, event, cx| {
224            if let BufferStoreEvent::BufferAdded(buffer) = event {
225                cx.subscribe(buffer, Self::on_buffer_event).detach();
226            }
227        })
228        .detach();
229
230        let extensions = HeadlessExtensionStore::new(
231            fs.clone(),
232            http_client.clone(),
233            paths::remote_extensions_dir().to_path_buf(),
234            proxy,
235            node_runtime,
236            cx,
237        );
238
239        // local_machine -> ssh handlers
240        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &worktree_store);
241        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &buffer_store);
242        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &cx.entity());
243        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &lsp_store);
244        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &task_store);
245        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &toolchain_store);
246        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &dap_store);
247        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &settings_observer);
248        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &git_store);
249        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &agent_server_store);
250
251        session.add_request_handler(cx.weak_entity(), Self::handle_list_remote_directory);
252        session.add_request_handler(cx.weak_entity(), Self::handle_get_path_metadata);
253        session.add_request_handler(cx.weak_entity(), Self::handle_shutdown_remote_server);
254        session.add_request_handler(cx.weak_entity(), Self::handle_ping);
255        session.add_request_handler(cx.weak_entity(), Self::handle_get_processes);
256
257        session.add_entity_request_handler(Self::handle_add_worktree);
258        session.add_request_handler(cx.weak_entity(), Self::handle_remove_worktree);
259
260        session.add_entity_request_handler(Self::handle_open_buffer_by_path);
261        session.add_entity_request_handler(Self::handle_open_new_buffer);
262        session.add_entity_request_handler(Self::handle_find_search_candidates);
263        session.add_entity_request_handler(Self::handle_open_server_settings);
264        session.add_entity_request_handler(Self::handle_get_directory_environment);
265        session.add_entity_message_handler(Self::handle_toggle_lsp_logs);
266        session.add_entity_request_handler(Self::handle_open_image_by_path);
267
268        session.add_entity_request_handler(BufferStore::handle_update_buffer);
269        session.add_entity_message_handler(BufferStore::handle_close_buffer);
270
271        session.add_request_handler(
272            extensions.downgrade(),
273            HeadlessExtensionStore::handle_sync_extensions,
274        );
275        session.add_request_handler(
276            extensions.downgrade(),
277            HeadlessExtensionStore::handle_install_extension,
278        );
279
280        BufferStore::init(&session);
281        WorktreeStore::init(&session);
282        SettingsObserver::init(&session);
283        LspStore::init(&session);
284        TaskStore::init(Some(&session));
285        ToolchainStore::init(&session);
286        DapStore::init(&session, cx);
287        // todo(debugger): Re init breakpoint store when we set it up for collab
288        // BreakpointStore::init(&client);
289        GitStore::init(&session);
290        AgentServerStore::init_headless(&session);
291
292        HeadlessProject {
293            next_entry_id: Default::default(),
294            session,
295            settings_observer,
296            fs,
297            worktree_store,
298            buffer_store,
299            lsp_store,
300            task_store,
301            dap_store,
302            agent_server_store,
303            languages,
304            extensions,
305            git_store,
306            environment,
307            _toolchain_store: toolchain_store,
308        }
309    }
310
311    fn on_buffer_event(
312        &mut self,
313        buffer: Entity<Buffer>,
314        event: &BufferEvent,
315        cx: &mut Context<Self>,
316    ) {
317        if let BufferEvent::Operation {
318            operation,
319            is_local: true,
320        } = event
321        {
322            cx.background_spawn(self.session.request(proto::UpdateBuffer {
323                project_id: REMOTE_SERVER_PROJECT_ID,
324                buffer_id: buffer.read(cx).remote_id().to_proto(),
325                operations: vec![serialize_operation(operation)],
326            }))
327            .detach()
328        }
329    }
330
331    fn on_lsp_store_event(
332        &mut self,
333        lsp_store: Entity<LspStore>,
334        event: &LspStoreEvent,
335        cx: &mut Context<Self>,
336    ) {
337        match event {
338            LspStoreEvent::LanguageServerAdded(id, name, worktree_id) => {
339                let log_store = cx
340                    .try_global::<GlobalLogStore>()
341                    .map(|lsp_logs| lsp_logs.0.clone());
342                if let Some(log_store) = log_store {
343                    log_store.update(cx, |log_store, cx| {
344                        log_store.add_language_server(
345                            LanguageServerKind::LocalSsh {
346                                lsp_store: self.lsp_store.downgrade(),
347                            },
348                            *id,
349                            Some(name.clone()),
350                            *worktree_id,
351                            lsp_store.read(cx).language_server_for_id(*id),
352                            cx,
353                        );
354                    });
355                }
356            }
357            LspStoreEvent::LanguageServerRemoved(id) => {
358                let log_store = cx
359                    .try_global::<GlobalLogStore>()
360                    .map(|lsp_logs| lsp_logs.0.clone());
361                if let Some(log_store) = log_store {
362                    log_store.update(cx, |log_store, cx| {
363                        log_store.remove_language_server(*id, cx);
364                    });
365                }
366            }
367            LspStoreEvent::LanguageServerUpdate {
368                language_server_id,
369                name,
370                message,
371            } => {
372                self.session
373                    .send(proto::UpdateLanguageServer {
374                        project_id: REMOTE_SERVER_PROJECT_ID,
375                        server_name: name.as_ref().map(|name| name.to_string()),
376                        language_server_id: language_server_id.to_proto(),
377                        variant: Some(message.clone()),
378                    })
379                    .log_err();
380            }
381            LspStoreEvent::Notification(message) => {
382                self.session
383                    .send(proto::Toast {
384                        project_id: REMOTE_SERVER_PROJECT_ID,
385                        notification_id: "lsp".to_string(),
386                        message: message.clone(),
387                    })
388                    .log_err();
389            }
390            LspStoreEvent::LanguageServerPrompt(prompt) => {
391                let request = self.session.request(proto::LanguageServerPromptRequest {
392                    project_id: REMOTE_SERVER_PROJECT_ID,
393                    actions: prompt
394                        .actions
395                        .iter()
396                        .map(|action| action.title.to_string())
397                        .collect(),
398                    level: Some(prompt_to_proto(prompt)),
399                    lsp_name: prompt.lsp_name.clone(),
400                    message: prompt.message.clone(),
401                });
402                let prompt = prompt.clone();
403                cx.background_spawn(async move {
404                    let response = request.await?;
405                    if let Some(action_response) = response.action_response {
406                        prompt.respond(action_response as usize).await;
407                    }
408                    anyhow::Ok(())
409                })
410                .detach();
411            }
412            _ => {}
413        }
414    }
415
416    pub async fn handle_add_worktree(
417        this: Entity<Self>,
418        message: TypedEnvelope<proto::AddWorktree>,
419        mut cx: AsyncApp,
420    ) -> Result<proto::AddWorktreeResponse> {
421        use client::ErrorCodeExt;
422        let fs = this.read_with(&cx, |this, _| this.fs.clone())?;
423        let path = PathBuf::from(shellexpand::tilde(&message.payload.path).to_string());
424
425        let canonicalized = match fs.canonicalize(&path).await {
426            Ok(path) => path,
427            Err(e) => {
428                let mut parent = path
429                    .parent()
430                    .ok_or(e)
431                    .with_context(|| format!("{path:?} does not exist"))?;
432                if parent == Path::new("") {
433                    parent = util::paths::home_dir();
434                }
435                let parent = fs.canonicalize(parent).await.map_err(|_| {
436                    anyhow!(
437                        proto::ErrorCode::DevServerProjectPathDoesNotExist
438                            .with_tag("path", path.to_string_lossy().as_ref())
439                    )
440                })?;
441                parent.join(path.file_name().unwrap())
442            }
443        };
444
445        let worktree = this
446            .read_with(&cx.clone(), |this, _| {
447                Worktree::local(
448                    Arc::from(canonicalized.as_path()),
449                    message.payload.visible,
450                    this.fs.clone(),
451                    this.next_entry_id.clone(),
452                    true,
453                    &mut cx,
454                )
455            })?
456            .await?;
457
458        let response = this.read_with(&cx, |_, cx| {
459            let worktree = worktree.read(cx);
460            proto::AddWorktreeResponse {
461                worktree_id: worktree.id().to_proto(),
462                canonicalized_path: canonicalized.to_string_lossy().into_owned(),
463            }
464        })?;
465
466        // We spawn this asynchronously, so that we can send the response back
467        // *before* `worktree_store.add()` can send out UpdateProject requests
468        // to the client about the new worktree.
469        //
470        // That lets the client manage the reference/handles of the newly-added
471        // worktree, before getting interrupted by an UpdateProject request.
472        //
473        // This fixes the problem of the client sending the AddWorktree request,
474        // headless project sending out a project update, client receiving it
475        // and immediately dropping the reference of the new client, causing it
476        // to be dropped on the headless project, and the client only then
477        // receiving a response to AddWorktree.
478        cx.spawn(async move |cx| {
479            this.update(cx, |this, cx| {
480                this.worktree_store.update(cx, |worktree_store, cx| {
481                    worktree_store.add(&worktree, cx);
482                });
483            })
484            .log_err();
485        })
486        .detach();
487
488        Ok(response)
489    }
490
491    pub async fn handle_remove_worktree(
492        this: Entity<Self>,
493        envelope: TypedEnvelope<proto::RemoveWorktree>,
494        mut cx: AsyncApp,
495    ) -> Result<proto::Ack> {
496        let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
497        this.update(&mut cx, |this, cx| {
498            this.worktree_store.update(cx, |worktree_store, cx| {
499                worktree_store.remove_worktree(worktree_id, cx);
500            });
501        })?;
502        Ok(proto::Ack {})
503    }
504
505    pub async fn handle_open_buffer_by_path(
506        this: Entity<Self>,
507        message: TypedEnvelope<proto::OpenBufferByPath>,
508        mut cx: AsyncApp,
509    ) -> Result<proto::OpenBufferResponse> {
510        let worktree_id = WorktreeId::from_proto(message.payload.worktree_id);
511        let path = RelPath::from_proto(&message.payload.path)?;
512        let (buffer_store, buffer) = this.update(&mut cx, |this, cx| {
513            let buffer_store = this.buffer_store.clone();
514            let buffer = this.buffer_store.update(cx, |buffer_store, cx| {
515                buffer_store.open_buffer(ProjectPath { worktree_id, path }, cx)
516            });
517            anyhow::Ok((buffer_store, buffer))
518        })??;
519
520        let buffer = buffer.await?;
521        let buffer_id = buffer.read_with(&cx, |b, _| b.remote_id())?;
522        buffer_store.update(&mut cx, |buffer_store, cx| {
523            buffer_store
524                .create_buffer_for_peer(&buffer, REMOTE_SERVER_PEER_ID, cx)
525                .detach_and_log_err(cx);
526        })?;
527
528        Ok(proto::OpenBufferResponse {
529            buffer_id: buffer_id.to_proto(),
530        })
531    }
532
533    pub async fn handle_open_image_by_path(
534        this: Entity<Self>,
535        message: TypedEnvelope<proto::OpenImageByPath>,
536        mut cx: AsyncApp,
537    ) -> Result<proto::OpenImageResponse> {
538        static NEXT_ID: AtomicU64 = AtomicU64::new(1);
539        let worktree_id = WorktreeId::from_proto(message.payload.worktree_id);
540        let path = RelPath::from_proto(&message.payload.path)?;
541        let project_id = message.payload.project_id;
542        use proto::create_image_for_peer::Variant;
543
544        let (worktree_store, session) = this.read_with(&cx, |this, _| {
545            (this.worktree_store.clone(), this.session.clone())
546        })?;
547
548        let worktree = worktree_store
549            .read_with(&cx, |store, cx| store.worktree_for_id(worktree_id, cx))?
550            .context("worktree not found")?;
551
552        let load_task = worktree.update(&mut cx, |worktree, cx| {
553            worktree.load_binary_file(path.as_ref(), cx)
554        })?;
555
556        let loaded_file = load_task.await?;
557        let content = loaded_file.content;
558        let file = loaded_file.file;
559
560        let proto_file = worktree.read_with(&cx, |_worktree, cx| file.to_proto(cx))?;
561        let image_id =
562            ImageId::from(NonZeroU64::new(NEXT_ID.fetch_add(1, Ordering::Relaxed)).unwrap());
563
564        let format = image::guess_format(&content)
565            .map(|f| format!("{:?}", f).to_lowercase())
566            .unwrap_or_else(|_| "unknown".to_string());
567
568        let state = proto::ImageState {
569            id: image_id.to_proto(),
570            file: Some(proto_file),
571            content_size: content.len() as u64,
572            format,
573        };
574
575        session.send(proto::CreateImageForPeer {
576            project_id,
577            peer_id: Some(REMOTE_SERVER_PEER_ID),
578            variant: Some(Variant::State(state)),
579        })?;
580
581        const CHUNK_SIZE: usize = 1024 * 1024; // 1MB chunks
582        for chunk in content.chunks(CHUNK_SIZE) {
583            session.send(proto::CreateImageForPeer {
584                project_id,
585                peer_id: Some(REMOTE_SERVER_PEER_ID),
586                variant: Some(Variant::Chunk(proto::ImageChunk {
587                    image_id: image_id.to_proto(),
588                    data: chunk.to_vec(),
589                })),
590            })?;
591        }
592
593        Ok(proto::OpenImageResponse {
594            image_id: image_id.to_proto(),
595        })
596    }
597
598    pub async fn handle_open_new_buffer(
599        this: Entity<Self>,
600        _message: TypedEnvelope<proto::OpenNewBuffer>,
601        mut cx: AsyncApp,
602    ) -> Result<proto::OpenBufferResponse> {
603        let (buffer_store, buffer) = this.update(&mut cx, |this, cx| {
604            let buffer_store = this.buffer_store.clone();
605            let buffer = this
606                .buffer_store
607                .update(cx, |buffer_store, cx| buffer_store.create_buffer(true, cx));
608            anyhow::Ok((buffer_store, buffer))
609        })??;
610
611        let buffer = buffer.await?;
612        let buffer_id = buffer.read_with(&cx, |b, _| b.remote_id())?;
613        buffer_store.update(&mut cx, |buffer_store, cx| {
614            buffer_store
615                .create_buffer_for_peer(&buffer, REMOTE_SERVER_PEER_ID, cx)
616                .detach_and_log_err(cx);
617        })?;
618
619        Ok(proto::OpenBufferResponse {
620            buffer_id: buffer_id.to_proto(),
621        })
622    }
623
624    async fn handle_toggle_lsp_logs(
625        _: Entity<Self>,
626        envelope: TypedEnvelope<proto::ToggleLspLogs>,
627        cx: AsyncApp,
628    ) -> Result<()> {
629        let server_id = LanguageServerId::from_proto(envelope.payload.server_id);
630        cx.update(|cx| {
631            let log_store = cx
632                .try_global::<GlobalLogStore>()
633                .map(|global_log_store| global_log_store.0.clone())
634                .context("lsp logs store is missing")?;
635            let toggled_log_kind =
636                match proto::toggle_lsp_logs::LogType::from_i32(envelope.payload.log_type)
637                    .context("invalid log type")?
638                {
639                    proto::toggle_lsp_logs::LogType::Log => LogKind::Logs,
640                    proto::toggle_lsp_logs::LogType::Trace => LogKind::Trace,
641                    proto::toggle_lsp_logs::LogType::Rpc => LogKind::Rpc,
642                };
643            log_store.update(cx, |log_store, _| {
644                log_store.toggle_lsp_logs(server_id, envelope.payload.enabled, toggled_log_kind);
645            });
646            anyhow::Ok(())
647        })??;
648
649        Ok(())
650    }
651
652    async fn handle_open_server_settings(
653        this: Entity<Self>,
654        _: TypedEnvelope<proto::OpenServerSettings>,
655        mut cx: AsyncApp,
656    ) -> Result<proto::OpenBufferResponse> {
657        let settings_path = paths::settings_file();
658        let (worktree, path) = this
659            .update(&mut cx, |this, cx| {
660                this.worktree_store.update(cx, |worktree_store, cx| {
661                    worktree_store.find_or_create_worktree(settings_path, false, cx)
662                })
663            })?
664            .await?;
665
666        let (buffer, buffer_store) = this.update(&mut cx, |this, cx| {
667            let buffer = this.buffer_store.update(cx, |buffer_store, cx| {
668                buffer_store.open_buffer(
669                    ProjectPath {
670                        worktree_id: worktree.read(cx).id(),
671                        path: path,
672                    },
673                    cx,
674                )
675            });
676
677            (buffer, this.buffer_store.clone())
678        })?;
679
680        let buffer = buffer.await?;
681
682        let buffer_id = cx.update(|cx| {
683            if buffer.read(cx).is_empty() {
684                buffer.update(cx, |buffer, cx| {
685                    buffer.edit([(0..0, initial_server_settings_content())], None, cx)
686                });
687            }
688
689            let buffer_id = buffer.read(cx).remote_id();
690
691            buffer_store.update(cx, |buffer_store, cx| {
692                buffer_store
693                    .create_buffer_for_peer(&buffer, REMOTE_SERVER_PEER_ID, cx)
694                    .detach_and_log_err(cx);
695            });
696
697            buffer_id
698        })?;
699
700        Ok(proto::OpenBufferResponse {
701            buffer_id: buffer_id.to_proto(),
702        })
703    }
704
705    async fn handle_find_search_candidates(
706        this: Entity<Self>,
707        envelope: TypedEnvelope<proto::FindSearchCandidates>,
708        mut cx: AsyncApp,
709    ) -> Result<proto::FindSearchCandidatesResponse> {
710        let message = envelope.payload;
711        let query = SearchQuery::from_proto(
712            message.query.context("missing query field")?,
713            PathStyle::local(),
714        )?;
715        let results = this.update(&mut cx, |this, cx| {
716            project::Search::local(
717                this.fs.clone(),
718                this.buffer_store.clone(),
719                this.worktree_store.clone(),
720                message.limit as _,
721                cx,
722            )
723            .into_handle(query, cx)
724            .matching_buffers(cx)
725        })?;
726
727        let mut response = proto::FindSearchCandidatesResponse {
728            buffer_ids: Vec::new(),
729        };
730
731        let buffer_store = this.read_with(&cx, |this, _| this.buffer_store.clone())?;
732
733        while let Ok(buffer) = results.recv().await {
734            let buffer_id = buffer.read_with(&cx, |this, _| this.remote_id())?;
735            response.buffer_ids.push(buffer_id.to_proto());
736            buffer_store
737                .update(&mut cx, |buffer_store, cx| {
738                    buffer_store.create_buffer_for_peer(&buffer, REMOTE_SERVER_PEER_ID, cx)
739                })?
740                .await?;
741        }
742
743        Ok(response)
744    }
745
746    async fn handle_list_remote_directory(
747        this: Entity<Self>,
748        envelope: TypedEnvelope<proto::ListRemoteDirectory>,
749        cx: AsyncApp,
750    ) -> Result<proto::ListRemoteDirectoryResponse> {
751        let fs = cx.read_entity(&this, |this, _| this.fs.clone())?;
752        let expanded = PathBuf::from(shellexpand::tilde(&envelope.payload.path).to_string());
753        let check_info = envelope
754            .payload
755            .config
756            .as_ref()
757            .is_some_and(|config| config.is_dir);
758
759        let mut entries = Vec::new();
760        let mut entry_info = Vec::new();
761        let mut response = fs.read_dir(&expanded).await?;
762        while let Some(path) = response.next().await {
763            let path = path?;
764            if let Some(file_name) = path.file_name() {
765                entries.push(file_name.to_string_lossy().into_owned());
766                if check_info {
767                    let is_dir = fs.is_dir(&path).await;
768                    entry_info.push(proto::EntryInfo { is_dir });
769                }
770            }
771        }
772        Ok(proto::ListRemoteDirectoryResponse {
773            entries,
774            entry_info,
775        })
776    }
777
778    async fn handle_get_path_metadata(
779        this: Entity<Self>,
780        envelope: TypedEnvelope<proto::GetPathMetadata>,
781        cx: AsyncApp,
782    ) -> Result<proto::GetPathMetadataResponse> {
783        let fs = cx.read_entity(&this, |this, _| this.fs.clone())?;
784        let expanded = PathBuf::from(shellexpand::tilde(&envelope.payload.path).to_string());
785
786        let metadata = fs.metadata(&expanded).await?;
787        let is_dir = metadata.map(|metadata| metadata.is_dir).unwrap_or(false);
788
789        Ok(proto::GetPathMetadataResponse {
790            exists: metadata.is_some(),
791            is_dir,
792            path: expanded.to_string_lossy().into_owned(),
793        })
794    }
795
796    async fn handle_shutdown_remote_server(
797        _this: Entity<Self>,
798        _envelope: TypedEnvelope<proto::ShutdownRemoteServer>,
799        cx: AsyncApp,
800    ) -> Result<proto::Ack> {
801        cx.spawn(async move |cx| {
802            cx.update(|cx| {
803                // TODO: This is a hack, because in a headless project, shutdown isn't executed
804                // when calling quit, but it should be.
805                cx.shutdown();
806                cx.quit();
807            })
808        })
809        .detach();
810
811        Ok(proto::Ack {})
812    }
813
814    pub async fn handle_ping(
815        _this: Entity<Self>,
816        _envelope: TypedEnvelope<proto::Ping>,
817        _cx: AsyncApp,
818    ) -> Result<proto::Ack> {
819        log::debug!("Received ping from client");
820        Ok(proto::Ack {})
821    }
822
823    async fn handle_get_processes(
824        _this: Entity<Self>,
825        _envelope: TypedEnvelope<proto::GetProcesses>,
826        _cx: AsyncApp,
827    ) -> Result<proto::GetProcessesResponse> {
828        let mut processes = Vec::new();
829        let refresh_kind = RefreshKind::nothing().with_processes(
830            ProcessRefreshKind::nothing()
831                .without_tasks()
832                .with_cmd(UpdateKind::Always),
833        );
834
835        for process in System::new_with_specifics(refresh_kind)
836            .processes()
837            .values()
838        {
839            let name = process.name().to_string_lossy().into_owned();
840            let command = process
841                .cmd()
842                .iter()
843                .map(|s| s.to_string_lossy().into_owned())
844                .collect::<Vec<_>>();
845
846            processes.push(proto::ProcessInfo {
847                pid: process.pid().as_u32(),
848                name,
849                command,
850            });
851        }
852
853        processes.sort_by_key(|p| p.name.clone());
854
855        Ok(proto::GetProcessesResponse { processes })
856    }
857
858    async fn handle_get_directory_environment(
859        this: Entity<Self>,
860        envelope: TypedEnvelope<proto::GetDirectoryEnvironment>,
861        mut cx: AsyncApp,
862    ) -> Result<proto::DirectoryEnvironment> {
863        let shell = task::shell_from_proto(envelope.payload.shell.context("missing shell")?)?;
864        let directory = PathBuf::from(envelope.payload.directory);
865        let environment = this
866            .update(&mut cx, |this, cx| {
867                this.environment.update(cx, |environment, cx| {
868                    environment.local_directory_environment(&shell, directory.into(), cx)
869                })
870            })?
871            .await
872            .context("failed to get directory environment")?
873            .into_iter()
874            .collect();
875        Ok(proto::DirectoryEnvironment { environment })
876    }
877}
878
879fn prompt_to_proto(
880    prompt: &project::LanguageServerPromptRequest,
881) -> proto::language_server_prompt_request::Level {
882    match prompt.level {
883        PromptLevel::Info => proto::language_server_prompt_request::Level::Info(
884            proto::language_server_prompt_request::Info {},
885        ),
886        PromptLevel::Warning => proto::language_server_prompt_request::Level::Warning(
887            proto::language_server_prompt_request::Warning {},
888        ),
889        PromptLevel::Critical => proto::language_server_prompt_request::Level::Critical(
890            proto::language_server_prompt_request::Critical {},
891        ),
892    }
893}