headless_project.rs

  1use anyhow::{Context as _, Result, anyhow};
  2use collections::HashSet;
  3use language::File;
  4use lsp::LanguageServerId;
  5
  6use extension::ExtensionHostProxy;
  7use extension_host::headless_host::HeadlessExtensionStore;
  8use fs::Fs;
  9use gpui::{App, AppContext as _, AsyncApp, Context, Entity, PromptLevel};
 10use http_client::HttpClient;
 11use language::{Buffer, BufferEvent, LanguageRegistry, proto::serialize_operation};
 12use node_runtime::NodeRuntime;
 13use project::{
 14    LspStore, LspStoreEvent, ManifestTree, PrettierStore, ProjectEnvironment, ProjectPath,
 15    ToolchainStore, WorktreeId,
 16    agent_server_store::AgentServerStore,
 17    buffer_store::{BufferStore, BufferStoreEvent},
 18    debugger::{breakpoint_store::BreakpointStore, dap_store::DapStore},
 19    git_store::GitStore,
 20    image_store::ImageId,
 21    lsp_store::log_store::{self, GlobalLogStore, LanguageServerKind, LogKind},
 22    project_settings::SettingsObserver,
 23    search::SearchQuery,
 24    task_store::TaskStore,
 25    trusted_worktrees::{PathTrust, RemoteHostLocation, TrustedWorktrees},
 26    worktree_store::WorktreeStore,
 27};
 28use rpc::{
 29    AnyProtoClient, TypedEnvelope,
 30    proto::{self, REMOTE_SERVER_PEER_ID, REMOTE_SERVER_PROJECT_ID},
 31};
 32
 33use settings::initial_server_settings_content;
 34use smol::stream::StreamExt;
 35use std::{
 36    num::NonZeroU64,
 37    path::{Path, PathBuf},
 38    sync::{
 39        Arc,
 40        atomic::{AtomicU64, AtomicUsize, Ordering},
 41    },
 42};
 43use sysinfo::{ProcessRefreshKind, RefreshKind, System, UpdateKind};
 44use util::{ResultExt, paths::PathStyle, rel_path::RelPath};
 45use worktree::Worktree;
 46
/// Project state held on the headless (remote server) side of a session.
///
/// Bundles the session client with the entity handles for the stores that
/// `HeadlessProject::new` constructs.
pub struct HeadlessProject {
    /// Filesystem handle passed to the stores and used directly by request handlers.
    pub fs: Arc<dyn Fs>,
    /// RPC client used to send messages to, and register handlers for, the peer.
    pub session: AnyProtoClient,
    /// Store of the worktrees that belong to this project.
    pub worktree_store: Entity<WorktreeStore>,
    /// Store of the buffers opened for this project.
    pub buffer_store: Entity<BufferStore>,
    /// Language-server store; its events are handled by `on_lsp_store_event`.
    pub lsp_store: Entity<LspStore>,
    /// Task store created in `new`.
    pub task_store: Entity<TaskStore>,
    /// Debug-adapter store created in `new`.
    pub dap_store: Entity<DapStore>,
    /// Breakpoint store created in `new`.
    pub breakpoint_store: Entity<BreakpointStore>,
    /// Agent-server store created in `new`.
    pub agent_server_store: Entity<AgentServerStore>,
    /// Settings observer created in `new`.
    pub settings_observer: Entity<SettingsObserver>,
    /// Counter handed to every worktree created by `handle_add_worktree`.
    pub next_entry_id: Arc<AtomicUsize>,
    /// Language registry shared with the stores that need it.
    pub languages: Arc<LanguageRegistry>,
    /// Extension store serving the extension sync/install requests.
    pub extensions: Entity<HeadlessExtensionStore>,
    /// Git store created in `new`.
    pub git_store: Entity<GitStore>,
    /// Project environment shared by the stores created in `new`.
    pub environment: Entity<ProjectEnvironment>,
    // Used mostly to keep alive the toolchain store for RPC handlers.
    // Local variant is used within LSP store, but that's a separate entity.
    pub _toolchain_store: Entity<ToolchainStore>,
}
 67
/// Dependencies consumed by `HeadlessProject::new`.
pub struct HeadlessAppState {
    /// RPC client connected to the peer.
    pub session: AnyProtoClient,
    /// Filesystem implementation used by the project and its stores.
    pub fs: Arc<dyn Fs>,
    /// HTTP client handed to the stores that need one.
    pub http_client: Arc<dyn HttpClient>,
    /// Node runtime handed to the stores that need one.
    pub node_runtime: NodeRuntime,
    /// Language registry shared across the stores.
    pub languages: Arc<LanguageRegistry>,
    /// Proxy passed to the extension-related initializers and the extension store.
    pub extension_host_proxy: Arc<ExtensionHostProxy>,
}
 76
 77impl HeadlessProject {
    /// Performs app-level setup: initializes settings, then the language-server
    /// log store.
    pub fn init(cx: &mut App) {
        settings::init(cx);
        // NOTE(review): the meaning of the `true` flag is not visible in this
        // file — confirm against `log_store::init`.
        log_store::init(true, cx);
    }
 82
 83    pub fn new(
 84        HeadlessAppState {
 85            session,
 86            fs,
 87            http_client,
 88            node_runtime,
 89            languages,
 90            extension_host_proxy: proxy,
 91        }: HeadlessAppState,
 92        init_worktree_trust: bool,
 93        cx: &mut Context<Self>,
 94    ) -> Self {
 95        debug_adapter_extension::init(proxy.clone(), cx);
 96        languages::init(languages.clone(), fs.clone(), node_runtime.clone(), cx);
 97
 98        let worktree_store = cx.new(|cx| {
 99            let mut store = WorktreeStore::local(true, fs.clone());
100            store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
101            store
102        });
103
104        if init_worktree_trust {
105            project::trusted_worktrees::track_worktree_trust(
106                worktree_store.clone(),
107                None::<RemoteHostLocation>,
108                Some((session.clone(), REMOTE_SERVER_PROJECT_ID)),
109                None,
110                cx,
111            );
112        }
113
114        let environment =
115            cx.new(|cx| ProjectEnvironment::new(None, worktree_store.downgrade(), None, true, cx));
116        let manifest_tree = ManifestTree::new(worktree_store.clone(), cx);
117        let toolchain_store = cx.new(|cx| {
118            ToolchainStore::local(
119                languages.clone(),
120                worktree_store.clone(),
121                environment.clone(),
122                manifest_tree.clone(),
123                fs.clone(),
124                cx,
125            )
126        });
127
128        let buffer_store = cx.new(|cx| {
129            let mut buffer_store = BufferStore::local(worktree_store.clone(), cx);
130            buffer_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
131            buffer_store
132        });
133
134        let breakpoint_store = cx.new(|_| {
135            let mut breakpoint_store =
136                BreakpointStore::local(worktree_store.clone(), buffer_store.clone());
137            breakpoint_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone());
138
139            breakpoint_store
140        });
141
142        let dap_store = cx.new(|cx| {
143            let mut dap_store = DapStore::new_local(
144                http_client.clone(),
145                node_runtime.clone(),
146                fs.clone(),
147                environment.clone(),
148                toolchain_store.read(cx).as_language_toolchain_store(),
149                worktree_store.clone(),
150                breakpoint_store.clone(),
151                true,
152                cx,
153            );
154            dap_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
155            dap_store
156        });
157
158        let git_store = cx.new(|cx| {
159            let mut store = GitStore::local(
160                &worktree_store,
161                buffer_store.clone(),
162                environment.clone(),
163                fs.clone(),
164                cx,
165            );
166            store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
167            store
168        });
169
170        let prettier_store = cx.new(|cx| {
171            PrettierStore::new(
172                node_runtime.clone(),
173                fs.clone(),
174                languages.clone(),
175                worktree_store.clone(),
176                cx,
177            )
178        });
179
180        let task_store = cx.new(|cx| {
181            let mut task_store = TaskStore::local(
182                buffer_store.downgrade(),
183                worktree_store.clone(),
184                toolchain_store.read(cx).as_language_toolchain_store(),
185                environment.clone(),
186                cx,
187            );
188            task_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
189            task_store
190        });
191        let settings_observer = cx.new(|cx| {
192            let mut observer = SettingsObserver::new_local(
193                fs.clone(),
194                worktree_store.clone(),
195                task_store.clone(),
196                cx,
197            );
198            observer.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
199            observer
200        });
201
202        let lsp_store = cx.new(|cx| {
203            let mut lsp_store = LspStore::new_local(
204                buffer_store.clone(),
205                worktree_store.clone(),
206                prettier_store.clone(),
207                toolchain_store
208                    .read(cx)
209                    .as_local_store()
210                    .expect("Toolchain store to be local")
211                    .clone(),
212                environment.clone(),
213                manifest_tree,
214                languages.clone(),
215                http_client.clone(),
216                fs.clone(),
217                cx,
218            );
219            lsp_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
220            lsp_store
221        });
222
223        let agent_server_store = cx.new(|cx| {
224            let mut agent_server_store = AgentServerStore::local(
225                node_runtime.clone(),
226                fs.clone(),
227                environment.clone(),
228                http_client.clone(),
229                cx,
230            );
231            agent_server_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
232            agent_server_store
233        });
234
235        cx.subscribe(&lsp_store, Self::on_lsp_store_event).detach();
236        language_extension::init(
237            language_extension::LspAccess::ViaLspStore(lsp_store.clone()),
238            proxy.clone(),
239            languages.clone(),
240        );
241
242        cx.subscribe(&buffer_store, |_this, _buffer_store, event, cx| {
243            if let BufferStoreEvent::BufferAdded(buffer) = event {
244                cx.subscribe(buffer, Self::on_buffer_event).detach();
245            }
246        })
247        .detach();
248
249        let extensions = HeadlessExtensionStore::new(
250            fs.clone(),
251            http_client.clone(),
252            paths::remote_extensions_dir().to_path_buf(),
253            proxy,
254            node_runtime,
255            cx,
256        );
257
258        // local_machine -> ssh handlers
259        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &worktree_store);
260        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &buffer_store);
261        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &cx.entity());
262        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &lsp_store);
263        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &task_store);
264        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &toolchain_store);
265        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &dap_store);
266        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &breakpoint_store);
267        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &settings_observer);
268        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &git_store);
269        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &agent_server_store);
270
271        session.add_request_handler(cx.weak_entity(), Self::handle_list_remote_directory);
272        session.add_request_handler(cx.weak_entity(), Self::handle_get_path_metadata);
273        session.add_request_handler(cx.weak_entity(), Self::handle_shutdown_remote_server);
274        session.add_request_handler(cx.weak_entity(), Self::handle_ping);
275        session.add_request_handler(cx.weak_entity(), Self::handle_get_processes);
276
277        session.add_entity_request_handler(Self::handle_add_worktree);
278        session.add_request_handler(cx.weak_entity(), Self::handle_remove_worktree);
279
280        session.add_entity_request_handler(Self::handle_open_buffer_by_path);
281        session.add_entity_request_handler(Self::handle_open_new_buffer);
282        session.add_entity_request_handler(Self::handle_find_search_candidates);
283        session.add_entity_request_handler(Self::handle_open_server_settings);
284        session.add_entity_request_handler(Self::handle_get_directory_environment);
285        session.add_entity_message_handler(Self::handle_toggle_lsp_logs);
286        session.add_entity_request_handler(Self::handle_open_image_by_path);
287        session.add_entity_request_handler(Self::handle_trust_worktrees);
288        session.add_entity_request_handler(Self::handle_restrict_worktrees);
289
290        session.add_entity_request_handler(BufferStore::handle_update_buffer);
291        session.add_entity_message_handler(BufferStore::handle_close_buffer);
292
293        session.add_request_handler(
294            extensions.downgrade(),
295            HeadlessExtensionStore::handle_sync_extensions,
296        );
297        session.add_request_handler(
298            extensions.downgrade(),
299            HeadlessExtensionStore::handle_install_extension,
300        );
301
302        BufferStore::init(&session);
303        WorktreeStore::init(&session);
304        SettingsObserver::init(&session);
305        LspStore::init(&session);
306        TaskStore::init(Some(&session));
307        ToolchainStore::init(&session);
308        DapStore::init(&session, cx);
309        // todo(debugger): Re init breakpoint store when we set it up for collab
310        BreakpointStore::init(&session);
311        GitStore::init(&session);
312        AgentServerStore::init_headless(&session);
313
314        HeadlessProject {
315            next_entry_id: Default::default(),
316            session,
317            settings_observer,
318            fs,
319            worktree_store,
320            buffer_store,
321            lsp_store,
322            task_store,
323            dap_store,
324            breakpoint_store,
325            agent_server_store,
326            languages,
327            extensions,
328            git_store,
329            environment,
330            _toolchain_store: toolchain_store,
331        }
332    }
333
334    fn on_buffer_event(
335        &mut self,
336        buffer: Entity<Buffer>,
337        event: &BufferEvent,
338        cx: &mut Context<Self>,
339    ) {
340        if let BufferEvent::Operation {
341            operation,
342            is_local: true,
343        } = event
344        {
345            cx.background_spawn(self.session.request(proto::UpdateBuffer {
346                project_id: REMOTE_SERVER_PROJECT_ID,
347                buffer_id: buffer.read(cx).remote_id().to_proto(),
348                operations: vec![serialize_operation(operation)],
349            }))
350            .detach()
351        }
352    }
353
354    fn on_lsp_store_event(
355        &mut self,
356        lsp_store: Entity<LspStore>,
357        event: &LspStoreEvent,
358        cx: &mut Context<Self>,
359    ) {
360        match event {
361            LspStoreEvent::LanguageServerAdded(id, name, worktree_id) => {
362                let log_store = cx
363                    .try_global::<GlobalLogStore>()
364                    .map(|lsp_logs| lsp_logs.0.clone());
365                if let Some(log_store) = log_store {
366                    log_store.update(cx, |log_store, cx| {
367                        log_store.add_language_server(
368                            LanguageServerKind::LocalSsh {
369                                lsp_store: self.lsp_store.downgrade(),
370                            },
371                            *id,
372                            Some(name.clone()),
373                            *worktree_id,
374                            lsp_store.read(cx).language_server_for_id(*id),
375                            cx,
376                        );
377                    });
378                }
379            }
380            LspStoreEvent::LanguageServerRemoved(id) => {
381                let log_store = cx
382                    .try_global::<GlobalLogStore>()
383                    .map(|lsp_logs| lsp_logs.0.clone());
384                if let Some(log_store) = log_store {
385                    log_store.update(cx, |log_store, cx| {
386                        log_store.remove_language_server(*id, cx);
387                    });
388                }
389            }
390            LspStoreEvent::LanguageServerUpdate {
391                language_server_id,
392                name,
393                message,
394            } => {
395                self.session
396                    .send(proto::UpdateLanguageServer {
397                        project_id: REMOTE_SERVER_PROJECT_ID,
398                        server_name: name.as_ref().map(|name| name.to_string()),
399                        language_server_id: language_server_id.to_proto(),
400                        variant: Some(message.clone()),
401                    })
402                    .log_err();
403            }
404            LspStoreEvent::Notification(message) => {
405                self.session
406                    .send(proto::Toast {
407                        project_id: REMOTE_SERVER_PROJECT_ID,
408                        notification_id: "lsp".to_string(),
409                        message: message.clone(),
410                    })
411                    .log_err();
412            }
413            LspStoreEvent::LanguageServerPrompt(prompt) => {
414                let request = self.session.request(proto::LanguageServerPromptRequest {
415                    project_id: REMOTE_SERVER_PROJECT_ID,
416                    actions: prompt
417                        .actions
418                        .iter()
419                        .map(|action| action.title.to_string())
420                        .collect(),
421                    level: Some(prompt_to_proto(prompt)),
422                    lsp_name: prompt.lsp_name.clone(),
423                    message: prompt.message.clone(),
424                });
425                let prompt = prompt.clone();
426                cx.background_spawn(async move {
427                    let response = request.await?;
428                    if let Some(action_response) = response.action_response {
429                        prompt.respond(action_response as usize).await;
430                    }
431                    anyhow::Ok(())
432                })
433                .detach();
434            }
435            _ => {}
436        }
437    }
438
439    pub async fn handle_add_worktree(
440        this: Entity<Self>,
441        message: TypedEnvelope<proto::AddWorktree>,
442        mut cx: AsyncApp,
443    ) -> Result<proto::AddWorktreeResponse> {
444        use client::ErrorCodeExt;
445        let fs = this.read_with(&cx, |this, _| this.fs.clone())?;
446        let path = PathBuf::from(shellexpand::tilde(&message.payload.path).to_string());
447
448        let canonicalized = match fs.canonicalize(&path).await {
449            Ok(path) => path,
450            Err(e) => {
451                let mut parent = path
452                    .parent()
453                    .ok_or(e)
454                    .with_context(|| format!("{path:?} does not exist"))?;
455                if parent == Path::new("") {
456                    parent = util::paths::home_dir();
457                }
458                let parent = fs.canonicalize(parent).await.map_err(|_| {
459                    anyhow!(
460                        proto::ErrorCode::DevServerProjectPathDoesNotExist
461                            .with_tag("path", path.to_string_lossy().as_ref())
462                    )
463                })?;
464                parent.join(path.file_name().unwrap())
465            }
466        };
467
468        let worktree = this
469            .read_with(&cx.clone(), |this, _| {
470                Worktree::local(
471                    Arc::from(canonicalized.as_path()),
472                    message.payload.visible,
473                    this.fs.clone(),
474                    this.next_entry_id.clone(),
475                    true,
476                    &mut cx,
477                )
478            })?
479            .await?;
480
481        let response = this.read_with(&cx, |_, cx| {
482            let worktree = worktree.read(cx);
483            proto::AddWorktreeResponse {
484                worktree_id: worktree.id().to_proto(),
485                canonicalized_path: canonicalized.to_string_lossy().into_owned(),
486            }
487        })?;
488
489        // We spawn this asynchronously, so that we can send the response back
490        // *before* `worktree_store.add()` can send out UpdateProject requests
491        // to the client about the new worktree.
492        //
493        // That lets the client manage the reference/handles of the newly-added
494        // worktree, before getting interrupted by an UpdateProject request.
495        //
496        // This fixes the problem of the client sending the AddWorktree request,
497        // headless project sending out a project update, client receiving it
498        // and immediately dropping the reference of the new client, causing it
499        // to be dropped on the headless project, and the client only then
500        // receiving a response to AddWorktree.
501        cx.spawn(async move |cx| {
502            this.update(cx, |this, cx| {
503                this.worktree_store.update(cx, |worktree_store, cx| {
504                    worktree_store.add(&worktree, cx);
505                });
506            })
507            .log_err();
508        })
509        .detach();
510
511        Ok(response)
512    }
513
514    pub async fn handle_remove_worktree(
515        this: Entity<Self>,
516        envelope: TypedEnvelope<proto::RemoveWorktree>,
517        mut cx: AsyncApp,
518    ) -> Result<proto::Ack> {
519        let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
520        this.update(&mut cx, |this, cx| {
521            this.worktree_store.update(cx, |worktree_store, cx| {
522                worktree_store.remove_worktree(worktree_id, cx);
523            });
524        })?;
525        Ok(proto::Ack {})
526    }
527
528    pub async fn handle_open_buffer_by_path(
529        this: Entity<Self>,
530        message: TypedEnvelope<proto::OpenBufferByPath>,
531        mut cx: AsyncApp,
532    ) -> Result<proto::OpenBufferResponse> {
533        let worktree_id = WorktreeId::from_proto(message.payload.worktree_id);
534        let path = RelPath::from_proto(&message.payload.path)?;
535        let (buffer_store, buffer) = this.update(&mut cx, |this, cx| {
536            let buffer_store = this.buffer_store.clone();
537            let buffer = this.buffer_store.update(cx, |buffer_store, cx| {
538                buffer_store.open_buffer(ProjectPath { worktree_id, path }, cx)
539            });
540            anyhow::Ok((buffer_store, buffer))
541        })??;
542
543        let buffer = buffer.await?;
544        let buffer_id = buffer.read_with(&cx, |b, _| b.remote_id())?;
545        buffer_store.update(&mut cx, |buffer_store, cx| {
546            buffer_store
547                .create_buffer_for_peer(&buffer, REMOTE_SERVER_PEER_ID, cx)
548                .detach_and_log_err(cx);
549        })?;
550
551        Ok(proto::OpenBufferResponse {
552            buffer_id: buffer_id.to_proto(),
553        })
554    }
555
556    pub async fn handle_open_image_by_path(
557        this: Entity<Self>,
558        message: TypedEnvelope<proto::OpenImageByPath>,
559        mut cx: AsyncApp,
560    ) -> Result<proto::OpenImageResponse> {
561        static NEXT_ID: AtomicU64 = AtomicU64::new(1);
562        let worktree_id = WorktreeId::from_proto(message.payload.worktree_id);
563        let path = RelPath::from_proto(&message.payload.path)?;
564        let project_id = message.payload.project_id;
565        use proto::create_image_for_peer::Variant;
566
567        let (worktree_store, session) = this.read_with(&cx, |this, _| {
568            (this.worktree_store.clone(), this.session.clone())
569        })?;
570
571        let worktree = worktree_store
572            .read_with(&cx, |store, cx| store.worktree_for_id(worktree_id, cx))?
573            .context("worktree not found")?;
574
575        let load_task = worktree.update(&mut cx, |worktree, cx| {
576            worktree.load_binary_file(path.as_ref(), cx)
577        })?;
578
579        let loaded_file = load_task.await?;
580        let content = loaded_file.content;
581        let file = loaded_file.file;
582
583        let proto_file = worktree.read_with(&cx, |_worktree, cx| file.to_proto(cx))?;
584        let image_id =
585            ImageId::from(NonZeroU64::new(NEXT_ID.fetch_add(1, Ordering::Relaxed)).unwrap());
586
587        let format = image::guess_format(&content)
588            .map(|f| format!("{:?}", f).to_lowercase())
589            .unwrap_or_else(|_| "unknown".to_string());
590
591        let state = proto::ImageState {
592            id: image_id.to_proto(),
593            file: Some(proto_file),
594            content_size: content.len() as u64,
595            format,
596        };
597
598        session.send(proto::CreateImageForPeer {
599            project_id,
600            peer_id: Some(REMOTE_SERVER_PEER_ID),
601            variant: Some(Variant::State(state)),
602        })?;
603
604        const CHUNK_SIZE: usize = 1024 * 1024; // 1MB chunks
605        for chunk in content.chunks(CHUNK_SIZE) {
606            session.send(proto::CreateImageForPeer {
607                project_id,
608                peer_id: Some(REMOTE_SERVER_PEER_ID),
609                variant: Some(Variant::Chunk(proto::ImageChunk {
610                    image_id: image_id.to_proto(),
611                    data: chunk.to_vec(),
612                })),
613            })?;
614        }
615
616        Ok(proto::OpenImageResponse {
617            image_id: image_id.to_proto(),
618        })
619    }
620
621    pub async fn handle_trust_worktrees(
622        _: Entity<Self>,
623        envelope: TypedEnvelope<proto::TrustWorktrees>,
624        mut cx: AsyncApp,
625    ) -> Result<proto::Ack> {
626        let trusted_worktrees = cx
627            .update(|cx| TrustedWorktrees::try_get_global(cx))?
628            .context("missing trusted worktrees")?;
629        trusted_worktrees.update(&mut cx, |trusted_worktrees, cx| {
630            trusted_worktrees.trust(
631                envelope
632                    .payload
633                    .trusted_paths
634                    .into_iter()
635                    .filter_map(PathTrust::from_proto)
636                    .collect(),
637                None,
638                cx,
639            );
640        })?;
641        Ok(proto::Ack {})
642    }
643
644    pub async fn handle_restrict_worktrees(
645        _: Entity<Self>,
646        envelope: TypedEnvelope<proto::RestrictWorktrees>,
647        mut cx: AsyncApp,
648    ) -> Result<proto::Ack> {
649        let trusted_worktrees = cx
650            .update(|cx| TrustedWorktrees::try_get_global(cx))?
651            .context("missing trusted worktrees")?;
652        trusted_worktrees.update(&mut cx, |trusted_worktrees, cx| {
653            let restricted_paths = envelope
654                .payload
655                .worktree_ids
656                .into_iter()
657                .map(WorktreeId::from_proto)
658                .map(PathTrust::Worktree)
659                .collect::<HashSet<_>>();
660            trusted_worktrees.restrict(restricted_paths, None, cx);
661        })?;
662        Ok(proto::Ack {})
663    }
664
665    pub async fn handle_open_new_buffer(
666        this: Entity<Self>,
667        _message: TypedEnvelope<proto::OpenNewBuffer>,
668        mut cx: AsyncApp,
669    ) -> Result<proto::OpenBufferResponse> {
670        let (buffer_store, buffer) = this.update(&mut cx, |this, cx| {
671            let buffer_store = this.buffer_store.clone();
672            let buffer = this
673                .buffer_store
674                .update(cx, |buffer_store, cx| buffer_store.create_buffer(true, cx));
675            anyhow::Ok((buffer_store, buffer))
676        })??;
677
678        let buffer = buffer.await?;
679        let buffer_id = buffer.read_with(&cx, |b, _| b.remote_id())?;
680        buffer_store.update(&mut cx, |buffer_store, cx| {
681            buffer_store
682                .create_buffer_for_peer(&buffer, REMOTE_SERVER_PEER_ID, cx)
683                .detach_and_log_err(cx);
684        })?;
685
686        Ok(proto::OpenBufferResponse {
687            buffer_id: buffer_id.to_proto(),
688        })
689    }
690
691    async fn handle_toggle_lsp_logs(
692        _: Entity<Self>,
693        envelope: TypedEnvelope<proto::ToggleLspLogs>,
694        cx: AsyncApp,
695    ) -> Result<()> {
696        let server_id = LanguageServerId::from_proto(envelope.payload.server_id);
697        cx.update(|cx| {
698            let log_store = cx
699                .try_global::<GlobalLogStore>()
700                .map(|global_log_store| global_log_store.0.clone())
701                .context("lsp logs store is missing")?;
702            let toggled_log_kind =
703                match proto::toggle_lsp_logs::LogType::from_i32(envelope.payload.log_type)
704                    .context("invalid log type")?
705                {
706                    proto::toggle_lsp_logs::LogType::Log => LogKind::Logs,
707                    proto::toggle_lsp_logs::LogType::Trace => LogKind::Trace,
708                    proto::toggle_lsp_logs::LogType::Rpc => LogKind::Rpc,
709                };
710            log_store.update(cx, |log_store, _| {
711                log_store.toggle_lsp_logs(server_id, envelope.payload.enabled, toggled_log_kind);
712            });
713            anyhow::Ok(())
714        })??;
715
716        Ok(())
717    }
718
719    async fn handle_open_server_settings(
720        this: Entity<Self>,
721        _: TypedEnvelope<proto::OpenServerSettings>,
722        mut cx: AsyncApp,
723    ) -> Result<proto::OpenBufferResponse> {
724        let settings_path = paths::settings_file();
725        let (worktree, path) = this
726            .update(&mut cx, |this, cx| {
727                this.worktree_store.update(cx, |worktree_store, cx| {
728                    worktree_store.find_or_create_worktree(settings_path, false, cx)
729                })
730            })?
731            .await?;
732
733        let (buffer, buffer_store) = this.update(&mut cx, |this, cx| {
734            let buffer = this.buffer_store.update(cx, |buffer_store, cx| {
735                buffer_store.open_buffer(
736                    ProjectPath {
737                        worktree_id: worktree.read(cx).id(),
738                        path: path,
739                    },
740                    cx,
741                )
742            });
743
744            (buffer, this.buffer_store.clone())
745        })?;
746
747        let buffer = buffer.await?;
748
749        let buffer_id = cx.update(|cx| {
750            if buffer.read(cx).is_empty() {
751                buffer.update(cx, |buffer, cx| {
752                    buffer.edit([(0..0, initial_server_settings_content())], None, cx)
753                });
754            }
755
756            let buffer_id = buffer.read(cx).remote_id();
757
758            buffer_store.update(cx, |buffer_store, cx| {
759                buffer_store
760                    .create_buffer_for_peer(&buffer, REMOTE_SERVER_PEER_ID, cx)
761                    .detach_and_log_err(cx);
762            });
763
764            buffer_id
765        })?;
766
767        Ok(proto::OpenBufferResponse {
768            buffer_id: buffer_id.to_proto(),
769        })
770    }
771
772    async fn handle_find_search_candidates(
773        this: Entity<Self>,
774        envelope: TypedEnvelope<proto::FindSearchCandidates>,
775        mut cx: AsyncApp,
776    ) -> Result<proto::FindSearchCandidatesResponse> {
777        let message = envelope.payload;
778        let query = SearchQuery::from_proto(
779            message.query.context("missing query field")?,
780            PathStyle::local(),
781        )?;
782        let results = this.update(&mut cx, |this, cx| {
783            project::Search::local(
784                this.fs.clone(),
785                this.buffer_store.clone(),
786                this.worktree_store.clone(),
787                message.limit as _,
788                cx,
789            )
790            .into_handle(query, cx)
791            .matching_buffers(cx)
792        })?;
793
794        let mut response = proto::FindSearchCandidatesResponse {
795            buffer_ids: Vec::new(),
796        };
797
798        let buffer_store = this.read_with(&cx, |this, _| this.buffer_store.clone())?;
799
800        while let Ok(buffer) = results.recv().await {
801            let buffer_id = buffer.read_with(&cx, |this, _| this.remote_id())?;
802            response.buffer_ids.push(buffer_id.to_proto());
803            buffer_store
804                .update(&mut cx, |buffer_store, cx| {
805                    buffer_store.create_buffer_for_peer(&buffer, REMOTE_SERVER_PEER_ID, cx)
806                })?
807                .await?;
808        }
809
810        Ok(response)
811    }
812
813    async fn handle_list_remote_directory(
814        this: Entity<Self>,
815        envelope: TypedEnvelope<proto::ListRemoteDirectory>,
816        cx: AsyncApp,
817    ) -> Result<proto::ListRemoteDirectoryResponse> {
818        let fs = cx.read_entity(&this, |this, _| this.fs.clone())?;
819        let expanded = PathBuf::from(shellexpand::tilde(&envelope.payload.path).to_string());
820        let check_info = envelope
821            .payload
822            .config
823            .as_ref()
824            .is_some_and(|config| config.is_dir);
825
826        let mut entries = Vec::new();
827        let mut entry_info = Vec::new();
828        let mut response = fs.read_dir(&expanded).await?;
829        while let Some(path) = response.next().await {
830            let path = path?;
831            if let Some(file_name) = path.file_name() {
832                entries.push(file_name.to_string_lossy().into_owned());
833                if check_info {
834                    let is_dir = fs.is_dir(&path).await;
835                    entry_info.push(proto::EntryInfo { is_dir });
836                }
837            }
838        }
839        Ok(proto::ListRemoteDirectoryResponse {
840            entries,
841            entry_info,
842        })
843    }
844
845    async fn handle_get_path_metadata(
846        this: Entity<Self>,
847        envelope: TypedEnvelope<proto::GetPathMetadata>,
848        cx: AsyncApp,
849    ) -> Result<proto::GetPathMetadataResponse> {
850        let fs = cx.read_entity(&this, |this, _| this.fs.clone())?;
851        let expanded = PathBuf::from(shellexpand::tilde(&envelope.payload.path).to_string());
852
853        let metadata = fs.metadata(&expanded).await?;
854        let is_dir = metadata.map(|metadata| metadata.is_dir).unwrap_or(false);
855
856        Ok(proto::GetPathMetadataResponse {
857            exists: metadata.is_some(),
858            is_dir,
859            path: expanded.to_string_lossy().into_owned(),
860        })
861    }
862
863    async fn handle_shutdown_remote_server(
864        _this: Entity<Self>,
865        _envelope: TypedEnvelope<proto::ShutdownRemoteServer>,
866        cx: AsyncApp,
867    ) -> Result<proto::Ack> {
868        cx.spawn(async move |cx| {
869            cx.update(|cx| {
870                // TODO: This is a hack, because in a headless project, shutdown isn't executed
871                // when calling quit, but it should be.
872                cx.shutdown();
873                cx.quit();
874            })
875        })
876        .detach();
877
878        Ok(proto::Ack {})
879    }
880
881    pub async fn handle_ping(
882        _this: Entity<Self>,
883        _envelope: TypedEnvelope<proto::Ping>,
884        _cx: AsyncApp,
885    ) -> Result<proto::Ack> {
886        log::debug!("Received ping from client");
887        Ok(proto::Ack {})
888    }
889
890    async fn handle_get_processes(
891        _this: Entity<Self>,
892        _envelope: TypedEnvelope<proto::GetProcesses>,
893        _cx: AsyncApp,
894    ) -> Result<proto::GetProcessesResponse> {
895        let mut processes = Vec::new();
896        let refresh_kind = RefreshKind::nothing().with_processes(
897            ProcessRefreshKind::nothing()
898                .without_tasks()
899                .with_cmd(UpdateKind::Always),
900        );
901
902        for process in System::new_with_specifics(refresh_kind)
903            .processes()
904            .values()
905        {
906            let name = process.name().to_string_lossy().into_owned();
907            let command = process
908                .cmd()
909                .iter()
910                .map(|s| s.to_string_lossy().into_owned())
911                .collect::<Vec<_>>();
912
913            processes.push(proto::ProcessInfo {
914                pid: process.pid().as_u32(),
915                name,
916                command,
917            });
918        }
919
920        processes.sort_by_key(|p| p.name.clone());
921
922        Ok(proto::GetProcessesResponse { processes })
923    }
924
925    async fn handle_get_directory_environment(
926        this: Entity<Self>,
927        envelope: TypedEnvelope<proto::GetDirectoryEnvironment>,
928        mut cx: AsyncApp,
929    ) -> Result<proto::DirectoryEnvironment> {
930        let shell = task::shell_from_proto(envelope.payload.shell.context("missing shell")?)?;
931        let directory = PathBuf::from(envelope.payload.directory);
932        let environment = this
933            .update(&mut cx, |this, cx| {
934                this.environment.update(cx, |environment, cx| {
935                    environment.local_directory_environment(&shell, directory.into(), cx)
936                })
937            })?
938            .await
939            .context("failed to get directory environment")?
940            .into_iter()
941            .collect();
942        Ok(proto::DirectoryEnvironment { environment })
943    }
944}
945
946fn prompt_to_proto(
947    prompt: &project::LanguageServerPromptRequest,
948) -> proto::language_server_prompt_request::Level {
949    match prompt.level {
950        PromptLevel::Info => proto::language_server_prompt_request::Level::Info(
951            proto::language_server_prompt_request::Info {},
952        ),
953        PromptLevel::Warning => proto::language_server_prompt_request::Level::Warning(
954            proto::language_server_prompt_request::Warning {},
955        ),
956        PromptLevel::Critical => proto::language_server_prompt_request::Level::Critical(
957            proto::language_server_prompt_request::Critical {},
958        ),
959    }
960}