headless_project.rs

   1use anyhow::{Context as _, Result, anyhow};
   2use client::ProjectId;
   3use collections::HashSet;
   4use language::File;
   5use lsp::LanguageServerId;
   6
   7use extension::ExtensionHostProxy;
   8use extension_host::headless_host::HeadlessExtensionStore;
   9use fs::Fs;
  10use gpui::{App, AppContext as _, AsyncApp, Context, Entity, PromptLevel};
  11use http_client::HttpClient;
  12use language::{Buffer, BufferEvent, LanguageRegistry, proto::serialize_operation};
  13use node_runtime::NodeRuntime;
  14use project::{
  15    LspStore, LspStoreEvent, ManifestTree, PrettierStore, ProjectEnvironment, ProjectPath,
  16    ToolchainStore, WorktreeId,
  17    agent_server_store::AgentServerStore,
  18    buffer_store::{BufferStore, BufferStoreEvent},
  19    debugger::{breakpoint_store::BreakpointStore, dap_store::DapStore},
  20    git_store::GitStore,
  21    image_store::ImageId,
  22    lsp_store::log_store::{self, GlobalLogStore, LanguageServerKind, LogKind},
  23    project_settings::SettingsObserver,
  24    search::SearchQuery,
  25    task_store::TaskStore,
  26    trusted_worktrees::{PathTrust, RemoteHostLocation, TrustedWorktrees},
  27    worktree_store::WorktreeStore,
  28};
  29use rpc::{
  30    AnyProtoClient, TypedEnvelope,
  31    proto::{self, REMOTE_SERVER_PEER_ID, REMOTE_SERVER_PROJECT_ID},
  32};
  33
  34use settings::initial_server_settings_content;
  35use std::{
  36    num::NonZeroU64,
  37    path::{Path, PathBuf},
  38    sync::{
  39        Arc,
  40        atomic::{AtomicU64, AtomicUsize, Ordering},
  41    },
  42};
  43use sysinfo::{ProcessRefreshKind, RefreshKind, System, UpdateKind};
  44use util::{ResultExt, paths::PathStyle, rel_path::RelPath};
  45use worktree::Worktree;
  46
  47pub struct HeadlessProject {
  48    pub fs: Arc<dyn Fs>,
  49    pub session: AnyProtoClient,
  50    pub worktree_store: Entity<WorktreeStore>,
  51    pub buffer_store: Entity<BufferStore>,
  52    pub lsp_store: Entity<LspStore>,
  53    pub task_store: Entity<TaskStore>,
  54    pub dap_store: Entity<DapStore>,
  55    pub breakpoint_store: Entity<BreakpointStore>,
  56    pub agent_server_store: Entity<AgentServerStore>,
  57    pub settings_observer: Entity<SettingsObserver>,
  58    pub next_entry_id: Arc<AtomicUsize>,
  59    pub languages: Arc<LanguageRegistry>,
  60    pub extensions: Entity<HeadlessExtensionStore>,
  61    pub git_store: Entity<GitStore>,
  62    pub environment: Entity<ProjectEnvironment>,
  63    // Used mostly to keep alive the toolchain store for RPC handlers.
  64    // Local variant is used within LSP store, but that's a separate entity.
  65    pub _toolchain_store: Entity<ToolchainStore>,
  66}
  67
  68pub struct HeadlessAppState {
  69    pub session: AnyProtoClient,
  70    pub fs: Arc<dyn Fs>,
  71    pub http_client: Arc<dyn HttpClient>,
  72    pub node_runtime: NodeRuntime,
  73    pub languages: Arc<LanguageRegistry>,
  74    pub extension_host_proxy: Arc<ExtensionHostProxy>,
  75}
  76
  77impl HeadlessProject {
  78    pub fn init(cx: &mut App) {
  79        settings::init(cx);
  80        log_store::init(true, cx);
  81    }
  82
  83    pub fn new(
  84        HeadlessAppState {
  85            session,
  86            fs,
  87            http_client,
  88            node_runtime,
  89            languages,
  90            extension_host_proxy: proxy,
  91        }: HeadlessAppState,
  92        init_worktree_trust: bool,
  93        cx: &mut Context<Self>,
  94    ) -> Self {
  95        debug_adapter_extension::init(proxy.clone(), cx);
  96        languages::init(languages.clone(), fs.clone(), node_runtime.clone(), cx);
  97
  98        let worktree_store = cx.new(|cx| {
  99            let mut store = WorktreeStore::local(true, fs.clone());
 100            store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
 101            store
 102        });
 103
 104        if init_worktree_trust {
 105            project::trusted_worktrees::track_worktree_trust(
 106                worktree_store.clone(),
 107                None::<RemoteHostLocation>,
 108                Some((session.clone(), ProjectId(REMOTE_SERVER_PROJECT_ID))),
 109                None,
 110                cx,
 111            );
 112        }
 113
 114        let environment =
 115            cx.new(|cx| ProjectEnvironment::new(None, worktree_store.downgrade(), None, true, cx));
 116        let manifest_tree = ManifestTree::new(worktree_store.clone(), cx);
 117        let toolchain_store = cx.new(|cx| {
 118            ToolchainStore::local(
 119                languages.clone(),
 120                worktree_store.clone(),
 121                environment.clone(),
 122                manifest_tree.clone(),
 123                fs.clone(),
 124                cx,
 125            )
 126        });
 127
 128        let buffer_store = cx.new(|cx| {
 129            let mut buffer_store = BufferStore::local(worktree_store.clone(), cx);
 130            buffer_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
 131            buffer_store
 132        });
 133
 134        let breakpoint_store = cx.new(|_| {
 135            let mut breakpoint_store =
 136                BreakpointStore::local(worktree_store.clone(), buffer_store.clone());
 137            breakpoint_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone());
 138
 139            breakpoint_store
 140        });
 141
 142        let dap_store = cx.new(|cx| {
 143            let mut dap_store = DapStore::new_local(
 144                http_client.clone(),
 145                node_runtime.clone(),
 146                fs.clone(),
 147                environment.clone(),
 148                toolchain_store.read(cx).as_language_toolchain_store(),
 149                worktree_store.clone(),
 150                breakpoint_store.clone(),
 151                true,
 152                cx,
 153            );
 154            dap_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
 155            dap_store
 156        });
 157
 158        let git_store = cx.new(|cx| {
 159            let mut store = GitStore::local(
 160                &worktree_store,
 161                buffer_store.clone(),
 162                environment.clone(),
 163                fs.clone(),
 164                cx,
 165            );
 166            store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
 167            store
 168        });
 169
 170        let prettier_store = cx.new(|cx| {
 171            PrettierStore::new(
 172                node_runtime.clone(),
 173                fs.clone(),
 174                languages.clone(),
 175                worktree_store.clone(),
 176                cx,
 177            )
 178        });
 179
 180        let task_store = cx.new(|cx| {
 181            let mut task_store = TaskStore::local(
 182                buffer_store.downgrade(),
 183                worktree_store.clone(),
 184                toolchain_store.read(cx).as_language_toolchain_store(),
 185                environment.clone(),
 186                cx,
 187            );
 188            task_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
 189            task_store
 190        });
 191        let settings_observer = cx.new(|cx| {
 192            let mut observer = SettingsObserver::new_local(
 193                fs.clone(),
 194                worktree_store.clone(),
 195                task_store.clone(),
 196                cx,
 197            );
 198            observer.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
 199            observer
 200        });
 201
 202        let lsp_store = cx.new(|cx| {
 203            let mut lsp_store = LspStore::new_local(
 204                buffer_store.clone(),
 205                worktree_store.clone(),
 206                prettier_store.clone(),
 207                toolchain_store
 208                    .read(cx)
 209                    .as_local_store()
 210                    .expect("Toolchain store to be local")
 211                    .clone(),
 212                environment.clone(),
 213                manifest_tree,
 214                languages.clone(),
 215                http_client.clone(),
 216                fs.clone(),
 217                cx,
 218            );
 219            lsp_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
 220            lsp_store
 221        });
 222
 223        let agent_server_store = cx.new(|cx| {
 224            let mut agent_server_store = AgentServerStore::local(
 225                node_runtime.clone(),
 226                fs.clone(),
 227                environment.clone(),
 228                http_client.clone(),
 229                cx,
 230            );
 231            agent_server_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
 232            agent_server_store
 233        });
 234
 235        cx.subscribe(&lsp_store, Self::on_lsp_store_event).detach();
 236        language_extension::init(
 237            language_extension::LspAccess::ViaLspStore(lsp_store.clone()),
 238            proxy.clone(),
 239            languages.clone(),
 240        );
 241
 242        cx.subscribe(&buffer_store, |_this, _buffer_store, event, cx| {
 243            if let BufferStoreEvent::BufferAdded(buffer) = event {
 244                cx.subscribe(buffer, Self::on_buffer_event).detach();
 245            }
 246        })
 247        .detach();
 248
 249        let extensions = HeadlessExtensionStore::new(
 250            fs.clone(),
 251            http_client.clone(),
 252            paths::remote_extensions_dir().to_path_buf(),
 253            proxy,
 254            node_runtime,
 255            cx,
 256        );
 257
 258        // local_machine -> ssh handlers
 259        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &worktree_store);
 260        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &buffer_store);
 261        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &cx.entity());
 262        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &lsp_store);
 263        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &task_store);
 264        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &toolchain_store);
 265        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &dap_store);
 266        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &breakpoint_store);
 267        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &settings_observer);
 268        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &git_store);
 269        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &agent_server_store);
 270
 271        session.add_request_handler(cx.weak_entity(), Self::handle_list_remote_directory);
 272        session.add_request_handler(cx.weak_entity(), Self::handle_get_path_metadata);
 273        session.add_request_handler(cx.weak_entity(), Self::handle_shutdown_remote_server);
 274        session.add_request_handler(cx.weak_entity(), Self::handle_ping);
 275        session.add_request_handler(cx.weak_entity(), Self::handle_get_processes);
 276
 277        session.add_entity_request_handler(Self::handle_add_worktree);
 278        session.add_request_handler(cx.weak_entity(), Self::handle_remove_worktree);
 279
 280        session.add_entity_request_handler(Self::handle_open_buffer_by_path);
 281        session.add_entity_request_handler(Self::handle_open_new_buffer);
 282        session.add_entity_request_handler(Self::handle_find_search_candidates);
 283        session.add_entity_request_handler(Self::handle_open_server_settings);
 284        session.add_entity_request_handler(Self::handle_get_directory_environment);
 285        session.add_entity_message_handler(Self::handle_toggle_lsp_logs);
 286        session.add_entity_request_handler(Self::handle_open_image_by_path);
 287        session.add_entity_request_handler(Self::handle_trust_worktrees);
 288        session.add_entity_request_handler(Self::handle_restrict_worktrees);
 289
 290        session.add_entity_message_handler(Self::handle_find_search_candidates_cancel);
 291        session.add_entity_request_handler(BufferStore::handle_update_buffer);
 292        session.add_entity_message_handler(BufferStore::handle_close_buffer);
 293
 294        session.add_request_handler(
 295            extensions.downgrade(),
 296            HeadlessExtensionStore::handle_sync_extensions,
 297        );
 298        session.add_request_handler(
 299            extensions.downgrade(),
 300            HeadlessExtensionStore::handle_install_extension,
 301        );
 302
 303        BufferStore::init(&session);
 304        WorktreeStore::init(&session);
 305        SettingsObserver::init(&session);
 306        LspStore::init(&session);
 307        TaskStore::init(Some(&session));
 308        ToolchainStore::init(&session);
 309        DapStore::init(&session, cx);
 310        // todo(debugger): Re init breakpoint store when we set it up for collab
 311        BreakpointStore::init(&session);
 312        GitStore::init(&session);
 313        AgentServerStore::init_headless(&session);
 314
 315        HeadlessProject {
 316            next_entry_id: Default::default(),
 317            session,
 318            settings_observer,
 319            fs,
 320            worktree_store,
 321            buffer_store,
 322            lsp_store,
 323            task_store,
 324            dap_store,
 325            breakpoint_store,
 326            agent_server_store,
 327            languages,
 328            extensions,
 329            git_store,
 330            environment,
 331            _toolchain_store: toolchain_store,
 332        }
 333    }
 334
 335    fn on_buffer_event(
 336        &mut self,
 337        buffer: Entity<Buffer>,
 338        event: &BufferEvent,
 339        cx: &mut Context<Self>,
 340    ) {
 341        if let BufferEvent::Operation {
 342            operation,
 343            is_local: true,
 344        } = event
 345        {
 346            cx.background_spawn(self.session.request(proto::UpdateBuffer {
 347                project_id: REMOTE_SERVER_PROJECT_ID,
 348                buffer_id: buffer.read(cx).remote_id().to_proto(),
 349                operations: vec![serialize_operation(operation)],
 350            }))
 351            .detach()
 352        }
 353    }
 354
 355    fn on_lsp_store_event(
 356        &mut self,
 357        lsp_store: Entity<LspStore>,
 358        event: &LspStoreEvent,
 359        cx: &mut Context<Self>,
 360    ) {
 361        match event {
 362            LspStoreEvent::LanguageServerAdded(id, name, worktree_id) => {
 363                let log_store = cx
 364                    .try_global::<GlobalLogStore>()
 365                    .map(|lsp_logs| lsp_logs.0.clone());
 366                if let Some(log_store) = log_store {
 367                    log_store.update(cx, |log_store, cx| {
 368                        log_store.add_language_server(
 369                            LanguageServerKind::LocalSsh {
 370                                lsp_store: self.lsp_store.downgrade(),
 371                            },
 372                            *id,
 373                            Some(name.clone()),
 374                            *worktree_id,
 375                            lsp_store.read(cx).language_server_for_id(*id),
 376                            cx,
 377                        );
 378                    });
 379                }
 380            }
 381            LspStoreEvent::LanguageServerRemoved(id) => {
 382                let log_store = cx
 383                    .try_global::<GlobalLogStore>()
 384                    .map(|lsp_logs| lsp_logs.0.clone());
 385                if let Some(log_store) = log_store {
 386                    log_store.update(cx, |log_store, cx| {
 387                        log_store.remove_language_server(*id, cx);
 388                    });
 389                }
 390            }
 391            LspStoreEvent::LanguageServerUpdate {
 392                language_server_id,
 393                name,
 394                message,
 395            } => {
 396                self.session
 397                    .send(proto::UpdateLanguageServer {
 398                        project_id: REMOTE_SERVER_PROJECT_ID,
 399                        server_name: name.as_ref().map(|name| name.to_string()),
 400                        language_server_id: language_server_id.to_proto(),
 401                        variant: Some(message.clone()),
 402                    })
 403                    .log_err();
 404            }
 405            LspStoreEvent::Notification(message) => {
 406                self.session
 407                    .send(proto::Toast {
 408                        project_id: REMOTE_SERVER_PROJECT_ID,
 409                        notification_id: "lsp".to_string(),
 410                        message: message.clone(),
 411                    })
 412                    .log_err();
 413            }
 414            LspStoreEvent::LanguageServerPrompt(prompt) => {
 415                let request = self.session.request(proto::LanguageServerPromptRequest {
 416                    project_id: REMOTE_SERVER_PROJECT_ID,
 417                    actions: prompt
 418                        .actions
 419                        .iter()
 420                        .map(|action| action.title.to_string())
 421                        .collect(),
 422                    level: Some(prompt_to_proto(prompt)),
 423                    lsp_name: prompt.lsp_name.clone(),
 424                    message: prompt.message.clone(),
 425                });
 426                let prompt = prompt.clone();
 427                cx.background_spawn(async move {
 428                    let response = request.await?;
 429                    if let Some(action_response) = response.action_response {
 430                        prompt.respond(action_response as usize).await;
 431                    }
 432                    anyhow::Ok(())
 433                })
 434                .detach();
 435            }
 436            _ => {}
 437        }
 438    }
 439
 440    pub async fn handle_add_worktree(
 441        this: Entity<Self>,
 442        message: TypedEnvelope<proto::AddWorktree>,
 443        mut cx: AsyncApp,
 444    ) -> Result<proto::AddWorktreeResponse> {
 445        use client::ErrorCodeExt;
 446        let fs = this.read_with(&cx, |this, _| this.fs.clone());
 447        let path = PathBuf::from(shellexpand::tilde(&message.payload.path).to_string());
 448
 449        let canonicalized = match fs.canonicalize(&path).await {
 450            Ok(path) => path,
 451            Err(e) => {
 452                let mut parent = path
 453                    .parent()
 454                    .ok_or(e)
 455                    .with_context(|| format!("{path:?} does not exist"))?;
 456                if parent == Path::new("") {
 457                    parent = util::paths::home_dir();
 458                }
 459                let parent = fs.canonicalize(parent).await.map_err(|_| {
 460                    anyhow!(
 461                        proto::ErrorCode::DevServerProjectPathDoesNotExist
 462                            .with_tag("path", path.to_string_lossy().as_ref())
 463                    )
 464                })?;
 465                if let Some(file_name) = path.file_name() {
 466                    parent.join(file_name)
 467                } else {
 468                    parent
 469                }
 470            }
 471        };
 472
 473        let worktree = this
 474            .read_with(&cx.clone(), |this, _| {
 475                Worktree::local(
 476                    Arc::from(canonicalized.as_path()),
 477                    message.payload.visible,
 478                    this.fs.clone(),
 479                    this.next_entry_id.clone(),
 480                    true,
 481                    &mut cx,
 482                )
 483            })
 484            .await?;
 485
 486        let response = this.read_with(&cx, |_, cx| {
 487            let worktree = worktree.read(cx);
 488            proto::AddWorktreeResponse {
 489                worktree_id: worktree.id().to_proto(),
 490                canonicalized_path: canonicalized.to_string_lossy().into_owned(),
 491            }
 492        });
 493
 494        // We spawn this asynchronously, so that we can send the response back
 495        // *before* `worktree_store.add()` can send out UpdateProject requests
 496        // to the client about the new worktree.
 497        //
 498        // That lets the client manage the reference/handles of the newly-added
 499        // worktree, before getting interrupted by an UpdateProject request.
 500        //
 501        // This fixes the problem of the client sending the AddWorktree request,
 502        // headless project sending out a project update, client receiving it
 503        // and immediately dropping the reference of the new client, causing it
 504        // to be dropped on the headless project, and the client only then
 505        // receiving a response to AddWorktree.
 506        cx.spawn(async move |cx| {
 507            this.update(cx, |this, cx| {
 508                this.worktree_store.update(cx, |worktree_store, cx| {
 509                    worktree_store.add(&worktree, cx);
 510                });
 511            });
 512        })
 513        .detach();
 514
 515        Ok(response)
 516    }
 517
 518    pub async fn handle_remove_worktree(
 519        this: Entity<Self>,
 520        envelope: TypedEnvelope<proto::RemoveWorktree>,
 521        mut cx: AsyncApp,
 522    ) -> Result<proto::Ack> {
 523        let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
 524        this.update(&mut cx, |this, cx| {
 525            this.worktree_store.update(cx, |worktree_store, cx| {
 526                worktree_store.remove_worktree(worktree_id, cx);
 527            });
 528        });
 529        Ok(proto::Ack {})
 530    }
 531
 532    pub async fn handle_open_buffer_by_path(
 533        this: Entity<Self>,
 534        message: TypedEnvelope<proto::OpenBufferByPath>,
 535        mut cx: AsyncApp,
 536    ) -> Result<proto::OpenBufferResponse> {
 537        let worktree_id = WorktreeId::from_proto(message.payload.worktree_id);
 538        let path = RelPath::from_proto(&message.payload.path)?;
 539        let (buffer_store, buffer) = this.update(&mut cx, |this, cx| {
 540            let buffer_store = this.buffer_store.clone();
 541            let buffer = this.buffer_store.update(cx, |buffer_store, cx| {
 542                buffer_store.open_buffer(ProjectPath { worktree_id, path }, cx)
 543            });
 544            (buffer_store, buffer)
 545        });
 546
 547        let buffer = buffer.await?;
 548        let buffer_id = buffer.read_with(&cx, |b, _| b.remote_id());
 549        buffer_store.update(&mut cx, |buffer_store, cx| {
 550            buffer_store
 551                .create_buffer_for_peer(&buffer, REMOTE_SERVER_PEER_ID, cx)
 552                .detach_and_log_err(cx);
 553        });
 554
 555        Ok(proto::OpenBufferResponse {
 556            buffer_id: buffer_id.to_proto(),
 557        })
 558    }
 559
 560    pub async fn handle_open_image_by_path(
 561        this: Entity<Self>,
 562        message: TypedEnvelope<proto::OpenImageByPath>,
 563        mut cx: AsyncApp,
 564    ) -> Result<proto::OpenImageResponse> {
 565        static NEXT_ID: AtomicU64 = AtomicU64::new(1);
 566        let worktree_id = WorktreeId::from_proto(message.payload.worktree_id);
 567        let path = RelPath::from_proto(&message.payload.path)?;
 568        let project_id = message.payload.project_id;
 569        use proto::create_image_for_peer::Variant;
 570
 571        let (worktree_store, session) = this.read_with(&cx, |this, _| {
 572            (this.worktree_store.clone(), this.session.clone())
 573        });
 574
 575        let worktree = worktree_store
 576            .read_with(&cx, |store, cx| store.worktree_for_id(worktree_id, cx))
 577            .context("worktree not found")?;
 578
 579        let load_task = worktree.update(&mut cx, |worktree, cx| {
 580            worktree.load_binary_file(path.as_ref(), cx)
 581        });
 582
 583        let loaded_file = load_task.await?;
 584        let content = loaded_file.content;
 585        let file = loaded_file.file;
 586
 587        let proto_file = worktree.read_with(&cx, |_worktree, cx| file.to_proto(cx));
 588        let image_id =
 589            ImageId::from(NonZeroU64::new(NEXT_ID.fetch_add(1, Ordering::Relaxed)).unwrap());
 590
 591        let format = image::guess_format(&content)
 592            .map(|f| format!("{:?}", f).to_lowercase())
 593            .unwrap_or_else(|_| "unknown".to_string());
 594
 595        let state = proto::ImageState {
 596            id: image_id.to_proto(),
 597            file: Some(proto_file),
 598            content_size: content.len() as u64,
 599            format,
 600        };
 601
 602        session.send(proto::CreateImageForPeer {
 603            project_id,
 604            peer_id: Some(REMOTE_SERVER_PEER_ID),
 605            variant: Some(Variant::State(state)),
 606        })?;
 607
 608        const CHUNK_SIZE: usize = 1024 * 1024; // 1MB chunks
 609        for chunk in content.chunks(CHUNK_SIZE) {
 610            session.send(proto::CreateImageForPeer {
 611                project_id,
 612                peer_id: Some(REMOTE_SERVER_PEER_ID),
 613                variant: Some(Variant::Chunk(proto::ImageChunk {
 614                    image_id: image_id.to_proto(),
 615                    data: chunk.to_vec(),
 616                })),
 617            })?;
 618        }
 619
 620        Ok(proto::OpenImageResponse {
 621            image_id: image_id.to_proto(),
 622        })
 623    }
 624
 625    pub async fn handle_trust_worktrees(
 626        this: Entity<Self>,
 627        envelope: TypedEnvelope<proto::TrustWorktrees>,
 628        mut cx: AsyncApp,
 629    ) -> Result<proto::Ack> {
 630        let trusted_worktrees = cx
 631            .update(|cx| TrustedWorktrees::try_get_global(cx))
 632            .context("missing trusted worktrees")?;
 633        let worktree_store = this.read_with(&cx, |project, _| project.worktree_store.clone());
 634        trusted_worktrees.update(&mut cx, |trusted_worktrees, cx| {
 635            trusted_worktrees.trust(
 636                &worktree_store,
 637                envelope
 638                    .payload
 639                    .trusted_paths
 640                    .into_iter()
 641                    .filter_map(PathTrust::from_proto)
 642                    .collect(),
 643                cx,
 644            );
 645        });
 646        Ok(proto::Ack {})
 647    }
 648
 649    pub async fn handle_restrict_worktrees(
 650        this: Entity<Self>,
 651        envelope: TypedEnvelope<proto::RestrictWorktrees>,
 652        mut cx: AsyncApp,
 653    ) -> Result<proto::Ack> {
 654        let trusted_worktrees = cx
 655            .update(|cx| TrustedWorktrees::try_get_global(cx))
 656            .context("missing trusted worktrees")?;
 657        let worktree_store = this.read_with(&cx, |project, _| project.worktree_store.downgrade());
 658        trusted_worktrees.update(&mut cx, |trusted_worktrees, cx| {
 659            let restricted_paths = envelope
 660                .payload
 661                .worktree_ids
 662                .into_iter()
 663                .map(WorktreeId::from_proto)
 664                .map(PathTrust::Worktree)
 665                .collect::<HashSet<_>>();
 666            trusted_worktrees.restrict(worktree_store, restricted_paths, cx);
 667        });
 668        Ok(proto::Ack {})
 669    }
 670
 671    pub async fn handle_open_new_buffer(
 672        this: Entity<Self>,
 673        _message: TypedEnvelope<proto::OpenNewBuffer>,
 674        mut cx: AsyncApp,
 675    ) -> Result<proto::OpenBufferResponse> {
 676        let (buffer_store, buffer) = this.update(&mut cx, |this, cx| {
 677            let buffer_store = this.buffer_store.clone();
 678            let buffer = this
 679                .buffer_store
 680                .update(cx, |buffer_store, cx| buffer_store.create_buffer(true, cx));
 681            (buffer_store, buffer)
 682        });
 683
 684        let buffer = buffer.await?;
 685        let buffer_id = buffer.read_with(&cx, |b, _| b.remote_id());
 686        buffer_store.update(&mut cx, |buffer_store, cx| {
 687            buffer_store
 688                .create_buffer_for_peer(&buffer, REMOTE_SERVER_PEER_ID, cx)
 689                .detach_and_log_err(cx);
 690        });
 691
 692        Ok(proto::OpenBufferResponse {
 693            buffer_id: buffer_id.to_proto(),
 694        })
 695    }
 696
 697    async fn handle_toggle_lsp_logs(
 698        _: Entity<Self>,
 699        envelope: TypedEnvelope<proto::ToggleLspLogs>,
 700        cx: AsyncApp,
 701    ) -> Result<()> {
 702        let server_id = LanguageServerId::from_proto(envelope.payload.server_id);
 703        cx.update(|cx| {
 704            let log_store = cx
 705                .try_global::<GlobalLogStore>()
 706                .map(|global_log_store| global_log_store.0.clone())
 707                .context("lsp logs store is missing")?;
 708            let toggled_log_kind =
 709                match proto::toggle_lsp_logs::LogType::from_i32(envelope.payload.log_type)
 710                    .context("invalid log type")?
 711                {
 712                    proto::toggle_lsp_logs::LogType::Log => LogKind::Logs,
 713                    proto::toggle_lsp_logs::LogType::Trace => LogKind::Trace,
 714                    proto::toggle_lsp_logs::LogType::Rpc => LogKind::Rpc,
 715                };
 716            log_store.update(cx, |log_store, _| {
 717                log_store.toggle_lsp_logs(server_id, envelope.payload.enabled, toggled_log_kind);
 718            });
 719            anyhow::Ok(())
 720        })?;
 721
 722        Ok(())
 723    }
 724
 725    async fn handle_open_server_settings(
 726        this: Entity<Self>,
 727        _: TypedEnvelope<proto::OpenServerSettings>,
 728        mut cx: AsyncApp,
 729    ) -> Result<proto::OpenBufferResponse> {
 730        let settings_path = paths::settings_file();
 731        let (worktree, path) = this
 732            .update(&mut cx, |this, cx| {
 733                this.worktree_store.update(cx, |worktree_store, cx| {
 734                    worktree_store.find_or_create_worktree(settings_path, false, cx)
 735                })
 736            })
 737            .await?;
 738
 739        let (buffer, buffer_store) = this.update(&mut cx, |this, cx| {
 740            let buffer = this.buffer_store.update(cx, |buffer_store, cx| {
 741                buffer_store.open_buffer(
 742                    ProjectPath {
 743                        worktree_id: worktree.read(cx).id(),
 744                        path: path,
 745                    },
 746                    cx,
 747                )
 748            });
 749
 750            (buffer, this.buffer_store.clone())
 751        });
 752
 753        let buffer = buffer.await?;
 754
 755        let buffer_id = cx.update(|cx| {
 756            if buffer.read(cx).is_empty() {
 757                buffer.update(cx, |buffer, cx| {
 758                    buffer.edit([(0..0, initial_server_settings_content())], None, cx)
 759                });
 760            }
 761
 762            let buffer_id = buffer.read(cx).remote_id();
 763
 764            buffer_store.update(cx, |buffer_store, cx| {
 765                buffer_store
 766                    .create_buffer_for_peer(&buffer, REMOTE_SERVER_PEER_ID, cx)
 767                    .detach_and_log_err(cx);
 768            });
 769
 770            buffer_id
 771        });
 772
 773        Ok(proto::OpenBufferResponse {
 774            buffer_id: buffer_id.to_proto(),
 775        })
 776    }
 777
 778    async fn handle_find_search_candidates(
 779        this: Entity<Self>,
 780        envelope: TypedEnvelope<proto::FindSearchCandidates>,
 781        mut cx: AsyncApp,
 782    ) -> Result<proto::Ack> {
 783        use futures::stream::StreamExt as _;
 784
 785        let peer_id = envelope.original_sender_id.unwrap_or(envelope.sender_id);
 786        let message = envelope.payload;
 787        let query = SearchQuery::from_proto(
 788            message.query.context("missing query field")?,
 789            PathStyle::local(),
 790        )?;
 791
 792        let project_id = message.project_id;
 793        let buffer_store = this.read_with(&cx, |this, _| this.buffer_store.clone());
 794        let handle = message.handle;
 795        let _buffer_store = buffer_store.clone();
 796        let client = this.read_with(&cx, |this, _| this.session.clone());
 797        let task = cx.spawn(async move |cx| {
 798            let results = this.update(cx, |this, cx| {
 799                project::Search::local(
 800                    this.fs.clone(),
 801                    this.buffer_store.clone(),
 802                    this.worktree_store.clone(),
 803                    message.limit as _,
 804                    cx,
 805                )
 806                .into_handle(query, cx)
 807                .matching_buffers(cx)
 808            });
 809            let (batcher, batches) =
 810                project::project_search::AdaptiveBatcher::new(cx.background_executor());
 811            let mut new_matches = Box::pin(results.rx);
 812
 813            let sender_task = cx.background_executor().spawn({
 814                let client = client.clone();
 815                async move {
 816                    let mut batches = std::pin::pin!(batches);
 817                    while let Some(buffer_ids) = batches.next().await {
 818                        client
 819                            .request(proto::FindSearchCandidatesChunk {
 820                                handle,
 821                                peer_id: Some(peer_id),
 822                                project_id,
 823                                variant: Some(
 824                                    proto::find_search_candidates_chunk::Variant::Matches(
 825                                        proto::FindSearchCandidatesMatches { buffer_ids },
 826                                    ),
 827                                ),
 828                            })
 829                            .await?;
 830                    }
 831                    anyhow::Ok(())
 832                }
 833            });
 834
 835            while let Some(buffer) = new_matches.next().await {
 836                let _ = buffer_store
 837                    .update(cx, |this, cx| {
 838                        this.create_buffer_for_peer(&buffer, REMOTE_SERVER_PEER_ID, cx)
 839                    })
 840                    .await;
 841                let buffer_id = buffer.read_with(cx, |this, _| this.remote_id().to_proto());
 842                batcher.push(buffer_id).await;
 843            }
 844            batcher.flush().await;
 845
 846            sender_task.await?;
 847
 848            client
 849                .request(proto::FindSearchCandidatesChunk {
 850                    handle,
 851                    peer_id: Some(peer_id),
 852                    project_id,
 853                    variant: Some(proto::find_search_candidates_chunk::Variant::Done(
 854                        proto::FindSearchCandidatesDone {},
 855                    )),
 856                })
 857                .await?;
 858            anyhow::Ok(())
 859        });
 860        _buffer_store.update(&mut cx, |this, _| {
 861            this.register_ongoing_project_search((peer_id, handle), task);
 862        });
 863
 864        Ok(proto::Ack {})
 865    }
 866
 867    // Goes from client to host.
 868    async fn handle_find_search_candidates_cancel(
 869        this: Entity<Self>,
 870        envelope: TypedEnvelope<proto::FindSearchCandidatesCancelled>,
 871        mut cx: AsyncApp,
 872    ) -> Result<()> {
 873        let buffer_store = this.read_with(&mut cx, |this, _| this.buffer_store.clone());
 874        BufferStore::handle_find_search_candidates_cancel(buffer_store, envelope, cx).await
 875    }
 876
 877    async fn handle_list_remote_directory(
 878        this: Entity<Self>,
 879        envelope: TypedEnvelope<proto::ListRemoteDirectory>,
 880        cx: AsyncApp,
 881    ) -> Result<proto::ListRemoteDirectoryResponse> {
 882        use smol::stream::StreamExt;
 883        let fs = cx.read_entity(&this, |this, _| this.fs.clone());
 884        let expanded = PathBuf::from(shellexpand::tilde(&envelope.payload.path).to_string());
 885        let check_info = envelope
 886            .payload
 887            .config
 888            .as_ref()
 889            .is_some_and(|config| config.is_dir);
 890
 891        let mut entries = Vec::new();
 892        let mut entry_info = Vec::new();
 893        let mut response = fs.read_dir(&expanded).await?;
 894        while let Some(path) = response.next().await {
 895            let path = path?;
 896            if let Some(file_name) = path.file_name() {
 897                entries.push(file_name.to_string_lossy().into_owned());
 898                if check_info {
 899                    let is_dir = fs.is_dir(&path).await;
 900                    entry_info.push(proto::EntryInfo { is_dir });
 901                }
 902            }
 903        }
 904        Ok(proto::ListRemoteDirectoryResponse {
 905            entries,
 906            entry_info,
 907        })
 908    }
 909
 910    async fn handle_get_path_metadata(
 911        this: Entity<Self>,
 912        envelope: TypedEnvelope<proto::GetPathMetadata>,
 913        cx: AsyncApp,
 914    ) -> Result<proto::GetPathMetadataResponse> {
 915        let fs = cx.read_entity(&this, |this, _| this.fs.clone());
 916        let expanded = PathBuf::from(shellexpand::tilde(&envelope.payload.path).to_string());
 917
 918        let metadata = fs.metadata(&expanded).await?;
 919        let is_dir = metadata.map(|metadata| metadata.is_dir).unwrap_or(false);
 920
 921        Ok(proto::GetPathMetadataResponse {
 922            exists: metadata.is_some(),
 923            is_dir,
 924            path: expanded.to_string_lossy().into_owned(),
 925        })
 926    }
 927
 928    async fn handle_shutdown_remote_server(
 929        _this: Entity<Self>,
 930        _envelope: TypedEnvelope<proto::ShutdownRemoteServer>,
 931        cx: AsyncApp,
 932    ) -> Result<proto::Ack> {
 933        cx.spawn(async move |cx| {
 934            cx.update(|cx| {
 935                // TODO: This is a hack, because in a headless project, shutdown isn't executed
 936                // when calling quit, but it should be.
 937                cx.shutdown();
 938                cx.quit();
 939            })
 940        })
 941        .detach();
 942
 943        Ok(proto::Ack {})
 944    }
 945
 946    pub async fn handle_ping(
 947        _this: Entity<Self>,
 948        _envelope: TypedEnvelope<proto::Ping>,
 949        _cx: AsyncApp,
 950    ) -> Result<proto::Ack> {
 951        log::debug!("Received ping from client");
 952        Ok(proto::Ack {})
 953    }
 954
 955    async fn handle_get_processes(
 956        _this: Entity<Self>,
 957        _envelope: TypedEnvelope<proto::GetProcesses>,
 958        _cx: AsyncApp,
 959    ) -> Result<proto::GetProcessesResponse> {
 960        let mut processes = Vec::new();
 961        let refresh_kind = RefreshKind::nothing().with_processes(
 962            ProcessRefreshKind::nothing()
 963                .without_tasks()
 964                .with_cmd(UpdateKind::Always),
 965        );
 966
 967        for process in System::new_with_specifics(refresh_kind)
 968            .processes()
 969            .values()
 970        {
 971            let name = process.name().to_string_lossy().into_owned();
 972            let command = process
 973                .cmd()
 974                .iter()
 975                .map(|s| s.to_string_lossy().into_owned())
 976                .collect::<Vec<_>>();
 977
 978            processes.push(proto::ProcessInfo {
 979                pid: process.pid().as_u32(),
 980                name,
 981                command,
 982            });
 983        }
 984
 985        processes.sort_by_key(|p| p.name.clone());
 986
 987        Ok(proto::GetProcessesResponse { processes })
 988    }
 989
 990    async fn handle_get_directory_environment(
 991        this: Entity<Self>,
 992        envelope: TypedEnvelope<proto::GetDirectoryEnvironment>,
 993        mut cx: AsyncApp,
 994    ) -> Result<proto::DirectoryEnvironment> {
 995        let shell = task::shell_from_proto(envelope.payload.shell.context("missing shell")?)?;
 996        let directory = PathBuf::from(envelope.payload.directory);
 997        let environment = this
 998            .update(&mut cx, |this, cx| {
 999                this.environment.update(cx, |environment, cx| {
1000                    environment.local_directory_environment(&shell, directory.into(), cx)
1001                })
1002            })
1003            .await
1004            .context("failed to get directory environment")?
1005            .into_iter()
1006            .collect();
1007        Ok(proto::DirectoryEnvironment { environment })
1008    }
1009}
1010
1011fn prompt_to_proto(
1012    prompt: &project::LanguageServerPromptRequest,
1013) -> proto::language_server_prompt_request::Level {
1014    match prompt.level {
1015        PromptLevel::Info => proto::language_server_prompt_request::Level::Info(
1016            proto::language_server_prompt_request::Info {},
1017        ),
1018        PromptLevel::Warning => proto::language_server_prompt_request::Level::Warning(
1019            proto::language_server_prompt_request::Warning {},
1020        ),
1021        PromptLevel::Critical => proto::language_server_prompt_request::Level::Critical(
1022            proto::language_server_prompt_request::Critical {},
1023        ),
1024    }
1025}