headless_project.rs

   1use anyhow::{Context as _, Result, anyhow};
   2use client::ProjectId;
   3use collections::HashSet;
   4use language::File;
   5use lsp::LanguageServerId;
   6
   7use extension::ExtensionHostProxy;
   8use extension_host::headless_host::HeadlessExtensionStore;
   9use fs::Fs;
  10use gpui::{App, AppContext as _, AsyncApp, Context, Entity, PromptLevel};
  11use http_client::HttpClient;
  12use language::{Buffer, BufferEvent, LanguageRegistry, proto::serialize_operation};
  13use node_runtime::NodeRuntime;
  14use project::{
  15    LspStore, LspStoreEvent, ManifestTree, PrettierStore, ProjectEnvironment, ProjectPath,
  16    ToolchainStore, WorktreeId,
  17    agent_server_store::AgentServerStore,
  18    buffer_store::{BufferStore, BufferStoreEvent},
  19    context_server_store::ContextServerStore,
  20    debugger::{breakpoint_store::BreakpointStore, dap_store::DapStore},
  21    git_store::GitStore,
  22    image_store::ImageId,
  23    lsp_store::log_store::{self, GlobalLogStore, LanguageServerKind, LogKind},
  24    project_settings::SettingsObserver,
  25    search::SearchQuery,
  26    task_store::TaskStore,
  27    trusted_worktrees::{PathTrust, RemoteHostLocation, TrustedWorktrees},
  28    worktree_store::WorktreeStore,
  29};
  30use rpc::{
  31    AnyProtoClient, TypedEnvelope,
  32    proto::{self, REMOTE_SERVER_PEER_ID, REMOTE_SERVER_PROJECT_ID},
  33};
  34
  35use settings::initial_server_settings_content;
  36use std::{
  37    num::NonZeroU64,
  38    path::{Path, PathBuf},
  39    sync::{
  40        Arc,
  41        atomic::{AtomicU64, AtomicUsize, Ordering},
  42    },
  43};
  44use sysinfo::{ProcessRefreshKind, RefreshKind, System, UpdateKind};
  45use util::{ResultExt, paths::PathStyle, rel_path::RelPath};
  46use worktree::Worktree;
  47
  48pub struct HeadlessProject {
  49    pub fs: Arc<dyn Fs>,
  50    pub session: AnyProtoClient,
  51    pub worktree_store: Entity<WorktreeStore>,
  52    pub buffer_store: Entity<BufferStore>,
  53    pub lsp_store: Entity<LspStore>,
  54    pub task_store: Entity<TaskStore>,
  55    pub dap_store: Entity<DapStore>,
  56    pub breakpoint_store: Entity<BreakpointStore>,
  57    pub agent_server_store: Entity<AgentServerStore>,
  58    pub context_server_store: Entity<ContextServerStore>,
  59    pub settings_observer: Entity<SettingsObserver>,
  60    pub next_entry_id: Arc<AtomicUsize>,
  61    pub languages: Arc<LanguageRegistry>,
  62    pub extensions: Entity<HeadlessExtensionStore>,
  63    pub git_store: Entity<GitStore>,
  64    pub environment: Entity<ProjectEnvironment>,
  65    // Used mostly to keep alive the toolchain store for RPC handlers.
  66    // Local variant is used within LSP store, but that's a separate entity.
  67    pub _toolchain_store: Entity<ToolchainStore>,
  68}
  69
  70pub struct HeadlessAppState {
  71    pub session: AnyProtoClient,
  72    pub fs: Arc<dyn Fs>,
  73    pub http_client: Arc<dyn HttpClient>,
  74    pub node_runtime: NodeRuntime,
  75    pub languages: Arc<LanguageRegistry>,
  76    pub extension_host_proxy: Arc<ExtensionHostProxy>,
  77}
  78
  79impl HeadlessProject {
  80    pub fn init(cx: &mut App) {
  81        settings::init(cx);
  82        log_store::init(true, cx);
  83    }
  84
  85    pub fn new(
  86        HeadlessAppState {
  87            session,
  88            fs,
  89            http_client,
  90            node_runtime,
  91            languages,
  92            extension_host_proxy: proxy,
  93        }: HeadlessAppState,
  94        init_worktree_trust: bool,
  95        cx: &mut Context<Self>,
  96    ) -> Self {
  97        debug_adapter_extension::init(proxy.clone(), cx);
  98        languages::init(languages.clone(), fs.clone(), node_runtime.clone(), cx);
  99
 100        let worktree_store = cx.new(|cx| {
 101            let mut store = WorktreeStore::local(true, fs.clone());
 102            store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
 103            store
 104        });
 105
 106        if init_worktree_trust {
 107            project::trusted_worktrees::track_worktree_trust(
 108                worktree_store.clone(),
 109                None::<RemoteHostLocation>,
 110                Some((session.clone(), ProjectId(REMOTE_SERVER_PROJECT_ID))),
 111                None,
 112                cx,
 113            );
 114        }
 115
 116        let environment =
 117            cx.new(|cx| ProjectEnvironment::new(None, worktree_store.downgrade(), None, true, cx));
 118        let manifest_tree = ManifestTree::new(worktree_store.clone(), cx);
 119        let toolchain_store = cx.new(|cx| {
 120            ToolchainStore::local(
 121                languages.clone(),
 122                worktree_store.clone(),
 123                environment.clone(),
 124                manifest_tree.clone(),
 125                fs.clone(),
 126                cx,
 127            )
 128        });
 129
 130        let buffer_store = cx.new(|cx| {
 131            let mut buffer_store = BufferStore::local(worktree_store.clone(), cx);
 132            buffer_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
 133            buffer_store
 134        });
 135
 136        let breakpoint_store = cx.new(|_| {
 137            let mut breakpoint_store =
 138                BreakpointStore::local(worktree_store.clone(), buffer_store.clone());
 139            breakpoint_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone());
 140
 141            breakpoint_store
 142        });
 143
 144        let dap_store = cx.new(|cx| {
 145            let mut dap_store = DapStore::new_local(
 146                http_client.clone(),
 147                node_runtime.clone(),
 148                fs.clone(),
 149                environment.clone(),
 150                toolchain_store.read(cx).as_language_toolchain_store(),
 151                worktree_store.clone(),
 152                breakpoint_store.clone(),
 153                true,
 154                cx,
 155            );
 156            dap_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
 157            dap_store
 158        });
 159
 160        let git_store = cx.new(|cx| {
 161            let mut store = GitStore::local(
 162                &worktree_store,
 163                buffer_store.clone(),
 164                environment.clone(),
 165                fs.clone(),
 166                cx,
 167            );
 168            store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
 169            store
 170        });
 171
 172        let prettier_store = cx.new(|cx| {
 173            PrettierStore::new(
 174                node_runtime.clone(),
 175                fs.clone(),
 176                languages.clone(),
 177                worktree_store.clone(),
 178                cx,
 179            )
 180        });
 181
 182        let task_store = cx.new(|cx| {
 183            let mut task_store = TaskStore::local(
 184                buffer_store.downgrade(),
 185                worktree_store.clone(),
 186                toolchain_store.read(cx).as_language_toolchain_store(),
 187                environment.clone(),
 188                cx,
 189            );
 190            task_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
 191            task_store
 192        });
 193        let settings_observer = cx.new(|cx| {
 194            let mut observer = SettingsObserver::new_local(
 195                fs.clone(),
 196                worktree_store.clone(),
 197                task_store.clone(),
 198                true,
 199                cx,
 200            );
 201            observer.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
 202            observer
 203        });
 204
 205        let lsp_store = cx.new(|cx| {
 206            let mut lsp_store = LspStore::new_local(
 207                buffer_store.clone(),
 208                worktree_store.clone(),
 209                prettier_store.clone(),
 210                toolchain_store
 211                    .read(cx)
 212                    .as_local_store()
 213                    .expect("Toolchain store to be local")
 214                    .clone(),
 215                environment.clone(),
 216                manifest_tree,
 217                languages.clone(),
 218                http_client.clone(),
 219                fs.clone(),
 220                cx,
 221            );
 222            lsp_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
 223            lsp_store
 224        });
 225
 226        let agent_server_store = cx.new(|cx| {
 227            let mut agent_server_store = AgentServerStore::local(
 228                node_runtime.clone(),
 229                fs.clone(),
 230                environment.clone(),
 231                http_client.clone(),
 232                cx,
 233            );
 234            agent_server_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone(), cx);
 235            agent_server_store
 236        });
 237
 238        let context_server_store = cx.new(|cx| {
 239            let mut context_server_store =
 240                ContextServerStore::local(worktree_store.clone(), None, true, cx);
 241            context_server_store.shared(REMOTE_SERVER_PROJECT_ID, session.clone());
 242            context_server_store
 243        });
 244
 245        cx.subscribe(&lsp_store, Self::on_lsp_store_event).detach();
 246        language_extension::init(
 247            language_extension::LspAccess::ViaLspStore(lsp_store.clone()),
 248            proxy.clone(),
 249            languages.clone(),
 250        );
 251
 252        cx.subscribe(&buffer_store, |_this, _buffer_store, event, cx| {
 253            if let BufferStoreEvent::BufferAdded(buffer) = event {
 254                cx.subscribe(buffer, Self::on_buffer_event).detach();
 255            }
 256        })
 257        .detach();
 258
 259        let extensions = HeadlessExtensionStore::new(
 260            fs.clone(),
 261            http_client.clone(),
 262            paths::remote_extensions_dir().to_path_buf(),
 263            proxy,
 264            node_runtime,
 265            cx,
 266        );
 267
 268        // local_machine -> ssh handlers
 269        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &worktree_store);
 270        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &buffer_store);
 271        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &cx.entity());
 272        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &lsp_store);
 273        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &task_store);
 274        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &toolchain_store);
 275        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &dap_store);
 276        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &breakpoint_store);
 277        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &settings_observer);
 278        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &git_store);
 279        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &agent_server_store);
 280        session.subscribe_to_entity(REMOTE_SERVER_PROJECT_ID, &context_server_store);
 281
 282        session.add_request_handler(cx.weak_entity(), Self::handle_list_remote_directory);
 283        session.add_request_handler(cx.weak_entity(), Self::handle_get_path_metadata);
 284        session.add_request_handler(cx.weak_entity(), Self::handle_shutdown_remote_server);
 285        session.add_request_handler(cx.weak_entity(), Self::handle_ping);
 286        session.add_request_handler(cx.weak_entity(), Self::handle_get_processes);
 287
 288        session.add_entity_request_handler(Self::handle_add_worktree);
 289        session.add_request_handler(cx.weak_entity(), Self::handle_remove_worktree);
 290
 291        session.add_entity_request_handler(Self::handle_open_buffer_by_path);
 292        session.add_entity_request_handler(Self::handle_open_new_buffer);
 293        session.add_entity_request_handler(Self::handle_find_search_candidates);
 294        session.add_entity_request_handler(Self::handle_open_server_settings);
 295        session.add_entity_request_handler(Self::handle_get_directory_environment);
 296        session.add_entity_message_handler(Self::handle_toggle_lsp_logs);
 297        session.add_entity_request_handler(Self::handle_open_image_by_path);
 298        session.add_entity_request_handler(Self::handle_trust_worktrees);
 299        session.add_entity_request_handler(Self::handle_restrict_worktrees);
 300
 301        session.add_entity_message_handler(Self::handle_find_search_candidates_cancel);
 302        session.add_entity_request_handler(BufferStore::handle_update_buffer);
 303        session.add_entity_message_handler(BufferStore::handle_close_buffer);
 304
 305        session.add_request_handler(
 306            extensions.downgrade(),
 307            HeadlessExtensionStore::handle_sync_extensions,
 308        );
 309        session.add_request_handler(
 310            extensions.downgrade(),
 311            HeadlessExtensionStore::handle_install_extension,
 312        );
 313
 314        BufferStore::init(&session);
 315        WorktreeStore::init(&session);
 316        SettingsObserver::init(&session);
 317        LspStore::init(&session);
 318        TaskStore::init(Some(&session));
 319        ToolchainStore::init(&session);
 320        DapStore::init(&session, cx);
 321        // todo(debugger): Re init breakpoint store when we set it up for collab
 322        BreakpointStore::init(&session);
 323        GitStore::init(&session);
 324        AgentServerStore::init_headless(&session);
 325        ContextServerStore::init_headless(&session);
 326
 327        HeadlessProject {
 328            next_entry_id: Default::default(),
 329            session,
 330            settings_observer,
 331            fs,
 332            worktree_store,
 333            buffer_store,
 334            lsp_store,
 335            task_store,
 336            dap_store,
 337            breakpoint_store,
 338            agent_server_store,
 339            context_server_store,
 340            languages,
 341            extensions,
 342            git_store,
 343            environment,
 344            _toolchain_store: toolchain_store,
 345        }
 346    }
 347
 348    fn on_buffer_event(
 349        &mut self,
 350        buffer: Entity<Buffer>,
 351        event: &BufferEvent,
 352        cx: &mut Context<Self>,
 353    ) {
 354        if let BufferEvent::Operation {
 355            operation,
 356            is_local: true,
 357        } = event
 358        {
 359            cx.background_spawn(self.session.request(proto::UpdateBuffer {
 360                project_id: REMOTE_SERVER_PROJECT_ID,
 361                buffer_id: buffer.read(cx).remote_id().to_proto(),
 362                operations: vec![serialize_operation(operation)],
 363            }))
 364            .detach()
 365        }
 366    }
 367
 368    fn on_lsp_store_event(
 369        &mut self,
 370        lsp_store: Entity<LspStore>,
 371        event: &LspStoreEvent,
 372        cx: &mut Context<Self>,
 373    ) {
 374        match event {
 375            LspStoreEvent::LanguageServerAdded(id, name, worktree_id) => {
 376                let log_store = cx
 377                    .try_global::<GlobalLogStore>()
 378                    .map(|lsp_logs| lsp_logs.0.clone());
 379                if let Some(log_store) = log_store {
 380                    log_store.update(cx, |log_store, cx| {
 381                        log_store.add_language_server(
 382                            LanguageServerKind::LocalSsh {
 383                                lsp_store: self.lsp_store.downgrade(),
 384                            },
 385                            *id,
 386                            Some(name.clone()),
 387                            *worktree_id,
 388                            lsp_store.read(cx).language_server_for_id(*id),
 389                            cx,
 390                        );
 391                    });
 392                }
 393            }
 394            LspStoreEvent::LanguageServerRemoved(id) => {
 395                let log_store = cx
 396                    .try_global::<GlobalLogStore>()
 397                    .map(|lsp_logs| lsp_logs.0.clone());
 398                if let Some(log_store) = log_store {
 399                    log_store.update(cx, |log_store, cx| {
 400                        log_store.remove_language_server(*id, cx);
 401                    });
 402                }
 403            }
 404            LspStoreEvent::LanguageServerUpdate {
 405                language_server_id,
 406                name,
 407                message,
 408            } => {
 409                self.session
 410                    .send(proto::UpdateLanguageServer {
 411                        project_id: REMOTE_SERVER_PROJECT_ID,
 412                        server_name: name.as_ref().map(|name| name.to_string()),
 413                        language_server_id: language_server_id.to_proto(),
 414                        variant: Some(message.clone()),
 415                    })
 416                    .log_err();
 417            }
 418            LspStoreEvent::Notification(message) => {
 419                self.session
 420                    .send(proto::Toast {
 421                        project_id: REMOTE_SERVER_PROJECT_ID,
 422                        notification_id: "lsp".to_string(),
 423                        message: message.clone(),
 424                    })
 425                    .log_err();
 426            }
 427            LspStoreEvent::LanguageServerPrompt(prompt) => {
 428                let request = self.session.request(proto::LanguageServerPromptRequest {
 429                    project_id: REMOTE_SERVER_PROJECT_ID,
 430                    actions: prompt
 431                        .actions
 432                        .iter()
 433                        .map(|action| action.title.to_string())
 434                        .collect(),
 435                    level: Some(prompt_to_proto(prompt)),
 436                    lsp_name: prompt.lsp_name.clone(),
 437                    message: prompt.message.clone(),
 438                });
 439                let prompt = prompt.clone();
 440                cx.background_spawn(async move {
 441                    let response = request.await?;
 442                    if let Some(action_response) = response.action_response {
 443                        prompt.respond(action_response as usize).await;
 444                    }
 445                    anyhow::Ok(())
 446                })
 447                .detach();
 448            }
 449            _ => {}
 450        }
 451    }
 452
 453    pub async fn handle_add_worktree(
 454        this: Entity<Self>,
 455        message: TypedEnvelope<proto::AddWorktree>,
 456        mut cx: AsyncApp,
 457    ) -> Result<proto::AddWorktreeResponse> {
 458        use client::ErrorCodeExt;
 459        let fs = this.read_with(&cx, |this, _| this.fs.clone());
 460        let path = PathBuf::from(shellexpand::tilde(&message.payload.path).to_string());
 461
 462        let canonicalized = match fs.canonicalize(&path).await {
 463            Ok(path) => path,
 464            Err(e) => {
 465                let mut parent = path
 466                    .parent()
 467                    .ok_or(e)
 468                    .with_context(|| format!("{path:?} does not exist"))?;
 469                if parent == Path::new("") {
 470                    parent = util::paths::home_dir();
 471                }
 472                let parent = fs.canonicalize(parent).await.map_err(|_| {
 473                    anyhow!(
 474                        proto::ErrorCode::DevServerProjectPathDoesNotExist
 475                            .with_tag("path", path.to_string_lossy().as_ref())
 476                    )
 477                })?;
 478                if let Some(file_name) = path.file_name() {
 479                    parent.join(file_name)
 480                } else {
 481                    parent
 482                }
 483            }
 484        };
 485
 486        let worktree = this
 487            .read_with(&cx.clone(), |this, _| {
 488                Worktree::local(
 489                    Arc::from(canonicalized.as_path()),
 490                    message.payload.visible,
 491                    this.fs.clone(),
 492                    this.next_entry_id.clone(),
 493                    true,
 494                    &mut cx,
 495                )
 496            })
 497            .await?;
 498
 499        let response = this.read_with(&cx, |_, cx| {
 500            let worktree = worktree.read(cx);
 501            proto::AddWorktreeResponse {
 502                worktree_id: worktree.id().to_proto(),
 503                canonicalized_path: canonicalized.to_string_lossy().into_owned(),
 504            }
 505        });
 506
 507        // We spawn this asynchronously, so that we can send the response back
 508        // *before* `worktree_store.add()` can send out UpdateProject requests
 509        // to the client about the new worktree.
 510        //
 511        // That lets the client manage the reference/handles of the newly-added
 512        // worktree, before getting interrupted by an UpdateProject request.
 513        //
 514        // This fixes the problem of the client sending the AddWorktree request,
 515        // headless project sending out a project update, client receiving it
 516        // and immediately dropping the reference of the new client, causing it
 517        // to be dropped on the headless project, and the client only then
 518        // receiving a response to AddWorktree.
 519        cx.spawn(async move |cx| {
 520            this.update(cx, |this, cx| {
 521                this.worktree_store.update(cx, |worktree_store, cx| {
 522                    worktree_store.add(&worktree, cx);
 523                });
 524            });
 525        })
 526        .detach();
 527
 528        Ok(response)
 529    }
 530
 531    pub async fn handle_remove_worktree(
 532        this: Entity<Self>,
 533        envelope: TypedEnvelope<proto::RemoveWorktree>,
 534        mut cx: AsyncApp,
 535    ) -> Result<proto::Ack> {
 536        let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
 537        this.update(&mut cx, |this, cx| {
 538            this.worktree_store.update(cx, |worktree_store, cx| {
 539                worktree_store.remove_worktree(worktree_id, cx);
 540            });
 541        });
 542        Ok(proto::Ack {})
 543    }
 544
 545    pub async fn handle_open_buffer_by_path(
 546        this: Entity<Self>,
 547        message: TypedEnvelope<proto::OpenBufferByPath>,
 548        mut cx: AsyncApp,
 549    ) -> Result<proto::OpenBufferResponse> {
 550        let worktree_id = WorktreeId::from_proto(message.payload.worktree_id);
 551        let path = RelPath::from_proto(&message.payload.path)?;
 552        let (buffer_store, buffer) = this.update(&mut cx, |this, cx| {
 553            let buffer_store = this.buffer_store.clone();
 554            let buffer = this.buffer_store.update(cx, |buffer_store, cx| {
 555                buffer_store.open_buffer(ProjectPath { worktree_id, path }, cx)
 556            });
 557            (buffer_store, buffer)
 558        });
 559
 560        let buffer = buffer.await?;
 561        let buffer_id = buffer.read_with(&cx, |b, _| b.remote_id());
 562        buffer_store.update(&mut cx, |buffer_store, cx| {
 563            buffer_store
 564                .create_buffer_for_peer(&buffer, REMOTE_SERVER_PEER_ID, cx)
 565                .detach_and_log_err(cx);
 566        });
 567
 568        Ok(proto::OpenBufferResponse {
 569            buffer_id: buffer_id.to_proto(),
 570        })
 571    }
 572
 573    pub async fn handle_open_image_by_path(
 574        this: Entity<Self>,
 575        message: TypedEnvelope<proto::OpenImageByPath>,
 576        mut cx: AsyncApp,
 577    ) -> Result<proto::OpenImageResponse> {
 578        static NEXT_ID: AtomicU64 = AtomicU64::new(1);
 579        let worktree_id = WorktreeId::from_proto(message.payload.worktree_id);
 580        let path = RelPath::from_proto(&message.payload.path)?;
 581        let project_id = message.payload.project_id;
 582        use proto::create_image_for_peer::Variant;
 583
 584        let (worktree_store, session) = this.read_with(&cx, |this, _| {
 585            (this.worktree_store.clone(), this.session.clone())
 586        });
 587
 588        let worktree = worktree_store
 589            .read_with(&cx, |store, cx| store.worktree_for_id(worktree_id, cx))
 590            .context("worktree not found")?;
 591
 592        let load_task = worktree.update(&mut cx, |worktree, cx| {
 593            worktree.load_binary_file(path.as_ref(), cx)
 594        });
 595
 596        let loaded_file = load_task.await?;
 597        let content = loaded_file.content;
 598        let file = loaded_file.file;
 599
 600        let proto_file = worktree.read_with(&cx, |_worktree, cx| file.to_proto(cx));
 601        let image_id =
 602            ImageId::from(NonZeroU64::new(NEXT_ID.fetch_add(1, Ordering::Relaxed)).unwrap());
 603
 604        let format = image::guess_format(&content)
 605            .map(|f| format!("{:?}", f).to_lowercase())
 606            .unwrap_or_else(|_| "unknown".to_string());
 607
 608        let state = proto::ImageState {
 609            id: image_id.to_proto(),
 610            file: Some(proto_file),
 611            content_size: content.len() as u64,
 612            format,
 613        };
 614
 615        session.send(proto::CreateImageForPeer {
 616            project_id,
 617            peer_id: Some(REMOTE_SERVER_PEER_ID),
 618            variant: Some(Variant::State(state)),
 619        })?;
 620
 621        const CHUNK_SIZE: usize = 1024 * 1024; // 1MB chunks
 622        for chunk in content.chunks(CHUNK_SIZE) {
 623            session.send(proto::CreateImageForPeer {
 624                project_id,
 625                peer_id: Some(REMOTE_SERVER_PEER_ID),
 626                variant: Some(Variant::Chunk(proto::ImageChunk {
 627                    image_id: image_id.to_proto(),
 628                    data: chunk.to_vec(),
 629                })),
 630            })?;
 631        }
 632
 633        Ok(proto::OpenImageResponse {
 634            image_id: image_id.to_proto(),
 635        })
 636    }
 637
 638    pub async fn handle_trust_worktrees(
 639        this: Entity<Self>,
 640        envelope: TypedEnvelope<proto::TrustWorktrees>,
 641        mut cx: AsyncApp,
 642    ) -> Result<proto::Ack> {
 643        let trusted_worktrees = cx
 644            .update(|cx| TrustedWorktrees::try_get_global(cx))
 645            .context("missing trusted worktrees")?;
 646        let worktree_store = this.read_with(&cx, |project, _| project.worktree_store.clone());
 647        trusted_worktrees.update(&mut cx, |trusted_worktrees, cx| {
 648            trusted_worktrees.trust(
 649                &worktree_store,
 650                envelope
 651                    .payload
 652                    .trusted_paths
 653                    .into_iter()
 654                    .filter_map(PathTrust::from_proto)
 655                    .collect(),
 656                cx,
 657            );
 658        });
 659        Ok(proto::Ack {})
 660    }
 661
 662    pub async fn handle_restrict_worktrees(
 663        this: Entity<Self>,
 664        envelope: TypedEnvelope<proto::RestrictWorktrees>,
 665        mut cx: AsyncApp,
 666    ) -> Result<proto::Ack> {
 667        let trusted_worktrees = cx
 668            .update(|cx| TrustedWorktrees::try_get_global(cx))
 669            .context("missing trusted worktrees")?;
 670        let worktree_store = this.read_with(&cx, |project, _| project.worktree_store.downgrade());
 671        trusted_worktrees.update(&mut cx, |trusted_worktrees, cx| {
 672            let restricted_paths = envelope
 673                .payload
 674                .worktree_ids
 675                .into_iter()
 676                .map(WorktreeId::from_proto)
 677                .map(PathTrust::Worktree)
 678                .collect::<HashSet<_>>();
 679            trusted_worktrees.restrict(worktree_store, restricted_paths, cx);
 680        });
 681        Ok(proto::Ack {})
 682    }
 683
 684    pub async fn handle_open_new_buffer(
 685        this: Entity<Self>,
 686        _message: TypedEnvelope<proto::OpenNewBuffer>,
 687        mut cx: AsyncApp,
 688    ) -> Result<proto::OpenBufferResponse> {
 689        let (buffer_store, buffer) = this.update(&mut cx, |this, cx| {
 690            let buffer_store = this.buffer_store.clone();
 691            let buffer = this.buffer_store.update(cx, |buffer_store, cx| {
 692                buffer_store.create_buffer(None, true, cx)
 693            });
 694            (buffer_store, buffer)
 695        });
 696
 697        let buffer = buffer.await?;
 698        let buffer_id = buffer.read_with(&cx, |b, _| b.remote_id());
 699        buffer_store.update(&mut cx, |buffer_store, cx| {
 700            buffer_store
 701                .create_buffer_for_peer(&buffer, REMOTE_SERVER_PEER_ID, cx)
 702                .detach_and_log_err(cx);
 703        });
 704
 705        Ok(proto::OpenBufferResponse {
 706            buffer_id: buffer_id.to_proto(),
 707        })
 708    }
 709
 710    async fn handle_toggle_lsp_logs(
 711        _: Entity<Self>,
 712        envelope: TypedEnvelope<proto::ToggleLspLogs>,
 713        cx: AsyncApp,
 714    ) -> Result<()> {
 715        let server_id = LanguageServerId::from_proto(envelope.payload.server_id);
 716        cx.update(|cx| {
 717            let log_store = cx
 718                .try_global::<GlobalLogStore>()
 719                .map(|global_log_store| global_log_store.0.clone())
 720                .context("lsp logs store is missing")?;
 721            let toggled_log_kind =
 722                match proto::toggle_lsp_logs::LogType::from_i32(envelope.payload.log_type)
 723                    .context("invalid log type")?
 724                {
 725                    proto::toggle_lsp_logs::LogType::Log => LogKind::Logs,
 726                    proto::toggle_lsp_logs::LogType::Trace => LogKind::Trace,
 727                    proto::toggle_lsp_logs::LogType::Rpc => LogKind::Rpc,
 728                };
 729            log_store.update(cx, |log_store, _| {
 730                log_store.toggle_lsp_logs(server_id, envelope.payload.enabled, toggled_log_kind);
 731            });
 732            anyhow::Ok(())
 733        })?;
 734
 735        Ok(())
 736    }
 737
 738    async fn handle_open_server_settings(
 739        this: Entity<Self>,
 740        _: TypedEnvelope<proto::OpenServerSettings>,
 741        mut cx: AsyncApp,
 742    ) -> Result<proto::OpenBufferResponse> {
 743        let settings_path = paths::settings_file();
 744        let (worktree, path) = this
 745            .update(&mut cx, |this, cx| {
 746                this.worktree_store.update(cx, |worktree_store, cx| {
 747                    worktree_store.find_or_create_worktree(settings_path, false, cx)
 748                })
 749            })
 750            .await?;
 751
 752        let (buffer, buffer_store) = this.update(&mut cx, |this, cx| {
 753            let buffer = this.buffer_store.update(cx, |buffer_store, cx| {
 754                buffer_store.open_buffer(
 755                    ProjectPath {
 756                        worktree_id: worktree.read(cx).id(),
 757                        path: path,
 758                    },
 759                    cx,
 760                )
 761            });
 762
 763            (buffer, this.buffer_store.clone())
 764        });
 765
 766        let buffer = buffer.await?;
 767
 768        let buffer_id = cx.update(|cx| {
 769            if buffer.read(cx).is_empty() {
 770                buffer.update(cx, |buffer, cx| {
 771                    buffer.edit([(0..0, initial_server_settings_content())], None, cx)
 772                });
 773            }
 774
 775            let buffer_id = buffer.read(cx).remote_id();
 776
 777            buffer_store.update(cx, |buffer_store, cx| {
 778                buffer_store
 779                    .create_buffer_for_peer(&buffer, REMOTE_SERVER_PEER_ID, cx)
 780                    .detach_and_log_err(cx);
 781            });
 782
 783            buffer_id
 784        });
 785
 786        Ok(proto::OpenBufferResponse {
 787            buffer_id: buffer_id.to_proto(),
 788        })
 789    }
 790
 791    async fn handle_find_search_candidates(
 792        this: Entity<Self>,
 793        envelope: TypedEnvelope<proto::FindSearchCandidates>,
 794        mut cx: AsyncApp,
 795    ) -> Result<proto::Ack> {
 796        use futures::stream::StreamExt as _;
 797
 798        let peer_id = envelope.original_sender_id.unwrap_or(envelope.sender_id);
 799        let message = envelope.payload;
 800        let query = SearchQuery::from_proto(
 801            message.query.context("missing query field")?,
 802            PathStyle::local(),
 803        )?;
 804
 805        let project_id = message.project_id;
 806        let buffer_store = this.read_with(&cx, |this, _| this.buffer_store.clone());
 807        let handle = message.handle;
 808        let _buffer_store = buffer_store.clone();
 809        let client = this.read_with(&cx, |this, _| this.session.clone());
 810        let task = cx.spawn(async move |cx| {
 811            let results = this.update(cx, |this, cx| {
 812                project::Search::local(
 813                    this.fs.clone(),
 814                    this.buffer_store.clone(),
 815                    this.worktree_store.clone(),
 816                    message.limit as _,
 817                    cx,
 818                )
 819                .into_handle(query, cx)
 820                .matching_buffers(cx)
 821            });
 822            let (batcher, batches) =
 823                project::project_search::AdaptiveBatcher::new(cx.background_executor());
 824            let mut new_matches = Box::pin(results.rx);
 825
 826            let sender_task = cx.background_executor().spawn({
 827                let client = client.clone();
 828                async move {
 829                    let mut batches = std::pin::pin!(batches);
 830                    while let Some(buffer_ids) = batches.next().await {
 831                        client
 832                            .request(proto::FindSearchCandidatesChunk {
 833                                handle,
 834                                peer_id: Some(peer_id),
 835                                project_id,
 836                                variant: Some(
 837                                    proto::find_search_candidates_chunk::Variant::Matches(
 838                                        proto::FindSearchCandidatesMatches { buffer_ids },
 839                                    ),
 840                                ),
 841                            })
 842                            .await?;
 843                    }
 844                    anyhow::Ok(())
 845                }
 846            });
 847
 848            while let Some(buffer) = new_matches.next().await {
 849                let _ = buffer_store
 850                    .update(cx, |this, cx| {
 851                        this.create_buffer_for_peer(&buffer, REMOTE_SERVER_PEER_ID, cx)
 852                    })
 853                    .await;
 854                let buffer_id = buffer.read_with(cx, |this, _| this.remote_id().to_proto());
 855                batcher.push(buffer_id).await;
 856            }
 857            batcher.flush().await;
 858
 859            sender_task.await?;
 860
 861            client
 862                .request(proto::FindSearchCandidatesChunk {
 863                    handle,
 864                    peer_id: Some(peer_id),
 865                    project_id,
 866                    variant: Some(proto::find_search_candidates_chunk::Variant::Done(
 867                        proto::FindSearchCandidatesDone {},
 868                    )),
 869                })
 870                .await?;
 871            anyhow::Ok(())
 872        });
 873        _buffer_store.update(&mut cx, |this, _| {
 874            this.register_ongoing_project_search((peer_id, handle), task);
 875        });
 876
 877        Ok(proto::Ack {})
 878    }
 879
 880    // Goes from client to host.
 881    async fn handle_find_search_candidates_cancel(
 882        this: Entity<Self>,
 883        envelope: TypedEnvelope<proto::FindSearchCandidatesCancelled>,
 884        mut cx: AsyncApp,
 885    ) -> Result<()> {
 886        let buffer_store = this.read_with(&mut cx, |this, _| this.buffer_store.clone());
 887        BufferStore::handle_find_search_candidates_cancel(buffer_store, envelope, cx).await
 888    }
 889
 890    async fn handle_list_remote_directory(
 891        this: Entity<Self>,
 892        envelope: TypedEnvelope<proto::ListRemoteDirectory>,
 893        cx: AsyncApp,
 894    ) -> Result<proto::ListRemoteDirectoryResponse> {
 895        use smol::stream::StreamExt;
 896        let fs = cx.read_entity(&this, |this, _| this.fs.clone());
 897        let expanded = PathBuf::from(shellexpand::tilde(&envelope.payload.path).to_string());
 898        let check_info = envelope
 899            .payload
 900            .config
 901            .as_ref()
 902            .is_some_and(|config| config.is_dir);
 903
 904        let mut entries = Vec::new();
 905        let mut entry_info = Vec::new();
 906        let mut response = fs.read_dir(&expanded).await?;
 907        while let Some(path) = response.next().await {
 908            let path = path?;
 909            if let Some(file_name) = path.file_name() {
 910                entries.push(file_name.to_string_lossy().into_owned());
 911                if check_info {
 912                    let is_dir = fs.is_dir(&path).await;
 913                    entry_info.push(proto::EntryInfo { is_dir });
 914                }
 915            }
 916        }
 917        Ok(proto::ListRemoteDirectoryResponse {
 918            entries,
 919            entry_info,
 920        })
 921    }
 922
 923    async fn handle_get_path_metadata(
 924        this: Entity<Self>,
 925        envelope: TypedEnvelope<proto::GetPathMetadata>,
 926        cx: AsyncApp,
 927    ) -> Result<proto::GetPathMetadataResponse> {
 928        let fs = cx.read_entity(&this, |this, _| this.fs.clone());
 929        let expanded = PathBuf::from(shellexpand::tilde(&envelope.payload.path).to_string());
 930
 931        let metadata = fs.metadata(&expanded).await?;
 932        let is_dir = metadata.map(|metadata| metadata.is_dir).unwrap_or(false);
 933
 934        Ok(proto::GetPathMetadataResponse {
 935            exists: metadata.is_some(),
 936            is_dir,
 937            path: expanded.to_string_lossy().into_owned(),
 938        })
 939    }
 940
 941    async fn handle_shutdown_remote_server(
 942        _this: Entity<Self>,
 943        _envelope: TypedEnvelope<proto::ShutdownRemoteServer>,
 944        cx: AsyncApp,
 945    ) -> Result<proto::Ack> {
 946        cx.spawn(async move |cx| {
 947            cx.update(|cx| {
 948                // TODO: This is a hack, because in a headless project, shutdown isn't executed
 949                // when calling quit, but it should be.
 950                cx.shutdown();
 951                cx.quit();
 952            })
 953        })
 954        .detach();
 955
 956        Ok(proto::Ack {})
 957    }
 958
 959    pub async fn handle_ping(
 960        _this: Entity<Self>,
 961        _envelope: TypedEnvelope<proto::Ping>,
 962        _cx: AsyncApp,
 963    ) -> Result<proto::Ack> {
 964        log::debug!("Received ping from client");
 965        Ok(proto::Ack {})
 966    }
 967
 968    async fn handle_get_processes(
 969        _this: Entity<Self>,
 970        _envelope: TypedEnvelope<proto::GetProcesses>,
 971        _cx: AsyncApp,
 972    ) -> Result<proto::GetProcessesResponse> {
 973        let mut processes = Vec::new();
 974        let refresh_kind = RefreshKind::nothing().with_processes(
 975            ProcessRefreshKind::nothing()
 976                .without_tasks()
 977                .with_cmd(UpdateKind::Always),
 978        );
 979
 980        for process in System::new_with_specifics(refresh_kind)
 981            .processes()
 982            .values()
 983        {
 984            let name = process.name().to_string_lossy().into_owned();
 985            let command = process
 986                .cmd()
 987                .iter()
 988                .map(|s| s.to_string_lossy().into_owned())
 989                .collect::<Vec<_>>();
 990
 991            processes.push(proto::ProcessInfo {
 992                pid: process.pid().as_u32(),
 993                name,
 994                command,
 995            });
 996        }
 997
 998        processes.sort_by_key(|p| p.name.clone());
 999
1000        Ok(proto::GetProcessesResponse { processes })
1001    }
1002
1003    async fn handle_get_directory_environment(
1004        this: Entity<Self>,
1005        envelope: TypedEnvelope<proto::GetDirectoryEnvironment>,
1006        mut cx: AsyncApp,
1007    ) -> Result<proto::DirectoryEnvironment> {
1008        let shell = task::shell_from_proto(envelope.payload.shell.context("missing shell")?)?;
1009        let directory = PathBuf::from(envelope.payload.directory);
1010        let environment = this
1011            .update(&mut cx, |this, cx| {
1012                this.environment.update(cx, |environment, cx| {
1013                    environment.local_directory_environment(&shell, directory.into(), cx)
1014                })
1015            })
1016            .await
1017            .context("failed to get directory environment")?
1018            .into_iter()
1019            .collect();
1020        Ok(proto::DirectoryEnvironment { environment })
1021    }
1022}
1023
1024fn prompt_to_proto(
1025    prompt: &project::LanguageServerPromptRequest,
1026) -> proto::language_server_prompt_request::Level {
1027    match prompt.level {
1028        PromptLevel::Info => proto::language_server_prompt_request::Level::Info(
1029            proto::language_server_prompt_request::Info {},
1030        ),
1031        PromptLevel::Warning => proto::language_server_prompt_request::Level::Warning(
1032            proto::language_server_prompt_request::Warning {},
1033        ),
1034        PromptLevel::Critical => proto::language_server_prompt_request::Level::Critical(
1035            proto::language_server_prompt_request::Critical {},
1036        ),
1037    }
1038}