headless_project.rs

   1use anyhow::{Context as _, Result, anyhow};
   2use client::ProjectId;
   3use collections::HashMap;
   4use collections::HashSet;
   5use language::File;
   6use lsp::LanguageServerId;
   7
   8use extension::ExtensionHostProxy;
   9use extension_host::headless_host::HeadlessExtensionStore;
  10use fs::Fs;
  11use gpui::{App, AppContext as _, AsyncApp, Context, Entity, PromptLevel};
  12use http_client::HttpClient;
  13use language::{Buffer, BufferEvent, LanguageRegistry, proto::serialize_operation};
  14use node_runtime::NodeRuntime;
  15use project::{
  16    AgentRegistryStore, LspStore, LspStoreEvent, ManifestTree, PrettierStore, ProjectEnvironment,
  17    ProjectPath, ToolchainStore, WorktreeId,
  18    agent_server_store::AgentServerStore,
  19    buffer_store::{BufferStore, BufferStoreEvent},
  20    context_server_store::ContextServerStore,
  21    debugger::{breakpoint_store::BreakpointStore, dap_store::DapStore},
  22    git_store::GitStore,
  23    image_store::ImageId,
  24    lsp_store::log_store::{self, GlobalLogStore, LanguageServerKind, LogKind},
  25    project_settings::SettingsObserver,
  26    search::SearchQuery,
  27    task_store::TaskStore,
  28    trusted_worktrees::{PathTrust, RemoteHostLocation, TrustedWorktrees},
  29    worktree_store::{WorktreeIdCounter, WorktreeStore},
  30};
  31use rpc::{
  32    AnyProtoClient, TypedEnvelope,
  33    proto::{self, REMOTE_SERVER_PEER_ID, REMOTE_SERVER_PROJECT_ID},
  34};
  35use smol::process::Child;
  36
  37use settings::initial_server_settings_content;
  38use std::{
  39    num::NonZeroU64,
  40    path::{Path, PathBuf},
  41    sync::{
  42        Arc,
  43        atomic::{AtomicU64, AtomicUsize, Ordering},
  44    },
  45    time::Instant,
  46};
  47use sysinfo::{ProcessRefreshKind, RefreshKind, System, UpdateKind};
  48use util::{ResultExt, paths::PathStyle, rel_path::RelPath};
  49use worktree::Worktree;
  50
/// Project state owned by the headless side of a remote session.
///
/// Holds the RPC session plus the stores that [`HeadlessProject::new`]
/// constructs and registers on that session.
pub struct HeadlessProject {
    /// File system handle; used to canonicalize paths and create worktrees.
    pub fs: Arc<dyn Fs>,
    /// RPC client used to send messages/requests and to register handlers.
    pub session: AnyProtoClient,
    /// Store of the worktrees added to this project.
    pub worktree_store: Entity<WorktreeStore>,
    /// Store of the buffers opened in this project.
    pub buffer_store: Entity<BufferStore>,
    /// Language server store; its events are handled by `on_lsp_store_event`.
    pub lsp_store: Entity<LspStore>,
    /// Task store created in `new`.
    pub task_store: Entity<TaskStore>,
    /// Debug adapter store created in `new`.
    pub dap_store: Entity<DapStore>,
    /// Breakpoint store created in `new`.
    pub breakpoint_store: Entity<BreakpointStore>,
    /// Agent server store created in `new`.
    pub agent_server_store: Entity<AgentServerStore>,
    /// Context server store created in `new`.
    pub context_server_store: Entity<ContextServerStore>,
    /// Settings observer created in `new`.
    pub settings_observer: Entity<SettingsObserver>,
    /// Shared counter handed to every worktree created by `handle_add_worktree`.
    pub next_entry_id: Arc<AtomicUsize>,
    /// Language registry shared with the stores created in `new`.
    pub languages: Arc<LanguageRegistry>,
    /// Extension store serving the sync/install extension request handlers.
    pub extensions: Entity<HeadlessExtensionStore>,
    /// Git store created in `new`.
    pub git_store: Entity<GitStore>,
    /// Project environment shared by several of the stores created in `new`.
    pub environment: Entity<ProjectEnvironment>,
    /// Profiling collector, constructed from the application's startup time.
    pub profiling_collector: gpui::ProfilingCollector,
    // Used mostly to keep alive the toolchain store for RPC handlers.
    // Local variant is used within LSP store, but that's a separate entity.
    pub _toolchain_store: Entity<ToolchainStore>,
    /// Child processes keyed by a string — presumably a kernel identifier;
    /// TODO confirm against `handle_spawn_kernel` / `handle_kill_kernel`.
    pub kernels: HashMap<String, Child>,
}
  74
/// Dependencies consumed by [`HeadlessProject::new`].
///
/// The struct is destructured in `new`; every field is either stored on the
/// resulting project or passed on to the stores it creates.
pub struct HeadlessAppState {
    /// RPC client on which stores are shared and handlers are registered.
    pub session: AnyProtoClient,
    /// File system handle passed to the stores and kept on the project.
    pub fs: Arc<dyn Fs>,
    /// HTTP client passed to the stores that need one (DAP, LSP, agents, extensions).
    pub http_client: Arc<dyn HttpClient>,
    /// Node runtime passed to the language, DAP, prettier, agent and extension setup.
    pub node_runtime: NodeRuntime,
    /// Language registry passed to the stores and kept on the project.
    pub languages: Arc<LanguageRegistry>,
    /// Extension host proxy used to initialize the extension-backed subsystems.
    pub extension_host_proxy: Arc<ExtensionHostProxy>,
    /// Instant used to construct the project's profiling collector.
    pub startup_time: Instant,
}
  84
  85impl HeadlessProject {
    /// Runs the global initialization this type depends on.
    ///
    /// Calls `settings::init` followed by `log_store::init` (with its first
    /// argument set to `true`) on the given app context.
    pub fn init(cx: &mut App) {
        settings::init(cx);
        log_store::init(true, cx);
    }
  90
    /// Builds a `HeadlessProject` from the given application state.
    ///
    /// Creates the project's stores, shares most of them on `session` under
    /// `REMOTE_SERVER_PROJECT_ID`, subscribes to LSP-store and buffer events,
    /// and registers the RPC handlers implemented by this type and its stores.
    ///
    /// When `init_worktree_trust` is `true`, worktree trust tracking is set up
    /// for the newly created worktree store.
    pub fn new(
        HeadlessAppState {
            session,
            fs,
            http_client,
            node_runtime,
            languages,
            extension_host_proxy: proxy,
            startup_time,
        }: HeadlessAppState,
        init_worktree_trust: bool,
        cx: &mut Context<Self>,
    ) -> Self {
        debug_adapter_extension::init(proxy.clone(), cx);
        languages::init(languages.clone(), fs.clone(), node_runtime.clone(), cx);

        // The worktree store is created first: most other stores take a handle to it.
        let worktree_store = cx.new(|cx| {
            let mut store = WorktreeStore::local(true, fs.clone(), WorktreeIdCounter::get(cx));
            store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
            store
        });

        if init_worktree_trust {
            project::trusted_worktrees::track_worktree_trust(
                worktree_store.clone(),
                None::<RemoteHostLocation>,
                Some((session.clone(), ProjectId(REMOTE_SERVER_PROJECT_ID))),
                None,
                cx,
            );
        }

        let environment =
            cx.new(|cx| ProjectEnvironment::new(None, worktree_store.downgrade(), None, true, cx));
        let manifest_tree = ManifestTree::new(worktree_store.clone(), cx);
        let toolchain_store = cx.new(|cx| {
            ToolchainStore::local(
                languages.clone(),
                worktree_store.clone(),
                environment.clone(),
                manifest_tree.clone(),
                cx,
            )
        });

        let buffer_store = cx.new(|cx| {
            let mut buffer_store = BufferStore::local(worktree_store.clone(), cx);
            buffer_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
            buffer_store
        });

        let breakpoint_store = cx.new(|_| {
            let mut breakpoint_store =
                BreakpointStore::local(worktree_store.clone(), buffer_store.clone());
            breakpoint_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone());

            breakpoint_store
        });

        let dap_store = cx.new(|cx| {
            let mut dap_store = DapStore::new_local(
                http_client.clone(),
                node_runtime.clone(),
                fs.clone(),
                environment.clone(),
                toolchain_store.read(cx).as_language_toolchain_store(),
                worktree_store.clone(),
                breakpoint_store.clone(),
                true,
                cx,
            );
            dap_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
            dap_store
        });

        let git_store = cx.new(|cx| {
            let mut store = GitStore::local(
                &worktree_store,
                buffer_store.clone(),
                environment.clone(),
                fs.clone(),
                cx,
            );
            store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
            store
        });

        let prettier_store = cx.new(|cx| {
            PrettierStore::new(
                node_runtime.clone(),
                fs.clone(),
                languages.clone(),
                worktree_store.clone(),
                cx,
            )
        });

        let task_store = cx.new(|cx| {
            let mut task_store = TaskStore::local(
                buffer_store.downgrade(),
                worktree_store.clone(),
                toolchain_store.read(cx).as_language_toolchain_store(),
                environment.clone(),
                cx,
            );
            task_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
            task_store
        });
        let settings_observer = cx.new(|cx| {
            let mut observer = SettingsObserver::new_local(
                fs.clone(),
                worktree_store.clone(),
                task_store.clone(),
                true,
                cx,
            );
            observer.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
            observer
        });

        let lsp_store = cx.new(|cx| {
            let mut lsp_store = LspStore::new_local(
                buffer_store.clone(),
                worktree_store.clone(),
                prettier_store.clone(),
                // The toolchain store above was built with `ToolchainStore::local`,
                // so a local variant is expected here; this panics otherwise.
                toolchain_store
                    .read(cx)
                    .as_local_store()
                    .expect("Toolchain store to be local")
                    .clone(),
                environment.clone(),
                manifest_tree,
                languages.clone(),
                http_client.clone(),
                fs.clone(),
                cx,
            );
            lsp_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
            lsp_store
        });

        AgentRegistryStore::init_global(cx, fs.clone(), http_client.clone());

        let agent_server_store = cx.new(|cx| {
            let mut agent_server_store = AgentServerStore::local(
                node_runtime.clone(),
                fs.clone(),
                environment.clone(),
                http_client.clone(),
                cx,
            );
            agent_server_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
            agent_server_store
        });

        let context_server_store = cx.new(|cx| {
            let mut context_server_store =
                ContextServerStore::local(worktree_store.clone(), None, true, cx);
            context_server_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone());
            context_server_store
        });

        // React to LSP store events (see `on_lsp_store_event`).
        cx.subscribe(&lsp_store, Self::on_lsp_store_event).detach();
        language_extension::init(
            language_extension::LspAccess::ViaLspStore(lsp_store.clone()),
            proxy.clone(),
            languages.clone(),
        );

        // Subscribe to every buffer as it is added so that its events reach
        // `on_buffer_event`.
        cx.subscribe(&buffer_store, |_this, _buffer_store, event, cx| {
            if let BufferStoreEvent::BufferAdded(buffer) = event {
                cx.subscribe(buffer, Self::on_buffer_event).detach();
            }
        })
        .detach();

        let extensions = HeadlessExtensionStore::new(
            fs.clone(),
            http_client.clone(),
            paths::remote_extensions_dir().to_path_buf(),
            proxy,
            node_runtime,
            cx,
        );

        // local_machine -> ssh handlers
        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &worktree_store);
        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &buffer_store);
        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &cx.entity());
        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &lsp_store);
        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &task_store);
        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &toolchain_store);
        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &dap_store);
        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &breakpoint_store);
        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &settings_observer);
        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &git_store);
        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &agent_server_store);
        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &context_server_store);

        // Request handlers bound to a weak handle of this project.
        session.add_request_handler(cx.weak_entity(), Self::handle_list_remote_directory);
        session.add_request_handler(cx.weak_entity(), Self::handle_get_path_metadata);
        session.add_request_handler(cx.weak_entity(), Self::handle_shutdown_remote_server);
        session.add_request_handler(cx.weak_entity(), Self::handle_ping);
        session.add_request_handler(cx.weak_entity(), Self::handle_get_processes);
        session.add_request_handler(cx.weak_entity(), Self::handle_get_remote_profiling_data);

        session.add_entity_request_handler(Self::handle_add_worktree);
        session.add_request_handler(cx.weak_entity(), Self::handle_remove_worktree);

        session.add_entity_request_handler(Self::handle_open_buffer_by_path);
        session.add_entity_request_handler(Self::handle_open_new_buffer);
        session.add_entity_request_handler(Self::handle_find_search_candidates);
        session.add_entity_request_handler(Self::handle_open_server_settings);
        session.add_entity_request_handler(Self::handle_get_directory_environment);
        session.add_entity_message_handler(Self::handle_toggle_lsp_logs);
        session.add_entity_request_handler(Self::handle_open_image_by_path);
        session.add_entity_request_handler(Self::handle_trust_worktrees);
        session.add_entity_request_handler(Self::handle_restrict_worktrees);
        session.add_entity_request_handler(Self::handle_download_file_by_path);

        session.add_entity_message_handler(Self::handle_find_search_candidates_cancel);
        session.add_entity_request_handler(BufferStore::handle_update_buffer);
        session.add_entity_message_handler(BufferStore::handle_close_buffer);

        // Extension handlers are bound to the extension store, not to this project.
        session.add_request_handler(
            extensions.downgrade(),
            HeadlessExtensionStore::handle_sync_extensions,
        );
        session.add_request_handler(
            extensions.downgrade(),
            HeadlessExtensionStore::handle_install_extension,
        );

        session.add_request_handler(cx.weak_entity(), Self::handle_spawn_kernel);
        session.add_request_handler(cx.weak_entity(), Self::handle_kill_kernel);

        // Let each store type run its own `init` against the session.
        BufferStore::init(&session);
        WorktreeStore::init(&session);
        SettingsObserver::init(&session);
        LspStore::init(&session);
        TaskStore::init(Some(&session));
        ToolchainStore::init(&session);
        DapStore::init(&session, cx);
        // todo(debugger): Re init breakpoint store when we set it up for collab
        BreakpointStore::init(&session);
        GitStore::init(&session);
        AgentServerStore::init_headless(&session);
        ContextServerStore::init_headless(&session);

        HeadlessProject {
            next_entry_id: Default::default(),
            session,
            settings_observer,
            fs,
            worktree_store,
            buffer_store,
            lsp_store,
            task_store,
            dap_store,
            breakpoint_store,
            agent_server_store,
            context_server_store,
            languages,
            extensions,
            git_store,
            environment,
            profiling_collector: gpui::ProfilingCollector::new(startup_time),
            _toolchain_store: toolchain_store,
            kernels: Default::default(),
        }
    }
 362
 363    fn on_buffer_event(
 364        &mut self,
 365        buffer: Entity<Buffer>,
 366        event: &BufferEvent,
 367        cx: &mut Context<Self>,
 368    ) {
 369        if let BufferEvent::Operation {
 370            operation,
 371            is_local: true,
 372        } = event
 373        {
 374            cx.background_spawn(self.session.request(proto::UpdateBuffer {
 375                project_id: REMOTE_SERVER_PROJECT_ID,
 376                buffer_id: buffer.read(cx).remote_id().to_proto(),
 377                operations: vec![serialize_operation(operation)],
 378            }))
 379            .detach()
 380        }
 381    }
 382
 383    fn on_lsp_store_event(
 384        &mut self,
 385        lsp_store: Entity<LspStore>,
 386        event: &LspStoreEvent,
 387        cx: &mut Context<Self>,
 388    ) {
 389        match event {
 390            LspStoreEvent::LanguageServerAdded(id, name, worktree_id) => {
 391                let log_store = cx
 392                    .try_global::<GlobalLogStore>()
 393                    .map(|lsp_logs| lsp_logs.0.clone());
 394                if let Some(log_store) = log_store {
 395                    log_store.update(cx, |log_store, cx| {
 396                        log_store.add_language_server(
 397                            LanguageServerKind::LocalSsh {
 398                                lsp_store: self.lsp_store.downgrade(),
 399                            },
 400                            *id,
 401                            Some(name.clone()),
 402                            *worktree_id,
 403                            lsp_store.read(cx).language_server_for_id(*id),
 404                            cx,
 405                        );
 406                    });
 407                }
 408            }
 409            LspStoreEvent::LanguageServerRemoved(id) => {
 410                let log_store = cx
 411                    .try_global::<GlobalLogStore>()
 412                    .map(|lsp_logs| lsp_logs.0.clone());
 413                if let Some(log_store) = log_store {
 414                    log_store.update(cx, |log_store, cx| {
 415                        log_store.remove_language_server(*id, cx);
 416                    });
 417                }
 418            }
 419            LspStoreEvent::LanguageServerUpdate {
 420                language_server_id,
 421                name,
 422                message,
 423            } => {
 424                self.session
 425                    .send(proto::UpdateLanguageServer {
 426                        project_id: REMOTE_SERVER_PROJECT_ID,
 427                        server_name: name.as_ref().map(|name| name.to_string()),
 428                        language_server_id: language_server_id.to_proto(),
 429                        variant: Some(message.clone()),
 430                    })
 431                    .log_err();
 432            }
 433            LspStoreEvent::Notification(message) => {
 434                self.session
 435                    .send(proto::Toast {
 436                        project_id: REMOTE_SERVER_PROJECT_ID,
 437                        notification_id: "lsp".to_string(),
 438                        message: message.clone(),
 439                    })
 440                    .log_err();
 441            }
 442            LspStoreEvent::LanguageServerPrompt(prompt) => {
 443                let request = self.session.request(proto::LanguageServerPromptRequest {
 444                    project_id: REMOTE_SERVER_PROJECT_ID,
 445                    actions: prompt
 446                        .actions
 447                        .iter()
 448                        .map(|action| action.title.to_string())
 449                        .collect(),
 450                    level: Some(prompt_to_proto(prompt)),
 451                    lsp_name: prompt.lsp_name.clone(),
 452                    message: prompt.message.clone(),
 453                });
 454                let prompt = prompt.clone();
 455                cx.background_spawn(async move {
 456                    let response = request.await?;
 457                    if let Some(action_response) = response.action_response {
 458                        prompt.respond(action_response as usize).await;
 459                    }
 460                    anyhow::Ok(())
 461                })
 462                .detach();
 463            }
 464            _ => {}
 465        }
 466    }
 467
 468    pub async fn handle_add_worktree(
 469        this: Entity<Self>,
 470        message: TypedEnvelope<proto::AddWorktree>,
 471        mut cx: AsyncApp,
 472    ) -> Result<proto::AddWorktreeResponse> {
 473        use client::ErrorCodeExt;
 474        let fs = this.read_with(&cx, |this, _| this.fs.clone());
 475        let path = PathBuf::from(shellexpand::tilde(&message.payload.path).to_string());
 476
 477        let canonicalized = match fs.canonicalize(&path).await {
 478            Ok(path) => path,
 479            Err(e) => {
 480                let mut parent = path
 481                    .parent()
 482                    .ok_or(e)
 483                    .with_context(|| format!("{path:?} does not exist"))?;
 484                if parent == Path::new("") {
 485                    parent = util::paths::home_dir();
 486                }
 487                let parent = fs.canonicalize(parent).await.map_err(|_| {
 488                    anyhow!(
 489                        proto::ErrorCode::DevServerProjectPathDoesNotExist
 490                            .with_tag("path", path.to_string_lossy().as_ref())
 491                    )
 492                })?;
 493                if let Some(file_name) = path.file_name() {
 494                    parent.join(file_name)
 495                } else {
 496                    parent
 497                }
 498            }
 499        };
 500        let next_worktree_id = this
 501            .update(&mut cx, |this, cx| {
 502                this.worktree_store
 503                    .update(cx, |worktree_store, _| worktree_store.next_worktree_id())
 504            })
 505            .await?;
 506        let worktree = this
 507            .read_with(&cx.clone(), |this, _| {
 508                Worktree::local(
 509                    Arc::from(canonicalized.as_path()),
 510                    message.payload.visible,
 511                    this.fs.clone(),
 512                    this.next_entry_id.clone(),
 513                    true,
 514                    next_worktree_id,
 515                    &mut cx,
 516                )
 517            })
 518            .await?;
 519
 520        let response = this.read_with(&cx, |_, cx| {
 521            let worktree = worktree.read(cx);
 522            proto::AddWorktreeResponse {
 523                worktree_id: worktree.id().to_proto(),
 524                canonicalized_path: canonicalized.to_string_lossy().into_owned(),
 525            }
 526        });
 527
 528        // We spawn this asynchronously, so that we can send the response back
 529        // *before* `worktree_store.add()` can send out UpdateProject requests
 530        // to the client about the new worktree.
 531        //
 532        // That lets the client manage the reference/handles of the newly-added
 533        // worktree, before getting interrupted by an UpdateProject request.
 534        //
 535        // This fixes the problem of the client sending the AddWorktree request,
 536        // headless project sending out a project update, client receiving it
 537        // and immediately dropping the reference of the new client, causing it
 538        // to be dropped on the headless project, and the client only then
 539        // receiving a response to AddWorktree.
 540        cx.spawn(async move |cx| {
 541            this.update(cx, |this, cx| {
 542                this.worktree_store.update(cx, |worktree_store, cx| {
 543                    worktree_store.add(&worktree, cx);
 544                });
 545            });
 546        })
 547        .detach();
 548
 549        Ok(response)
 550    }
 551
 552    pub async fn handle_remove_worktree(
 553        this: Entity<Self>,
 554        envelope: TypedEnvelope<proto::RemoveWorktree>,
 555        mut cx: AsyncApp,
 556    ) -> Result<proto::Ack> {
 557        let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
 558        this.update(&mut cx, |this, cx| {
 559            this.worktree_store.update(cx, |worktree_store, cx| {
 560                worktree_store.remove_worktree(worktree_id, cx);
 561            });
 562        });
 563        Ok(proto::Ack {})
 564    }
 565
 566    pub async fn handle_open_buffer_by_path(
 567        this: Entity<Self>,
 568        message: TypedEnvelope<proto::OpenBufferByPath>,
 569        mut cx: AsyncApp,
 570    ) -> Result<proto::OpenBufferResponse> {
 571        let worktree_id = WorktreeId::from_proto(message.payload.worktree_id);
 572        let path = RelPath::from_proto(&message.payload.path)?;
 573        let (buffer_store, buffer) = this.update(&mut cx, |this, cx| {
 574            let buffer_store = this.buffer_store.clone();
 575            let buffer = this.buffer_store.update(cx, |buffer_store, cx| {
 576                buffer_store.open_buffer(ProjectPath { worktree_id, path }, cx)
 577            });
 578            (buffer_store, buffer)
 579        });
 580
 581        let buffer = buffer.await?;
 582        let buffer_id = buffer.read_with(&cx, |b, _| b.remote_id());
 583        buffer_store.update(&mut cx, |buffer_store, cx| {
 584            buffer_store
 585                .create_buffer_for_peer(&buffer, REMOTE_SERVER_PEER_ID, cx)
 586                .detach_and_log_err(cx);
 587        });
 588
 589        Ok(proto::OpenBufferResponse {
 590            buffer_id: buffer_id.to_proto(),
 591        })
 592    }
 593
 594    pub async fn handle_open_image_by_path(
 595        this: Entity<Self>,
 596        message: TypedEnvelope<proto::OpenImageByPath>,
 597        mut cx: AsyncApp,
 598    ) -> Result<proto::OpenImageResponse> {
 599        static NEXT_ID: AtomicU64 = AtomicU64::new(1);
 600        let worktree_id = WorktreeId::from_proto(message.payload.worktree_id);
 601        let path = RelPath::from_proto(&message.payload.path)?;
 602        let project_id = message.payload.project_id;
 603        use proto::create_image_for_peer::Variant;
 604
 605        let (worktree_store, session) = this.read_with(&cx, |this, _| {
 606            (this.worktree_store.clone(), this.session.clone())
 607        });
 608
 609        let worktree = worktree_store
 610            .read_with(&cx, |store, cx| store.worktree_for_id(worktree_id, cx))
 611            .context("worktree not found")?;
 612
 613        let load_task = worktree.update(&mut cx, |worktree, cx| {
 614            worktree.load_binary_file(path.as_ref(), cx)
 615        });
 616
 617        let loaded_file = load_task.await?;
 618        let content = loaded_file.content;
 619        let file = loaded_file.file;
 620
 621        let proto_file = worktree.read_with(&cx, |_worktree, cx| file.to_proto(cx));
 622        let image_id =
 623            ImageId::from(NonZeroU64::new(NEXT_ID.fetch_add(1, Ordering::Relaxed)).unwrap());
 624
 625        let format = image::guess_format(&content)
 626            .map(|f| format!("{:?}", f).to_lowercase())
 627            .unwrap_or_else(|_| "unknown".to_string());
 628
 629        let state = proto::ImageState {
 630            id: image_id.to_proto(),
 631            file: Some(proto_file),
 632            content_size: content.len() as u64,
 633            format,
 634        };
 635
 636        session.send(proto::CreateImageForPeer {
 637            project_id,
 638            peer_id: Some(REMOTE_SERVER_PEER_ID),
 639            variant: Some(Variant::State(state)),
 640        })?;
 641
 642        const CHUNK_SIZE: usize = 1024 * 1024; // 1MB chunks
 643        for chunk in content.chunks(CHUNK_SIZE) {
 644            session.send(proto::CreateImageForPeer {
 645                project_id,
 646                peer_id: Some(REMOTE_SERVER_PEER_ID),
 647                variant: Some(Variant::Chunk(proto::ImageChunk {
 648                    image_id: image_id.to_proto(),
 649                    data: chunk.to_vec(),
 650                })),
 651            })?;
 652        }
 653
 654        Ok(proto::OpenImageResponse {
 655            image_id: image_id.to_proto(),
 656        })
 657    }
 658
 659    pub async fn handle_trust_worktrees(
 660        this: Entity<Self>,
 661        envelope: TypedEnvelope<proto::TrustWorktrees>,
 662        mut cx: AsyncApp,
 663    ) -> Result<proto::Ack> {
 664        let trusted_worktrees = cx
 665            .update(|cx| TrustedWorktrees::try_get_global(cx))
 666            .context("missing trusted worktrees")?;
 667        let worktree_store = this.read_with(&cx, |project, _| project.worktree_store.clone());
 668        trusted_worktrees.update(&mut cx, |trusted_worktrees, cx| {
 669            trusted_worktrees.trust(
 670                &worktree_store,
 671                envelope
 672                    .payload
 673                    .trusted_paths
 674                    .into_iter()
 675                    .filter_map(PathTrust::from_proto)
 676                    .collect(),
 677                cx,
 678            );
 679        });
 680        Ok(proto::Ack {})
 681    }
 682
 683    pub async fn handle_restrict_worktrees(
 684        this: Entity<Self>,
 685        envelope: TypedEnvelope<proto::RestrictWorktrees>,
 686        mut cx: AsyncApp,
 687    ) -> Result<proto::Ack> {
 688        let trusted_worktrees = cx
 689            .update(|cx| TrustedWorktrees::try_get_global(cx))
 690            .context("missing trusted worktrees")?;
 691        let worktree_store = this.read_with(&cx, |project, _| project.worktree_store.downgrade());
 692        trusted_worktrees.update(&mut cx, |trusted_worktrees, cx| {
 693            let restricted_paths = envelope
 694                .payload
 695                .worktree_ids
 696                .into_iter()
 697                .map(WorktreeId::from_proto)
 698                .map(PathTrust::Worktree)
 699                .collect::<HashSet<_>>();
 700            trusted_worktrees.restrict(worktree_store, restricted_paths, cx);
 701        });
 702        Ok(proto::Ack {})
 703    }
 704
 705    pub async fn handle_download_file_by_path(
 706        this: Entity<Self>,
 707        message: TypedEnvelope<proto::DownloadFileByPath>,
 708        mut cx: AsyncApp,
 709    ) -> Result<proto::DownloadFileResponse> {
 710        log::debug!(
 711            "handle_download_file_by_path: received request: {:?}",
 712            message.payload
 713        );
 714
 715        let worktree_id = WorktreeId::from_proto(message.payload.worktree_id);
 716        let path = RelPath::from_proto(&message.payload.path)?;
 717        let project_id = message.payload.project_id;
 718        let file_id = message.payload.file_id;
 719        log::debug!(
 720            "handle_download_file_by_path: worktree_id={:?}, path={:?}, file_id={}",
 721            worktree_id,
 722            path,
 723            file_id
 724        );
 725        use proto::create_file_for_peer::Variant;
 726
 727        let (worktree_store, session): (Entity<WorktreeStore>, AnyProtoClient) = this
 728            .read_with(&cx, |this, _| {
 729                (this.worktree_store.clone(), this.session.clone())
 730            });
 731
 732        let worktree = worktree_store
 733            .read_with(&cx, |store, cx| store.worktree_for_id(worktree_id, cx))
 734            .context("worktree not found")?;
 735
 736        let download_task = worktree.update(&mut cx, |worktree: &mut Worktree, cx| {
 737            worktree.load_binary_file(path.as_ref(), cx)
 738        });
 739
 740        let downloaded_file = download_task.await?;
 741        let content = downloaded_file.content;
 742        let file = downloaded_file.file;
 743        log::debug!(
 744            "handle_download_file_by_path: file loaded, content_size={}",
 745            content.len()
 746        );
 747
 748        let proto_file = worktree.read_with(&cx, |_worktree: &Worktree, cx| file.to_proto(cx));
 749        log::debug!(
 750            "handle_download_file_by_path: using client-provided file_id={}",
 751            file_id
 752        );
 753
 754        let state = proto::FileState {
 755            id: file_id,
 756            file: Some(proto_file),
 757            content_size: content.len() as u64,
 758        };
 759
 760        log::debug!("handle_download_file_by_path: sending State message");
 761        session.send(proto::CreateFileForPeer {
 762            project_id,
 763            peer_id: Some(REMOTE_SERVER_PEER_ID),
 764            variant: Some(Variant::State(state)),
 765        })?;
 766
 767        const CHUNK_SIZE: usize = 1024 * 1024; // 1MB chunks
 768        let num_chunks = content.len().div_ceil(CHUNK_SIZE);
 769        log::debug!(
 770            "handle_download_file_by_path: sending {} chunks",
 771            num_chunks
 772        );
 773        for (i, chunk) in content.chunks(CHUNK_SIZE).enumerate() {
 774            log::trace!(
 775                "handle_download_file_by_path: sending chunk {}/{}, size={}",
 776                i + 1,
 777                num_chunks,
 778                chunk.len()
 779            );
 780            session.send(proto::CreateFileForPeer {
 781                project_id,
 782                peer_id: Some(REMOTE_SERVER_PEER_ID),
 783                variant: Some(Variant::Chunk(proto::FileChunk {
 784                    file_id,
 785                    data: chunk.to_vec(),
 786                })),
 787            })?;
 788        }
 789
 790        log::debug!(
 791            "handle_download_file_by_path: returning file_id={}",
 792            file_id
 793        );
 794        Ok(proto::DownloadFileResponse { file_id })
 795    }
 796
 797    pub async fn handle_open_new_buffer(
 798        this: Entity<Self>,
 799        _message: TypedEnvelope<proto::OpenNewBuffer>,
 800        mut cx: AsyncApp,
 801    ) -> Result<proto::OpenBufferResponse> {
 802        let (buffer_store, buffer) = this.update(&mut cx, |this, cx| {
 803            let buffer_store = this.buffer_store.clone();
 804            let buffer = this.buffer_store.update(cx, |buffer_store, cx| {
 805                buffer_store.create_buffer(None, true, cx)
 806            });
 807            (buffer_store, buffer)
 808        });
 809
 810        let buffer = buffer.await?;
 811        let buffer_id = buffer.read_with(&cx, |b, _| b.remote_id());
 812        buffer_store.update(&mut cx, |buffer_store, cx| {
 813            buffer_store
 814                .create_buffer_for_peer(&buffer, REMOTE_SERVER_PEER_ID, cx)
 815                .detach_and_log_err(cx);
 816        });
 817
 818        Ok(proto::OpenBufferResponse {
 819            buffer_id: buffer_id.to_proto(),
 820        })
 821    }
 822
 823    async fn handle_toggle_lsp_logs(
 824        _: Entity<Self>,
 825        envelope: TypedEnvelope<proto::ToggleLspLogs>,
 826        cx: AsyncApp,
 827    ) -> Result<()> {
 828        let server_id = LanguageServerId::from_proto(envelope.payload.server_id);
 829        cx.update(|cx| {
 830            let log_store = cx
 831                .try_global::<GlobalLogStore>()
 832                .map(|global_log_store| global_log_store.0.clone())
 833                .context("lsp logs store is missing")?;
 834            let toggled_log_kind =
 835                match proto::toggle_lsp_logs::LogType::from_i32(envelope.payload.log_type)
 836                    .context("invalid log type")?
 837                {
 838                    proto::toggle_lsp_logs::LogType::Log => LogKind::Logs,
 839                    proto::toggle_lsp_logs::LogType::Trace => LogKind::Trace,
 840                    proto::toggle_lsp_logs::LogType::Rpc => LogKind::Rpc,
 841                };
 842            log_store.update(cx, |log_store, _| {
 843                log_store.toggle_lsp_logs(server_id, envelope.payload.enabled, toggled_log_kind);
 844            });
 845            anyhow::Ok(())
 846        })?;
 847
 848        Ok(())
 849    }
 850
 851    async fn handle_open_server_settings(
 852        this: Entity<Self>,
 853        _: TypedEnvelope<proto::OpenServerSettings>,
 854        mut cx: AsyncApp,
 855    ) -> Result<proto::OpenBufferResponse> {
 856        let settings_path = paths::settings_file();
 857        let (worktree, path) = this
 858            .update(&mut cx, |this, cx| {
 859                this.worktree_store.update(cx, |worktree_store, cx| {
 860                    worktree_store.find_or_create_worktree(settings_path, false, cx)
 861                })
 862            })
 863            .await?;
 864
 865        let (buffer, buffer_store) = this.update(&mut cx, |this, cx| {
 866            let buffer = this.buffer_store.update(cx, |buffer_store, cx| {
 867                buffer_store.open_buffer(
 868                    ProjectPath {
 869                        worktree_id: worktree.read(cx).id(),
 870                        path,
 871                    },
 872                    cx,
 873                )
 874            });
 875
 876            (buffer, this.buffer_store.clone())
 877        });
 878
 879        let buffer = buffer.await?;
 880
 881        let buffer_id = cx.update(|cx| {
 882            if buffer.read(cx).is_empty() {
 883                buffer.update(cx, |buffer, cx| {
 884                    buffer.edit([(0..0, initial_server_settings_content())], None, cx)
 885                });
 886            }
 887
 888            let buffer_id = buffer.read(cx).remote_id();
 889
 890            buffer_store.update(cx, |buffer_store, cx| {
 891                buffer_store
 892                    .create_buffer_for_peer(&buffer, REMOTE_SERVER_PEER_ID, cx)
 893                    .detach_and_log_err(cx);
 894            });
 895
 896            buffer_id
 897        });
 898
 899        Ok(proto::OpenBufferResponse {
 900            buffer_id: buffer_id.to_proto(),
 901        })
 902    }
 903
 904    async fn handle_spawn_kernel(
 905        this: Entity<Self>,
 906        envelope: TypedEnvelope<proto::SpawnKernel>,
 907        cx: AsyncApp,
 908    ) -> Result<proto::SpawnKernelResponse> {
 909        let fs = this.update(&mut cx.clone(), |this, _| this.fs.clone());
 910
 911        let mut ports = Vec::new();
 912        for _ in 0..5 {
 913            let listener = std::net::TcpListener::bind("127.0.0.1:0")?;
 914            let port = listener.local_addr()?.port();
 915            ports.push(port);
 916        }
 917
 918        let connection_info = serde_json::json!({
 919            "shell_port": ports[0],
 920            "iopub_port": ports[1],
 921            "stdin_port": ports[2],
 922            "control_port": ports[3],
 923            "hb_port": ports[4],
 924            "ip": "127.0.0.1",
 925            "key": uuid::Uuid::new_v4().to_string(),
 926            "transport": "tcp",
 927            "signature_scheme": "hmac-sha256",
 928            "kernel_name": envelope.payload.kernel_name,
 929        });
 930
 931        let connection_file_content = serde_json::to_string_pretty(&connection_info)?;
 932        let kernel_id = uuid::Uuid::new_v4().to_string();
 933
 934        let connection_file_path = std::env::temp_dir().join(format!("kernel-{}.json", kernel_id));
 935        fs.save(
 936            &connection_file_path,
 937            &connection_file_content.as_str().into(),
 938            language::LineEnding::Unix,
 939        )
 940        .await?;
 941
 942        let working_directory = if envelope.payload.working_directory.is_empty() {
 943            std::env::current_dir()
 944                .ok()
 945                .map(|p| p.to_string_lossy().into_owned())
 946        } else {
 947            Some(envelope.payload.working_directory)
 948        };
 949
 950        // Spawn kernel (Assuming python for now, or we'd need to parse kernelspec logic here or pass the command)
 951
 952        // Spawn kernel
 953        let spawn_kernel = |binary: &str, args: &[String]| {
 954            let mut command = smol::process::Command::new(binary);
 955
 956            if !args.is_empty() {
 957                for arg in args {
 958                    if arg == "{connection_file}" {
 959                        command.arg(&connection_file_path);
 960                    } else {
 961                        command.arg(arg);
 962                    }
 963                }
 964            } else {
 965                command
 966                    .arg("-m")
 967                    .arg("ipykernel_launcher")
 968                    .arg("-f")
 969                    .arg(&connection_file_path);
 970            }
 971
 972            // This ensures subprocesses spawned from the kernel use the correct Python environment
 973            let python_bin_dir = std::path::Path::new(binary).parent();
 974            if let Some(bin_dir) = python_bin_dir {
 975                if let Some(path_var) = std::env::var_os("PATH") {
 976                    let mut paths = std::env::split_paths(&path_var).collect::<Vec<_>>();
 977                    paths.insert(0, bin_dir.to_path_buf());
 978                    if let Ok(new_path) = std::env::join_paths(paths) {
 979                        command.env("PATH", new_path);
 980                    }
 981                }
 982
 983                if let Some(venv_root) = bin_dir.parent() {
 984                    command.env("VIRTUAL_ENV", venv_root.to_string_lossy().to_string());
 985                }
 986            }
 987
 988            if let Some(wd) = &working_directory {
 989                command.current_dir(wd);
 990            }
 991            command.spawn()
 992        };
 993
 994        // We need to manage the child process lifecycle
 995        let child = if !envelope.payload.command.is_empty() {
 996            spawn_kernel(&envelope.payload.command, &envelope.payload.args).context(format!(
 997                "failed to spawn kernel process (command: {})",
 998                envelope.payload.command
 999            ))?
1000        } else {
1001            spawn_kernel("python3", &[])
1002                .or_else(|_| spawn_kernel("python", &[]))
1003                .context("failed to spawn kernel process (tried python3 and python)")?
1004        };
1005
1006        this.update(&mut cx.clone(), |this, _cx| {
1007            this.kernels.insert(kernel_id.clone(), child);
1008        });
1009
1010        Ok(proto::SpawnKernelResponse {
1011            kernel_id,
1012            connection_file: connection_file_content,
1013        })
1014    }
1015
1016    async fn handle_kill_kernel(
1017        this: Entity<Self>,
1018        envelope: TypedEnvelope<proto::KillKernel>,
1019        mut cx: AsyncApp,
1020    ) -> Result<proto::Ack> {
1021        let kernel_id = envelope.payload.kernel_id;
1022        let child = this.update(&mut cx, |this, _| this.kernels.remove(&kernel_id));
1023        if let Some(mut child) = child {
1024            child.kill().log_err();
1025        }
1026        Ok(proto::Ack {})
1027    }
1028
1029    async fn handle_find_search_candidates(
1030        this: Entity<Self>,
1031        envelope: TypedEnvelope<proto::FindSearchCandidates>,
1032        mut cx: AsyncApp,
1033    ) -> Result<proto::Ack> {
1034        use futures::stream::StreamExt as _;
1035
1036        let peer_id = envelope.original_sender_id.unwrap_or(envelope.sender_id);
1037        let message = envelope.payload;
1038        let query = SearchQuery::from_proto(
1039            message.query.context("missing query field")?,
1040            PathStyle::local(),
1041        )?;
1042
1043        let project_id = message.project_id;
1044        let buffer_store = this.read_with(&cx, |this, _| this.buffer_store.clone());
1045        let handle = message.handle;
1046        let _buffer_store = buffer_store.clone();
1047        let client = this.read_with(&cx, |this, _| this.session.clone());
1048        let task = cx.spawn(async move |cx| {
1049            let results = this.update(cx, |this, cx| {
1050                project::Search::local(
1051                    this.fs.clone(),
1052                    this.buffer_store.clone(),
1053                    this.worktree_store.clone(),
1054                    message.limit as _,
1055                    cx,
1056                )
1057                .into_handle(query, cx)
1058                .matching_buffers(cx)
1059            });
1060            let (batcher, batches) =
1061                project::project_search::AdaptiveBatcher::new(cx.background_executor());
1062            let mut new_matches = Box::pin(results.rx);
1063
1064            let sender_task = cx.background_executor().spawn({
1065                let client = client.clone();
1066                async move {
1067                    let mut batches = std::pin::pin!(batches);
1068                    while let Some(buffer_ids) = batches.next().await {
1069                        client
1070                            .request(proto::FindSearchCandidatesChunk {
1071                                handle,
1072                                peer_id: Some(peer_id),
1073                                project_id,
1074                                variant: Some(
1075                                    proto::find_search_candidates_chunk::Variant::Matches(
1076                                        proto::FindSearchCandidatesMatches { buffer_ids },
1077                                    ),
1078                                ),
1079                            })
1080                            .await?;
1081                    }
1082                    anyhow::Ok(())
1083                }
1084            });
1085
1086            while let Some(buffer) = new_matches.next().await {
1087                let _ = buffer_store
1088                    .update(cx, |this, cx| {
1089                        this.create_buffer_for_peer(&buffer, REMOTE_SERVER_PEER_ID, cx)
1090                    })
1091                    .await;
1092                let buffer_id = buffer.read_with(cx, |this, _| this.remote_id().to_proto());
1093                batcher.push(buffer_id).await;
1094            }
1095            batcher.flush().await;
1096
1097            sender_task.await?;
1098
1099            client
1100                .request(proto::FindSearchCandidatesChunk {
1101                    handle,
1102                    peer_id: Some(peer_id),
1103                    project_id,
1104                    variant: Some(proto::find_search_candidates_chunk::Variant::Done(
1105                        proto::FindSearchCandidatesDone {},
1106                    )),
1107                })
1108                .await?;
1109            anyhow::Ok(())
1110        });
1111        _buffer_store.update(&mut cx, |this, _| {
1112            this.register_ongoing_project_search((peer_id, handle), task);
1113        });
1114
1115        Ok(proto::Ack {})
1116    }
1117
1118    // Goes from client to host.
1119    async fn handle_find_search_candidates_cancel(
1120        this: Entity<Self>,
1121        envelope: TypedEnvelope<proto::FindSearchCandidatesCancelled>,
1122        mut cx: AsyncApp,
1123    ) -> Result<()> {
1124        let buffer_store = this.read_with(&mut cx, |this, _| this.buffer_store.clone());
1125        BufferStore::handle_find_search_candidates_cancel(buffer_store, envelope, cx).await
1126    }
1127
1128    async fn handle_list_remote_directory(
1129        this: Entity<Self>,
1130        envelope: TypedEnvelope<proto::ListRemoteDirectory>,
1131        cx: AsyncApp,
1132    ) -> Result<proto::ListRemoteDirectoryResponse> {
1133        use smol::stream::StreamExt;
1134        let fs = cx.read_entity(&this, |this, _| this.fs.clone());
1135        let expanded = PathBuf::from(shellexpand::tilde(&envelope.payload.path).to_string());
1136        let check_info = envelope
1137            .payload
1138            .config
1139            .as_ref()
1140            .is_some_and(|config| config.is_dir);
1141
1142        let mut entries = Vec::new();
1143        let mut entry_info = Vec::new();
1144        let mut response = fs.read_dir(&expanded).await?;
1145        while let Some(path) = response.next().await {
1146            let path = path?;
1147            if let Some(file_name) = path.file_name() {
1148                entries.push(file_name.to_string_lossy().into_owned());
1149                if check_info {
1150                    let is_dir = fs.is_dir(&path).await;
1151                    entry_info.push(proto::EntryInfo { is_dir });
1152                }
1153            }
1154        }
1155        Ok(proto::ListRemoteDirectoryResponse {
1156            entries,
1157            entry_info,
1158        })
1159    }
1160
1161    async fn handle_get_path_metadata(
1162        this: Entity<Self>,
1163        envelope: TypedEnvelope<proto::GetPathMetadata>,
1164        cx: AsyncApp,
1165    ) -> Result<proto::GetPathMetadataResponse> {
1166        let fs = cx.read_entity(&this, |this, _| this.fs.clone());
1167        let expanded = PathBuf::from(shellexpand::tilde(&envelope.payload.path).to_string());
1168
1169        let metadata = fs.metadata(&expanded).await?;
1170        let is_dir = metadata.map(|metadata| metadata.is_dir).unwrap_or(false);
1171
1172        Ok(proto::GetPathMetadataResponse {
1173            exists: metadata.is_some(),
1174            is_dir,
1175            path: expanded.to_string_lossy().into_owned(),
1176        })
1177    }
1178
1179    async fn handle_shutdown_remote_server(
1180        _this: Entity<Self>,
1181        _envelope: TypedEnvelope<proto::ShutdownRemoteServer>,
1182        cx: AsyncApp,
1183    ) -> Result<proto::Ack> {
1184        cx.spawn(async move |cx| {
1185            cx.update(|cx| {
1186                // TODO: This is a hack, because in a headless project, shutdown isn't executed
1187                // when calling quit, but it should be.
1188                cx.shutdown();
1189                cx.quit();
1190            })
1191        })
1192        .detach();
1193
1194        Ok(proto::Ack {})
1195    }
1196
1197    pub async fn handle_ping(
1198        _this: Entity<Self>,
1199        _envelope: TypedEnvelope<proto::Ping>,
1200        _cx: AsyncApp,
1201    ) -> Result<proto::Ack> {
1202        log::debug!("Received ping from client");
1203        Ok(proto::Ack {})
1204    }
1205
1206    async fn handle_get_processes(
1207        _this: Entity<Self>,
1208        _envelope: TypedEnvelope<proto::GetProcesses>,
1209        _cx: AsyncApp,
1210    ) -> Result<proto::GetProcessesResponse> {
1211        let mut processes = Vec::new();
1212        let refresh_kind = RefreshKind::nothing().with_processes(
1213            ProcessRefreshKind::nothing()
1214                .without_tasks()
1215                .with_cmd(UpdateKind::Always),
1216        );
1217
1218        for process in System::new_with_specifics(refresh_kind)
1219            .processes()
1220            .values()
1221        {
1222            let name = process.name().to_string_lossy().into_owned();
1223            let command = process
1224                .cmd()
1225                .iter()
1226                .map(|s| s.to_string_lossy().into_owned())
1227                .collect::<Vec<_>>();
1228
1229            processes.push(proto::ProcessInfo {
1230                pid: process.pid().as_u32(),
1231                name,
1232                command,
1233            });
1234        }
1235
1236        processes.sort_by_key(|p| p.name.clone());
1237
1238        Ok(proto::GetProcessesResponse { processes })
1239    }
1240
1241    async fn handle_get_remote_profiling_data(
1242        this: Entity<Self>,
1243        envelope: TypedEnvelope<proto::GetRemoteProfilingData>,
1244        cx: AsyncApp,
1245    ) -> Result<proto::GetRemoteProfilingDataResponse> {
1246        let foreground_only = envelope.payload.foreground_only;
1247
1248        let (deltas, now_nanos) = cx.update(|cx| {
1249            let dispatcher = cx.foreground_executor().dispatcher();
1250            let timings = if foreground_only {
1251                vec![dispatcher.get_current_thread_timings()]
1252            } else {
1253                dispatcher.get_all_timings()
1254            };
1255            this.update(cx, |this, _cx| {
1256                let deltas = this.profiling_collector.collect_unseen(timings);
1257                let now_nanos = Instant::now()
1258                    .duration_since(this.profiling_collector.startup_time())
1259                    .as_nanos() as u64;
1260                (deltas, now_nanos)
1261            })
1262        });
1263
1264        let threads = deltas
1265            .into_iter()
1266            .map(|delta| proto::RemoteProfilingThread {
1267                thread_name: delta.thread_name,
1268                thread_id: delta.thread_id,
1269                timings: delta
1270                    .new_timings
1271                    .into_iter()
1272                    .map(|t| proto::RemoteProfilingTiming {
1273                        location: Some(proto::RemoteProfilingLocation {
1274                            file: t.location.file.to_string(),
1275                            line: t.location.line,
1276                            column: t.location.column,
1277                        }),
1278                        start_nanos: t.start as u64,
1279                        duration_nanos: t.duration as u64,
1280                    })
1281                    .collect(),
1282            })
1283            .collect();
1284
1285        Ok(proto::GetRemoteProfilingDataResponse { threads, now_nanos })
1286    }
1287
1288    async fn handle_get_directory_environment(
1289        this: Entity<Self>,
1290        envelope: TypedEnvelope<proto::GetDirectoryEnvironment>,
1291        mut cx: AsyncApp,
1292    ) -> Result<proto::DirectoryEnvironment> {
1293        let shell = task::shell_from_proto(envelope.payload.shell.context("missing shell")?)?;
1294        let directory = PathBuf::from(envelope.payload.directory);
1295        let environment = this
1296            .update(&mut cx, |this, cx| {
1297                this.environment.update(cx, |environment, cx| {
1298                    environment.local_directory_environment(&shell, directory.into(), cx)
1299                })
1300            })
1301            .await
1302            .context("failed to get directory environment")?
1303            .into_iter()
1304            .collect();
1305        Ok(proto::DirectoryEnvironment { environment })
1306    }
1307}
1308
1309fn prompt_to_proto(
1310    prompt: &project::LanguageServerPromptRequest,
1311) -> proto::language_server_prompt_request::Level {
1312    match prompt.level {
1313        PromptLevel::Info => proto::language_server_prompt_request::Level::Info(
1314            proto::language_server_prompt_request::Info {},
1315        ),
1316        PromptLevel::Warning => proto::language_server_prompt_request::Level::Warning(
1317            proto::language_server_prompt_request::Warning {},
1318        ),
1319        PromptLevel::Critical => proto::language_server_prompt_request::Level::Critical(
1320            proto::language_server_prompt_request::Critical {},
1321        ),
1322    }
1323}